Repository: spark
Updated Branches:
  refs/heads/branch-2.2 17db57213 -> 144426cff


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?
When speculation is enabled,
TaskSetManager#markPartitionCompleted should write successful task duration to 
MedianHeap,
not just increase tasksSuccessful.

Otherwise when TaskSetManager#checkSpeculatableTasks,tasksSuccessful non-zero, 
but MedianHeap is empty.
Then throw an exception successfulTaskDurations.median 
java.util.NoSuchElementException: MedianHeap is empty.
Finally led to stopping SparkContext.
## How was this patch tested?
TaskSetManagerSuite.scala
unit test:[SPARK-24677] MedianHeap should not be empty when speculation is 
enabled

Author: sychen <syc...@ctrip.com>

Closes #21656 from cxzl25/fix_MedianHeap_empty.

(cherry picked from commit c8bee932cb644627c4049b5a07dd8028968572d9)
Signed-off-by: Thomas Graves <tgra...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144426cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144426cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144426cf

Branch: refs/heads/branch-2.2
Commit: 144426cffd6e4b26b676004f5489e218140f7df2
Parents: 17db572
Author: sychen <syc...@ctrip.com>
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Wed Jul 18 13:26:24 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index df6407b..f8c62b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -701,9 +701,12 @@ private[spark] class TaskSchedulerImpl private[scheduler](
    * do not also submit those same tasks.  That also means that a task 
completion from an  earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, 
partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm 
=>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d9515fb..705b896 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -748,7 +748,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them 
know that the partition
     // was completed.  This may result in some of the tasksets getting 
completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, 
tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, 
tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which 
holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, 
we should not
     // "deserialize" the value when holding a lock to avoid blocking other 
threads. So we call
@@ -759,9 +759,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: 
TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {

http://git-wip-us.apache.org/repos/asf/spark/blob/144426cf/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 7d31a66..904f0b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1214,6 +1214,55 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.1")
+    sc.conf.set("spark.speculation", "true")
+
+    sched = new FakeTaskScheduler(sc)
+    sched.initialize(new FakeSchedulerBackend())
+
+    val dagScheduler = new FakeDAGScheduler(sc, sched)
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet1 = FakeTask.createTaskSet(10)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet1.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    sched.submitTasks(taskSet1)
+    sched.resourceOffers(
+      (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
+
+    // fail fetch
+    taskSetManager1.handleFailedTask(
+      taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
+      FetchFailed(null, 0, 0, 0, "fetch failed"))
+
+    assert(taskSetManager1.isZombie)
+    assert(taskSetManager1.runningTasks === 9)
+
+    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    sched.submitTasks(taskSet2)
+    sched.resourceOffers(
+      (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    // Complete the 2 tasks and leave 8 task in running
+    for (id <- Set(0, 1)) {
+      taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
+    assert(!taskSetManager2.successfulTaskDurations.isEmpty())
+    taskSetManager2.checkSpeculatableTasks(0)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): 
DirectTaskResult[Int] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to