Updated Branches: refs/heads/master 18ad59e2c -> 0b448df6a
Merge pull request #450 from kayousterhout/fetch_failures. Closes #450.

Only run ResubmitFailedStages event after a fetch fails

Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.

Author: Kay Ousterhout <kayousterh...@gmail.com>

== Merge branch commits ==

commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <kayousterh...@gmail.com>
Date: Wed Feb 5 11:34:41 2014 -0800

    Re-add check for empty set of failed stages

commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <kayousterh...@gmail.com>
Date: Wed Jan 15 23:35:41 2014 -0800

    Only run ResubmitFailedStages event after a fetch fails

    Previously, the ResubmitFailedStages event was called every
    200 milliseconds, leading to a lot of unnecessary event processing
    and clogged DAGScheduler logs.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0b448df6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0b448df6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0b448df6

Branch: refs/heads/master
Commit: 0b448df6ac520a7977b1eb51e8c55e33f3fd2da8
Parents: 18ad59e
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Thu Feb 6 16:15:24 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Feb 6 16:15:24 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala | 33 +++++++-------------
 1 file changed, 11 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b448df6/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8212415..21d16fa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -155,7 +155,6 @@ class DAGScheduler( val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures // Missing tasks from each stage val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] - var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits val activeJobs = new HashSet[ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob] @@ -177,22 +176,6 @@ class DAGScheduler( def start() { eventProcessActor = env.actorSystem.actorOf(Props(new Actor { /** - * A handle to the periodical task, used to cancel the task when the actor is stopped. - */ - var resubmissionTask: Cancellable = _ - - override def preStart() { - import context.dispatcher - /** - * A message is sent to the actor itself periodically to remind the actor to resubmit failed - * stages. In this way, stage resubmission can be done within the same thread context of - * other event processing logic to avoid unnecessary synchronization overhead. - */ - resubmissionTask = context.system.scheduler.schedule( - RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages) - } - - /** * The main event loop of the DAG scheduler. */ def receive = { @@ -207,7 +190,6 @@ class DAGScheduler( if (!processEvent(event)) { submitWaitingStages() } else { - resubmissionTask.cancel() context.stop(self) } } @@ -620,6 +602,8 @@ class DAGScheduler( case ResubmitFailedStages => if (failed.size > 0) { + // Failed stages may be removed by job cancellation, so failed might be empty even if + // the ResubmitFailedStages event has been scheduled. 
resubmitFailedStages() } @@ -926,7 +910,6 @@ class DAGScheduler( // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) running -= failedStage - failed += failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + ") for resubmision due to a fetch failure") @@ -938,10 +921,16 @@ class DAGScheduler( } logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name + "); marking it for resubmission") + if (failed.isEmpty && eventProcessActor != null) { + // Don't schedule an event to resubmit failed stages if failed isn't empty, because + // in that case the event will already have been scheduled. eventProcessActor may be + // null during unit tests. + import env.actorSystem.dispatcher + env.actorSystem.scheduler.scheduleOnce( + RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages) + } + failed += failedStage failed += mapStage - // Remember that a fetch failed now; this is used to resubmit the broken - // stages later, after a small wait (to give other tasks the chance to fail) - lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.epoch))