Repository: spark Updated Branches: refs/heads/master f6c447f87 -> 3ddb9b323
[SPARK-10247] [CORE] improve readability of a test case in DAGSchedulerSuite This is pretty minor, just trying to improve the readability of `DAGSchedulerSuite`, I figure every bit helps. Before whenever I read this test, I never knew what "should work" and "should be ignored" really meant -- this adds some asserts & updates comments to make it more clear. Also some reformatting per a suggestion from markhamstra on https://github.com/apache/spark/pull/7699 Author: Imran Rashid <iras...@cloudera.com> Closes #8434 from squito/SPARK-10247. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ddb9b32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ddb9b32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ddb9b32 Branch: refs/heads/master Commit: 3ddb9b32335154e47890a0c761e0dfea3ccaac7b Parents: f6c447f Author: Imran Rashid <iras...@cloudera.com> Authored: Wed Sep 2 22:14:50 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Wed Sep 2 22:14:50 2015 -0700 ---------------------------------------------------------------------- .../spark/scheduler/DAGSchedulerSuite.scala | 57 ++++++++++++++++---- 1 file changed, 47 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3ddb9b32/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 62957c6..80f64de 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -926,27 +926,64 @@ class DAGSchedulerSuite val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) + // pretend we were told hostA went away val oldEpoch = mapOutputTracker.getEpoch runEvent(ExecutorLost("exec-hostA")) val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) + + // now start completing some tasks in the shuffle map stage, under different hosts + // and epochs, and make sure scheduler updates its state correctly val taskSet = taskSets(0) + val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage] + assert(shuffleStage.numAvailableOutputs === 0) + // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) - // should work because it's a non-failed host - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + runEvent(CompletionEvent( + taskSet.tasks(0), + Success, + makeMapStatus("hostA", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) + assert(shuffleStage.numAvailableOutputs === 0) + + // should work because it's a non-failed host (so the available map outputs will increase) + runEvent(CompletionEvent( + taskSet.tasks(0), + Success, + makeMapStatus("hostB", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) + assert(shuffleStage.numAvailableOutputs === 1) + // should be ignored for being too old - runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) - // should work because it's a new epoch + runEvent(CompletionEvent( + taskSet.tasks(0), + Success, + makeMapStatus("hostA", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) + assert(shuffleStage.numAvailableOutputs === 1) + + // should work because it's a new epoch, which will increase the number of available map + // outputs, and also finish the stage taskSet.tasks(1).epoch = newEpoch - runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", - reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + runEvent(CompletionEvent( + taskSet.tasks(1), + Success, + makeMapStatus("hostA", reduceRdd.partitions.size), + null, + createFakeTaskInfo(), + null)) + assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) + + // finish the next stage normally, which completes the job complete(taskSets(1), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org