Updated Branches:
  refs/heads/master 743a31a7c -> 60e23a58b

Bugfix: SPARK-965 & SPARK-966

SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966

* Add back DAGScheduler.start(); eventProcessActor is created and started there.

  Note that this function is only called by SparkContext.

* Cancel the scheduled stage resubmission task when stopping eventProcessActor

* Add a new DAGSchedulerEvent, ResubmitFailedStages

  This event message is sent by the scheduled stage resubmission task to 
eventProcessActor.  In this way, DAGScheduler.resubmitFailedStages is 
guaranteed to be executed from the same thread that runs 
DAGScheduler.processEvent.

  Please refer to the discussion in SPARK-966 for details; a simplified, 
standalone sketch of the pattern is included below.
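
  The following sketch is not part of this commit; it only illustrates the 
pattern under discussion with a plain Akka actor. Names such as 
FailedStageResubmitter, failedStages, Stop, and the 50 ms interval are 
illustrative stand-ins, not Spark identifiers (the actual change is in the 
diff below). The periodic task sends a message to self instead of touching 
scheduler state from the Akka scheduler thread, so every state change runs 
on the single actor thread, and the timer is cancelled on shutdown.

    import akka.actor.{Actor, ActorSystem, Cancellable, Props}
    import scala.concurrent.duration._

    case object ResubmitFailedStages
    case object Stop

    class FailedStageResubmitter extends Actor {
      import context.dispatcher  // ExecutionContext used by the scheduler

      private var resubmissionTask: Cancellable = _
      private var failedStages = Set.empty[Int]  // stand-in for DAGScheduler.failed

      override def preStart(): Unit = {
        // Schedule a message to self instead of running the resubmission logic
        // inside the scheduled closure; the message is then handled in receive,
        // on the same thread as every other event.
        resubmissionTask = context.system.scheduler.schedule(
          50.millis, 50.millis, self, ResubmitFailedStages)
      }

      def receive = {
        case ResubmitFailedStages =>
          if (failedStages.nonEmpty) {
            println(s"Resubmitting stages: $failedStages")
            failedStages = Set.empty
          }
        case stageId: Int =>         // a "stage failed" notification
          failedStages += stageId
        case Stop =>
          resubmissionTask.cancel()  // cancel the timer before stopping, as in SPARK-965
          context.stop(self)
      }
    }

    object ResubmitSketch extends App {
      val system = ActorSystem("sketch")
      val resubmitter = system.actorOf(Props[FailedStageResubmitter](), "resubmitter")
      resubmitter ! 1
      resubmitter ! 2
      Thread.sleep(200)
      resubmitter ! Stop
      system.terminate()
    }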


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/18def5d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/18def5d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/18def5d6

Branch: refs/heads/master
Commit: 18def5d6f20b33c946f9b8b2cea8cfb6848dcc34
Parents: 743a31a
Author: Lian, Cheng <[email protected]>
Authored: Thu Nov 28 17:46:06 2013 +0800
Committer: Lian, Cheng <[email protected]>
Committed: Thu Nov 28 17:46:06 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  1 +
 .../apache/spark/scheduler/DAGScheduler.scala   | 62 ++++++++++++--------
 .../spark/scheduler/DAGSchedulerEvent.scala     |  2 +
 3 files changed, 40 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18def5d6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3a80241..c314f01 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -270,6 +270,7 @@ class SparkContext(
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
+  dagScheduler.start()
 
   ui.start()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18def5d6/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4457525..e2bf08c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -113,30 +113,7 @@ class DAGScheduler(
   // Warns the user if a stage contains a task with size greater than this value (in KB)
   val TASK_SIZE_TO_WARN = 100
 
-  private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
-    override def preStart() {
-      context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
-        if (failed.size > 0) {
-          resubmitFailedStages()
-        }
-      }
-    }
-
-    /**
-     * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
-     * events and responds by launching tasks. This runs in a dedicated thread and receives events
-     * via the eventQueue.
-     */
-    def receive = {
-      case event: DAGSchedulerEvent =>
-        logDebug("Got event of type " + event.getClass.getName)
-
-        if (!processEvent(event))
-          submitWaitingStages()
-        else
-          context.stop(self)
-    }
-  }))
+  private var eventProcessActor: ActorRef = _
 
   private[scheduler] val nextJobId = new AtomicInteger(0)
 
@@ -177,6 +154,34 @@ class DAGScheduler(
 
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
+  def start() {
+    eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+      var resubmissionTask: Cancellable = _
+
+      override def preStart() {
+        resubmissionTask = context.system.scheduler.schedule(
+          RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
+      }
+
+      /**
+       * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+       * events and responds by launching tasks. This runs in a dedicated thread and receives events
+       * via the eventQueue.
+       */
+      def receive = {
+        case event: DAGSchedulerEvent =>
+          logDebug("Got event of type " + event.getClass.getName)
+
+          if (!processEvent(event)) {
+            submitWaitingStages()
+          } else {
+            resubmissionTask.cancel()
+            context.stop(self)
+          }
+      }
+    }))
+  }
+
   def addSparkListener(listener: SparkListener) {
     listenerBus.addListener(listener)
   }
@@ -457,6 +462,11 @@ class DAGScheduler(
       case TaskSetFailed(taskSet, reason) =>
         abortStage(stageIdToStage(taskSet.stageId), reason)
 
+      case ResubmitFailedStages =>
+        if (failed.size > 0) {
+          resubmitFailedStages()
+        }
+
       case StopDAGScheduler =>
         // Cancel any active jobs
         for (job <- activeJobs) {
@@ -900,7 +910,9 @@ class DAGScheduler(
   }
 
   def stop() {
-    eventProcessActor ! StopDAGScheduler
+    if (eventProcessActor != null) {
+      eventProcessActor ! StopDAGScheduler
+    }
     metadataCleaner.cancel()
     taskSched.stop()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18def5d6/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 708d221..5353cd2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,4 +73,6 @@ private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerE
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
 
+private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
+
 private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent
