Repository: spark
Updated Branches:
  refs/heads/master 69d0da637 -> 5cbd3b59b


[SPARK-19560] Improve DAGScheduler tests.

This commit improves the tests that check the case when a
ShuffleMapTask completes successfully on an executor that has
failed.  This commit improves the commenting around the existing
test for this, and adds some additional checks to make it more
clear what went wrong if the tests fail (the fact that these
tests are hard to understand came up in the context of markhamstra's
proposed fix for #16620).

This commit also removes a test that I realized tested exactly
the same functionality.

markhamstra, I verified that the new version of the test still fails (and
in a more helpful way) for your proposed change for #16620.

Author: Kay Ousterhout <[email protected]>

Closes #16892 from kayousterhout/SPARK-19560.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cbd3b59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cbd3b59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cbd3b59

Branch: refs/heads/master
Commit: 5cbd3b59ba6735c59153416fa15721af6da09acf
Parents: 69d0da6
Author: Kay Ousterhout <[email protected]>
Authored: Fri Feb 24 11:42:45 2017 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Fri Feb 24 11:42:45 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 58 +++++++++++++++++---
 1 file changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5cbd3b59/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c735220..8eaf9df 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1569,24 +1569,45 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
-  test("run trivial shuffle with out-of-band failure and retry") {
+  /**
+   * In this test, we run a map stage where one of the executors fails but we 
still receive a
+   * "zombie" complete message from a task that ran on that executor. We want 
to make sure the
+   * stage is resubmitted so that the task that ran on the failed executor is 
re-executed, and
+   * that the stage is only marked as finished once that task completes.
+   */
+  test("run trivial shuffle with out-of-band executor failure and retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = 
mapOutputTracker)
     submit(reduceRdd, Array(0))
-    // blockManagerMaster.removeExecutor("exec-hostA")
-    // pretend we were told hostA went away
+    // Tell the DAGScheduler that hostA was lost.
     runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
-    // DAGScheduler will immediately resubmit the stage after it appears to 
have no pending tasks
-    // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
+
+    // At this point, no more tasks are running for the stage (and the 
TaskSetManager considers the
+    // stage complete), but the task that ran on hostA needs to be re-run, so 
the DAGScheduler
+    // should re-submit the stage with one task (the task that originally ran 
on hostA).
+    assert(taskSets.size === 2)
+    assert(taskSets(1).tasks.size === 1)
+
+    // Make sure that the stage that was re-submitted was the ShuffleMapStage 
(not the reduce
+    // stage, which shouldn't be run until all of the tasks in the 
ShuffleMapStage complete on
+    // alive executors).
+    assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
+
     // have hostC complete the resubmitted task
     complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 
0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+    // Make sure that the reduce stage was now submitted.
+    assert(taskSets.size === 3)
+    assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]])
+
+    // Complete the reduce stage.
     complete(taskSets(2), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
@@ -2031,6 +2052,11 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
    * In this test, we run a map stage where one of the executors fails but we 
still receive a
    * "zombie" complete message from that executor. We want to make sure the 
stage is not reported
    * as done until all tasks have completed.
+   *
+   * Most of the functionality in this test is tested in "run trivial shuffle 
with out-of-band
+   * executor failure and retry".  However, that test uses a ShuffleMapStage 
that is followed by
+   * a ResultStage, whereas in this test, the ShuffleMapStage is tested in 
isolation, without a
+   * ResultStage after it.
    */
   test("map stage submission with executor failure late map task completions") 
{
     val shuffleMapRdd = new MyRDD(sc, 3, Nil)
@@ -2042,7 +2068,8 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, 
makeMapStatus("hostA", 2)))
     assert(results.size === 0)    // Map stage job should not be complete yet
 
-    // Pretend host A was lost
+    // Pretend host A was lost. This will cause the TaskSetManager to resubmit 
task 0, because it
+    // completed on hostA.
     val oldEpoch = mapOutputTracker.getEpoch
     runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     val newEpoch = mapOutputTracker.getEpoch
@@ -2054,13 +2081,26 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 
     // A completion from another task should work because it's a non-failed 
host
     runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, 
makeMapStatus("hostB", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
+
+    // At this point, no more tasks are running for the stage (and the 
TaskSetManager considers
+    // the stage complete), but the tasks that ran on hostA need to be re-run, 
so the map stage
+    // shouldn't be marked as complete, and the DAGScheduler should re-submit 
the stage.
+    assert(results.size === 0)
+    assert(taskSets.size === 2)
 
     // Now complete tasks in the second task set
     val newTaskSet = taskSets(1)
-    assert(newTaskSet.tasks.size === 2)     // Both tasks 0 and 1 were on hostA
+    // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on 
hostA).
+    assert(newTaskSet.tasks.size === 2)
+    // Complete task 0 from the original task set (i.e., not the one that's 
currently active).
+    // This should still be counted towards the job being complete (but 
there's still one
+    // outstanding task).
     runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, 
makeMapStatus("hostB", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
+    assert(results.size === 0)
+
+    // Complete the final task, from the currently active task set.  There's 
still one
+    // running task, task 0 in the currently active stage attempt, but the 
success of task 0 means
+    // the DAGScheduler can mark the stage as finished.
     runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, 
makeMapStatus("hostB", 2)))
     assert(results.size === 1)    // Map stage job should now finally be 
complete
     assertDataStructuresEmpty()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to