Repository: spark
Updated Branches:
  refs/heads/master 331f0b10f -> b78c65b03


[SPARK-5259] [CORE] don't submit stage until its dependencies map outputs are 
registered

Track pending tasks by partition ID instead of Task objects.

Before this change, failure & retry could result in a case where a stage was 
submitted before the map output from its dependencies was registered.  This was 
due to an error in the condition for registering map outputs.

Author: hushan[胡珊] <[email protected]>
Author: Imran Rashid <[email protected]>

Closes #7699 from squito/SPARK-5259.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b78c65b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b78c65b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b78c65b0

Branch: refs/heads/master
Commit: b78c65b03ae87a3ba348c9d29ff4c296349eb49c
Parents: 331f0b1
Author: hushan[胡珊] <[email protected]>
Authored: Mon Sep 21 14:26:15 2015 -0500
Committer: Imran Rashid <[email protected]>
Committed: Mon Sep 21 14:26:15 2015 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  12 +-
 .../org/apache/spark/scheduler/Stage.scala      |   2 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   4 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 197 +++++++++++++++++--
 4 files changed, 191 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b78c65b0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3c9a66e..394228b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -944,7 +944,7 @@ class DAGScheduler(
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
-    stage.pendingTasks.clear()
+    stage.pendingPartitions.clear()
 
     // First figure out the indexes of partition ids to compute.
     val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
@@ -1060,8 +1060,8 @@ class DAGScheduler(
 
     if (tasks.size > 0) {
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " 
(" + stage.rdd + ")")
-      stage.pendingTasks ++= tasks
-      logDebug("New pending tasks: " + stage.pendingTasks)
+      stage.pendingPartitions ++= tasks.map(_.partitionId)
+      logDebug("New pending partitions: " + stage.pendingPartitions)
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, 
properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1152,7 +1152,7 @@ class DAGScheduler(
       case Success =>
         listenerBus.post(SparkListenerTaskEnd(stageId, 
stage.latestInfo.attemptId, taskType,
           event.reason, event.taskInfo, event.taskMetrics))
-        stage.pendingTasks -= task
+        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
             // Cast to ResultStage here because it's part of the ResultTask
@@ -1198,7 +1198,7 @@ class DAGScheduler(
               shuffleStage.addOutputLoc(smt.partitionId, status)
             }
 
-            if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingTasks.isEmpty) {
+            if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingPartitions.isEmpty) {
               markStageAsFinished(shuffleStage)
               logInfo("looking for newly runnable stages")
               logInfo("running: " + runningStages)
@@ -1242,7 +1242,7 @@ class DAGScheduler(
 
       case Resubmitted =>
         logInfo("Resubmitted " + task + ", so marking it as still running")
-        stage.pendingTasks += task
+        stage.pendingPartitions += task.partitionId
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) 
=>
         val failedStage = stageIdToStage(task.stageId)

http://git-wip-us.apache.org/repos/asf/spark/blob/b78c65b0/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b37eccb..a3829c3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -66,7 +66,7 @@ private[scheduler] abstract class Stage(
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
 
-  var pendingTasks = new HashSet[Task[_]]
+  val pendingPartitions = new HashSet[Int]
 
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/b78c65b0/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 62af903..c02597c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -487,8 +487,8 @@ private[spark] class TaskSetManager(
           // a good proxy to task serialization time.
           // val timeTaken = clock.getTime() - startTime
           val taskName = s"task ${info.id} in stage ${taskSet.id}"
-          logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
-              taskName, taskId, host, taskLocality, serializedTask.limit))
+          logInfo(s"Starting $taskName (TID $taskId, $host, partition 
${task.partitionId}," +
+            s"$taskLocality, ${serializedTask.limit} bytes)")
 
           sched.dagScheduler.taskStarted(task, info)
           return Some(new TaskDescription(taskId = taskId, attemptNumber = 
attemptNum, execId,

http://git-wip-us.apache.org/repos/asf/spark/blob/b78c65b0/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1c55f90..6b5bcf0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -479,8 +479,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-        (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-        (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+        (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
         (Success, 42),
@@ -490,7 +490,7 @@ class DAGSchedulerSuite
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 
reduceRdd.partitions.size))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 
reduceRdd.partitions.length))))
     // we can see both result blocks now
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 
0).map(_._1.host).toSet ===
       HashSet("hostA", "hostB"))
@@ -782,8 +782,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 
0).map(_._1.host).toSet ===
       HashSet("hostA", "hostB"))
@@ -1036,6 +1036,173 @@ class DAGSchedulerSuite
   }
 
   /**
+   * This test runs a three stage job, with a fetch failure in stage 1.  But 
during the retry, we
+   * have completions from both the first & second attempt of stage 1.  So all 
the map output is
+   * available before we finish any task set for stage 1.  We want to make 
sure that we don't
+   * submit stage 2 until the map output for stage 1 is registered
+   */
+  test("don't submit stage until its dependencies map outputs are registered 
(SPARK-5259)") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+    val firstShuffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // things start out smoothly, stage 0 completes with no issues
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
+    ))
+
+    // then one executor dies, and a task fails in stage 1
+    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
+      null,
+      null,
+      createFakeTaskInfo(),
+      null))
+
+    // so we resubmit stage 0, which completes happily
+    scheduler.resubmitFailedStages()
+    val stage0Resubmit = taskSets(2)
+    assert(stage0Resubmit.stageId == 0)
+    assert(stage0Resubmit.stageAttemptId === 1)
+    val task = stage0Resubmit.tasks(0)
+    assert(task.partitionId === 2)
+    runEvent(CompletionEvent(
+      task,
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+
+    // now here is where things get tricky : we will now have a task set 
representing
+    // the second attempt for stage 1, but we *also* have some tasks for the 
first attempt for
+    // stage 1 still going
+    val stage1Resubmit = taskSets(3)
+    assert(stage1Resubmit.stageId == 1)
+    assert(stage1Resubmit.stageAttemptId === 1)
+    assert(stage1Resubmit.tasks.length === 3)
+
+    // we'll have some tasks finish from the first attempt, and some finish 
from the second attempt,
+    // so that we actually have all stage outputs, though no attempt has 
completed all its
+    // tasks
+    runEvent(CompletionEvent(
+      taskSets(3).tasks(0),
+      Success,
+      makeMapStatus("hostC", reduceRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+    runEvent(CompletionEvent(
+      taskSets(3).tasks(1),
+      Success,
+      makeMapStatus("hostC", reduceRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+    // late task finish from the first attempt
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(2),
+      Success,
+      makeMapStatus("hostB", reduceRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+
+    // What should happen now is that we submit stage 2.  However, we might 
not see an error
+    // b/c of DAGScheduler's error handling (it tends to swallow errors and 
just log them).  But
+    // we can check some conditions.
+    // Note that the really important thing here is not so much that we submit 
stage 2 *immediately*
+    // but that we don't end up with some error from these interleaved 
completions.  It would also
+    // be OK (though sub-optimal) if stage 2 simply waited until the 
resubmission of stage 1 had
+    // all its tasks complete
+
+    // check that we have all the map output for stage 0 (it should have been 
there even before
+    // the last round of completions from stage 1, but just to double check it 
hasn't been messed
+    // up) and also the newly available stage 1
+    val stageToReduceIdxs = Seq(
+      0 -> (0 until 3),
+      1 -> (0 until 1)
+    )
+    for {
+      (stage, reduceIdxs) <- stageToReduceIdxs
+      reduceIdx <- reduceIdxs
+    } {
+      // this would throw an exception if the map status hadn't been registered
+      val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
+      // really we should have already thrown an exception rather than fail 
either of these
+      // asserts, but just to be extra defensive let's double check the 
statuses are OK
+      assert(statuses != null)
+      assert(statuses.nonEmpty)
+    }
+
+    // and check that stage 2 has been submitted
+    assert(taskSets.size == 5)
+    val stage2TaskSet = taskSets(4)
+    assert(stage2TaskSet.stageId == 2)
+    assert(stage2TaskSet.stageAttemptId == 0)
+  }
+
+  /**
+   * We lose an executor after completing some shuffle map tasks on it.  Those 
tasks get
+   * resubmitted, and when they finish the job completes normally
+   */
+  test("register map outputs correctly after ExecutorLost and task 
Resubmitted") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+    val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // complete some of the tasks from the first stage, on one host
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(0), Success,
+      makeMapStatus("hostA", reduceRdd.partitions.length), null, 
createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(1), Success,
+      makeMapStatus("hostA", reduceRdd.partitions.length), null, 
createFakeTaskInfo(), null))
+
+    // now that host goes down
+    runEvent(ExecutorLost("exec-hostA"))
+
+    // so we resubmit those tasks
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), 
null))
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), 
null))
+
+    // now complete everything on a different host
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))
+    ))
+
+    // now we should submit stage 1, and the map output from stage 0 should be 
registered
+
+    // check that we have all the map output for stage 0
+    (0 until reduceRdd.partitions.length).foreach { reduceIdx =>
+      val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
+      // really we should have already thrown an exception rather than fail 
either of these
+      // asserts, but just to be extra defensive let's double check the 
statuses are OK
+      assert(statuses != null)
+      assert(statuses.nonEmpty)
+    }
+
+    // and check that stage 1 has been submitted
+    assert(taskSets.size == 2)
+    val stage1TaskSet = taskSets(1)
+    assert(stage1TaskSet.stageId == 1)
+    assert(stage1TaskSet.stageAttemptId == 0)
+  }
+
+  /**
    * Makes sure that failures of stage used by multiple jobs are correctly 
handled.
    *
    * This test creates the following dependency graph:
@@ -1393,8 +1560,8 @@ class DAGSchedulerSuite
     // Submit a map stage by itself
     submitMapStage(shuffleDep)
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
     assert(results.size === 1)
     results.clear()
     assertDataStructuresEmpty()
@@ -1407,7 +1574,7 @@ class DAGSchedulerSuite
     // Ask the scheduler to try it again; TaskSet 2 will rerun the map task 
that we couldn't fetch
     // from, then TaskSet 3 will run the reduce stage
     scheduler.resubmitFailedStages()
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 
reduceRdd.partitions.size))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 
reduceRdd.partitions.length))))
     complete(taskSets(3), Seq((Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
     results.clear()
@@ -1452,8 +1619,8 @@ class DAGSchedulerSuite
     // Complete the first stage
     assert(taskSets(0).stageId === 0)
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", rdd1.partitions.size)),
-      (Success, makeMapStatus("hostB", rdd1.partitions.size))))
+      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
     assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 
0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
     assert(listener1.results.size === 1)
@@ -1461,7 +1628,7 @@ class DAGSchedulerSuite
     // When attempting the second stage, show a fetch failure
     assert(taskSets(1).stageId === 1)
     complete(taskSets(1), Seq(
-      (Success, makeMapStatus("hostA", rdd2.partitions.size)),
+      (Success, makeMapStatus("hostA", rdd2.partitions.length)),
       (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, 
"ignored"), null)))
     scheduler.resubmitFailedStages()
     assert(listener2.results.size === 0)    // Second stage listener should 
not have a result yet
@@ -1469,7 +1636,7 @@ class DAGSchedulerSuite
     // Stage 0 should now be running as task set 2; make its task succeed
     assert(taskSets(2).stageId === 0)
     complete(taskSets(2), Seq(
-      (Success, makeMapStatus("hostC", rdd2.partitions.size))))
+      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
     assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 
0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
     assert(listener2.results.size === 0)    // Second stage listener should 
still not have a result
@@ -1477,8 +1644,8 @@ class DAGSchedulerSuite
     // Stage 1 should now be running as task set 3; make its first task succeed
     assert(taskSets(3).stageId === 1)
     complete(taskSets(3), Seq(
-      (Success, makeMapStatus("hostB", rdd2.partitions.size)),
-      (Success, makeMapStatus("hostD", rdd2.partitions.size))))
+      (Success, makeMapStatus("hostB", rdd2.partitions.length)),
+      (Success, makeMapStatus("hostD", rdd2.partitions.length))))
     assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 
0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
     assert(listener2.results.size === 1)
@@ -1494,7 +1661,7 @@ class DAGSchedulerSuite
     // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun 
stage 2
     assert(taskSets(5).stageId === 1)
     complete(taskSets(5), Seq(
-      (Success, makeMapStatus("hostE", rdd2.partitions.size))))
+      (Success, makeMapStatus("hostE", rdd2.partitions.length))))
     complete(taskSets(6), Seq(
       (Success, 53)))
     assert(listener3.results === Map(0 -> 52, 1 -> 53))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to