Repository: spark
Updated Branches:
  refs/heads/master 6a9a058e0 -> 8d707b060


[SPARK-24755][CORE] Executor loss can cause task to not be resubmitted

**Description**
As described in 
[SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when 
speculation is enabled, there is a scenario in which executor loss can cause a 
task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keep track of the 
taskIds of tasks that are killed by another attempt. By doing this, we can still 
prevent resubmitting a task that was killed by another attempt, while still 
resubmitting the successful attempt when its executor is lost.

**How was this patch tested?**
A unit test is added, based on the unit test written by xuanyuanking and 
modified to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“[email protected]”>

Closes #21729 from hthuynh2/SPARK_24755.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d707b06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d707b06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d707b06

Branch: refs/heads/master
Commit: 8d707b06003bc97d06630b22e6ae7c35f99b3cdd
Parents: 6a9a058
Author: Hieu Huynh <“[email protected]”>
Authored: Thu Jul 19 09:52:07 2018 -0500
Committer: Thomas Graves <[email protected]>
Committed: Thu Jul 19 09:52:07 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 112 +++++++++++++++++++
 2 files changed, 117 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d707b06/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6071605..defed1e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -84,10 +84,10 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other 
attempt tasks,
-  // this happened while we set the `spark.speculation` to true. The task 
killed by others
+  // Add the tid of task into this HashSet when the task is killed by other 
attempt tasks.
+  // This happened while we set the `spark.speculation` to true. The task 
killed by others
   // should not resubmit while executor lost.
-  private val killedByOtherAttempt: Array[Boolean] = new 
Array[Boolean](numTasks)
+  private val killedByOtherAttempt = new HashSet[Long]
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   private[scheduler] var tasksSuccessful = 0
@@ -735,7 +735,7 @@ private[spark] class TaskSetManager(
       logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
         s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
         s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
-      killedByOtherAttempt(index) = true
+      killedByOtherAttempt += attemptInfo.taskId
       sched.backend.killTask(
         attemptInfo.taskId,
         attemptInfo.executorId,
@@ -947,7 +947,7 @@ private[spark] class TaskSetManager(
         && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
-        if (successful(index) && !killedByOtherAttempt(index)) {
+        if (successful(index) && !killedByOtherAttempt.contains(tid)) {
           successful(index) = false
           copiesRunning(index) -= 1
           tasksSuccessful -= 1

http://git-wip-us.apache.org/repos/asf/spark/blob/8d707b06/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ae571e5..206b9f4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1414,6 +1414,118 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     taskSetManager2.checkSpeculatableTasks(0)
   }
 
+
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+
+    sc.conf.set("spark.speculation.quantile", "0.5")
+    sc.conf.set("spark.speculation", "true")
+
+    var killTaskCalled = false
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(
+          taskId: Long,
+          executorId: String,
+          interruptThread: Boolean,
+          reason: String): Unit = {
+        // Check the only one killTask event in this case, which triggered by
+        // task 2.1 completed.
+        assert(taskId === 2)
+        assert(executorId === "exec3")
+        assert(interruptThread)
+        assert(reason === "another attempt succeeded")
+        killTaskCalled = true
+      }
+    })
+
+    // Keep track of the index of tasks that are resubmitted,
+    // so that the test can check that task is resubmitted correctly
+    var resubmittedTasks = new mutable.HashSet[Int]
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += taskInfo.index
+          case _ =>
+        }
+      }
+    }
+    sched.dagScheduler.stop()
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host3", "exec3")),
+      Seq(TaskLocation("host2", "exec2")))
+
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((exec, host) <- Seq(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === exec)
+      // Add an extra assert to make sure task 2.0 is running on exec3
+      if (task.index == 2) {
+        assert(task.attemptNumber === 0)
+        assert(task.executorId === "exec3")
+      }
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 2 tasks and leave 2 task in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the 
threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need 
to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+    // Offer resource to start the speculative attempt for the running task 2.0
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    assert(taskOption.isDefined)
+    val task4 = taskOption.get
+    assert(task4.index === 2)
+    assert(task4.taskId === 4)
+    assert(task4.executorId === "exec2")
+    assert(task4.attemptNumber === 1)
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
+    // Make sure schedBackend.killTask(2, "exec3", true, "another attempt 
succeeded") gets called
+    assert(killTaskCalled)
+
+    assert(resubmittedTasks.isEmpty)
+    // Host 2 Losts, meaning we lost the map output task4
+    manager.executorLost("exec2", "host2", SlaveLost())
+    // Make sure that task with index 2 is re-submitted
+    assert(resubmittedTasks.contains(2))
+
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): 
DirectTaskResult[Int] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to