Put the periodic resubmitFailedStages() call into a scheduled task
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ba552851
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ba552851
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ba552851

Branch: refs/heads/master
Commit: ba552851771cf8eaf90b72b661c3df60080d0ef9
Parents: 765ebca
Author: Lian, Cheng <[email protected]>
Authored: Mon Nov 11 01:25:35 2013 +0800
Committer: Lian, Cheng <[email protected]>
Committed: Mon Nov 11 01:25:35 2013 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala | 28 ++++++++++------------
 1 file changed, 12 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba552851/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a73a6e1..7499570 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
-import akka.actor.{Props, Actor, ActorRef}
+import akka.actor._
+import akka.util.duration._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 
 import org.apache.spark._
@@ -110,6 +111,13 @@ class DAGScheduler(
   val POLL_TIMEOUT = 10L
 
   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+    override def preStart() {
+      env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+        if (failed.size > 0)
+          resubmitFailedStages()
+      }
+    }
+
     /**
      * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
      * events and responds by launching tasks. This runs in a dedicated thread and receives events
@@ -119,22 +127,10 @@ class DAGScheduler(
       case event: DAGSchedulerEvent =>
         logDebug("Got event of type " + event.getClass.getName)
 
-        if (!processEvent(event)) {
-          val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
-          // Periodically resubmit failed stages if some map output fetches have failed and we have
-          // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
-          // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
-          // the same time, so we want to make sure we've identified all the reduce tasks that depend
-          // on the failed node.
-          if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
-            resubmitFailedStages()
-          } else {
-            submitWaitingStages()
-          }
-        }
-        else {
+        if (!processEvent(event))
+          submitWaitingStages()
+        else
           context.stop(self)
-        }
     }
   }))
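The diff above replaces the per-event timeout check with a recurring task that the event-processing actor registers in preStart() via Akka's scheduler, so failed stages are resubmitted on a fixed cadence regardless of event traffic. As a side note, below is a minimal self-contained sketch of that pattern against the newer Akka 2.6 classic-actor API; it is not Spark code, and the names PeriodicChecker, Tick, Work and checkInterval are made up for illustration. Unlike the block-based schedule call in the diff, this variant has the scheduler deliver a Tick message back to the actor, so the periodic work runs through the mailbox rather than on a scheduler thread.

    import akka.actor.{Actor, ActorSystem, Cancellable, Props}
    import scala.concurrent.duration._

    object PeriodicChecker {
      case object Tick                        // self-message sent by the scheduler
      final case class Work(payload: String)  // ordinary event, stands in for DAGSchedulerEvent
    }

    // An actor that registers a recurring task in preStart() instead of piggy-backing
    // the periodic check on every incoming event.
    class PeriodicChecker(checkInterval: FiniteDuration) extends Actor {
      import PeriodicChecker._
      import context.dispatcher               // ExecutionContext required by the scheduler

      private var tickTask: Option[Cancellable] = None

      override def preStart(): Unit = {
        // Deliver Tick to this actor every checkInterval, independently of event traffic.
        tickTask = Some(context.system.scheduler.scheduleWithFixedDelay(
          checkInterval, checkInterval, self, Tick))
      }

      override def postStop(): Unit = tickTask.foreach(_.cancel())

      def receive: Receive = {
        case Tick    => // periodic check goes here, e.g. resubmit any accumulated failures
        case Work(_) => // normal event handling, unchanged by the scheduling change
      }
    }

    object Main extends App {
      val system  = ActorSystem("periodic-demo")
      val checker = system.actorOf(Props(new PeriodicChecker(200.milliseconds)), "checker")
      checker ! PeriodicChecker.Work("event")
      Thread.sleep(1000)                      // let a few ticks fire, then shut down
      system.terminate()
    }

Having the scheduler send a message to self is a common way to keep all access to actor state on the actor's own message loop; the closure form used in the commit instead runs the scheduled block on a dispatcher thread.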
