Repository: incubator-spark
Updated Branches:
  refs/heads/master c0ef3afa8 -> d8d190efd
SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

In the previous code, if you had a failing map stage and then tried to run
reduce stages on it repeatedly, the first reduce stage would fail correctly,
but the later ones would mistakenly believe that all map outputs are
available and start failing infinitely with fetch failures from "null".

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cd32d5e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cd32d5e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cd32d5e4

Branch: refs/heads/master
Commit: cd32d5e4dee1291e4509e5965322b7ffe620b1f3
Parents: c0ef3af
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sun Feb 23 23:45:48 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Feb 23 23:48:32 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala | 31 +++++++++++---------
 .../scala/org/apache/spark/FailureSuite.scala | 13 ++++++++
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cd32d5e4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 729f518..789d5e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -272,8 +272,10 @@ class DAGScheduler(
     if (mapOutputTracker.has(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
-      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
-      stage.numAvailableOutputs = locs.size
+      for (i <- 0 until locs.size) {
+        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
+      }
+      stage.numAvailableOutputs = locs.count(_ != null)
     } else {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
@@ -373,25 +375,26 @@ class DAGScheduler(
     } else {
       def removeStage(stageId: Int) {
         // data structures based on Stage
-        stageIdToStage.get(stageId).foreach { s =>
-          if (running.contains(s)) {
+        for (stage <- stageIdToStage.get(stageId)) {
+          if (running.contains(stage)) {
             logDebug("Removing running stage %d".format(stageId))
-            running -= s
+            running -= stage
+          }
+          stageToInfos -= stage
+          for (shuffleDep <- stage.shuffleDep) {
+            shuffleToMapStage.remove(shuffleDep.shuffleId)
           }
-          stageToInfos -= s
-          shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
-            shuffleToMapStage.remove(shuffleId))
-          if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
+          if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
             logDebug("Removing pending status for stage %d".format(stageId))
           }
-          pendingTasks -= s
-          if (waiting.contains(s)) {
+          pendingTasks -= stage
+          if (waiting.contains(stage)) {
             logDebug("Removing stage %d from waiting set.".format(stageId))
-            waiting -= s
+            waiting -= stage
           }
-          if (failed.contains(s)) {
+          if (failed.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId)) - failed -= s + failed -= stage } } // data structures based on StageId http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cd32d5e4/core/src/test/scala/org/apache/spark/FailureSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index ac3c867..f3fb64d 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -81,6 +81,19 @@ class FailureSuite extends FunSuite with LocalSparkContext { FailureSuiteState.clear() } + // Run a map-reduce job in which the map stage always fails. + test("failure in a map stage") { + sc = new SparkContext("local", "test") + val data = sc.makeRDD(1 to 3).map(x => { throw new Exception; (x, x) }).groupByKey(3) + intercept[SparkException] { + data.collect() + } + // Make sure that running new jobs with the same map stage also fails + intercept[SparkException] { + data.collect() + } + } + test("failure because task results are not serializable") { sc = new SparkContext("local[1,1]", "test") val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)