Updated some inline comments in DAGScheduler
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1e250860 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1e250860 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1e250860 Branch: refs/heads/master Commit: 1e25086009ff6421790609e406d00e1b978d6dbe Parents: 18def5d Author: Lian, Cheng <[email protected]> Authored: Fri Nov 29 15:56:47 2013 +0800 Committer: Lian, Cheng <[email protected]> Committed: Fri Nov 29 15:56:47 2013 +0800 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 31 ++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e250860/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e2bf08c..08cf763 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -154,24 +154,43 @@ class DAGScheduler( val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup) + /** + * Starts the event processing actor. The actor has two responsibilities: + * + * 1. Waits for events like job submission, task finished, task failure etc., and calls + * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them. + * 2. Schedules a periodical task to resubmit failed stages. 
+ * + * NOTE: the actor cannot be started in the constructor, because the periodical task references + * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus + * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed. + */ def start() { eventProcessActor = env.actorSystem.actorOf(Props(new Actor { var resubmissionTask: Cancellable = _ override def preStart() { + /** + * A message is sent to the actor itself periodically to remind the actor to resubmit failed + * stages. In this way, stage resubmission can be done within the same thread context of + * other event processing logic to avoid unnecessary synchronization overhead. + */ resubmissionTask = context.system.scheduler.schedule( RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages) } /** - * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure - * events and responds by launching tasks. This runs in a dedicated thread and receives events - * via the eventQueue. + * The main event loop of the DAG scheduler. */ def receive = { case event: DAGSchedulerEvent => logDebug("Got event of type " + event.getClass.getName) + /** + * All events are forwarded to `processEvent()`, so that the event processing logic can + * be easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite` + * for details. + */ if (!processEvent(event)) { submitWaitingStages() } else { @@ -383,8 +402,10 @@ class DAGScheduler( } /** - * Process one event retrieved from the event queue. - * Returns true if we should stop the event loop. + * Process one event retrieved from the event processing actor. + * + * @param event The event to be processed. + * @return `true` if we should stop the event loop. */ private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match {
