Updated Branches:
  refs/heads/master 9290e5bcd -> 2054c61a1

Replaced the daemon thread started by DAGScheduler with an actor
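
As an illustrative aside, here is a minimal, self-contained sketch of the
pattern this commit applies: a hand-rolled daemon thread draining a
LinkedBlockingQueue is replaced by an Akka actor, whose mailbox serves as both
the event queue and the event loop. The names (EventLoop, Work, Stop) are
illustrative only, not Spark's, and the shutdown call assumes the Akka 2.2-era
classic API that Spark used at the time:

    import akka.actor.{Actor, ActorSystem, Props}

    sealed trait Event
    case class Work(payload: String) extends Event
    case object Stop extends Event

    class EventLoop extends Actor {
      // The mailbox delivers one message at a time, so no explicit
      // synchronization or polling timeout is needed.
      def receive = {
        case Work(payload) =>
          println("processing " + payload)
        case Stop =>
          context.stop(self)         // replaces returning from a run() loop
          context.system.shutdown()  // Akka 2.2 API; terminate() in later versions
      }
    }

    object Demo extends App {
      val system = ActorSystem("demo")
      val loop = system.actorOf(Props(new EventLoop))
      loop ! Work("event-1")  // fire-and-forget, like queue.put(...)
      loop ! Stop             // messages from one sender are delivered in order
    }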


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2539c067
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2539c067
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2539c067

Branch: refs/heads/master
Commit: 2539c0674501432fb62073577db6da52a26db850
Parents: bf4e613
Author: Lian, Cheng <[email protected]>
Authored: Sat Nov 9 19:05:18 2013 +0800
Committer: Lian, Cheng <[email protected]>
Committed: Sat Nov 9 19:05:18 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   1 -
 .../apache/spark/scheduler/DAGScheduler.scala   | 105 ++++++++-----------
 .../org/apache/spark/storage/BlockManager.scala |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 4 files changed, 45 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 880b49e..03ffcc6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -238,7 +238,6 @@ class SparkContext(
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
-  dagScheduler.start()
 
   ui.start()
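
(No explicit start call is needed any more: actorOf both creates and starts the
actor, so the DAGScheduler's event loop is live as soon as the scheduler is
constructed.)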
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d0b21e8..cb19969 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,9 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
+import akka.actor.{Props, Actor, ActorRef}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 
 import org.apache.spark._
@@ -65,12 +65,12 @@ class DAGScheduler(
 
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-    eventQueue.put(BeginEvent(task, taskInfo))
+    eventProcessActor ! BeginEvent(task, taskInfo)
   }
 
  // Called to report that a task has completed and results are being fetched remotely.
   def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
-    eventQueue.put(GettingResultEvent(task, taskInfo))
+    eventProcessActor ! GettingResultEvent(task, taskInfo)
   }
 
   // Called by TaskScheduler to report task completions or failures.
@@ -81,23 +81,23 @@ class DAGScheduler(
       accumUpdates: Map[Long, Any],
       taskInfo: TaskInfo,
       taskMetrics: TaskMetrics) {
-    eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
+    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
   }
 
   // Called by TaskScheduler when an executor fails.
   def executorLost(execId: String) {
-    eventQueue.put(ExecutorLost(execId))
+    eventProcessActor ! ExecutorLost(execId)
   }
 
   // Called by TaskScheduler when a host is added
   def executorGained(execId: String, host: String) {
-    eventQueue.put(ExecutorGained(execId, host))
+    eventProcessActor ! ExecutorGained(execId, host)
   }
 
  // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
   // cancellation of the job itself.
   def taskSetFailed(taskSet: TaskSet, reason: String) {
-    eventQueue.put(TaskSetFailed(taskSet, reason))
+    eventProcessActor ! TaskSetFailed(taskSet, reason)
   }
 
  // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
@@ -109,7 +109,36 @@ class DAGScheduler(
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
 
-  private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
+  private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+    /**
+     * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+     * events and responds by launching tasks. Events now arrive through this actor's mailbox
+     * rather than via a dedicated thread polling an event queue.
+     */
+    def receive = {
+      case event: DAGSchedulerEvent =>
+        if (event != null) {
+          logDebug("Got event of type " + event.getClass.getName)
+        }
+
+        if (!processEvent(event)) {
+          val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
+          // Periodically resubmit failed stages if some map output fetches have failed and we have
+          // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
+          // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
+          // the same time, so we want to make sure we've identified all the reduce tasks that depend
+          // on the failed node.
+          if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
+            resubmitFailedStages()
+          } else {
+            submitWaitingStages()
+          }
+        }
+        else {
+          context.stop(self)
+        }
+    }
+  }))
 
   private[scheduler] val nextJobId = new AtomicInteger(0)
 
@@ -150,16 +179,6 @@ class DAGScheduler(
 
  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
-  // Start a thread to run the DAGScheduler event loop
-  def start() {
-    new Thread("DAGScheduler") {
-      setDaemon(true)
-      override def run() {
-        DAGScheduler.this.run()
-      }
-    }.start()
-  }
-
   def addSparkListener(listener: SparkListener) {
     listenerBus.addListener(listener)
   }
@@ -301,8 +320,7 @@ class DAGScheduler(
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
-    eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite,
-      waiter, properties))
+    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
     waiter
   }
 
@@ -337,8 +355,7 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
-    eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite,
-      listener, properties))
+    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
@@ -347,19 +364,19 @@ class DAGScheduler(
    */
   def cancelJob(jobId: Int) {
     logInfo("Asked to cancel job " + jobId)
-    eventQueue.put(JobCancelled(jobId))
+    eventProcessActor ! JobCancelled(jobId)
   }
 
   def cancelJobGroup(groupId: String) {
     logInfo("Asked to cancel job group " + groupId)
-    eventQueue.put(JobGroupCancelled(groupId))
+    eventProcessActor ! JobGroupCancelled(groupId)
   }
 
   /**
    * Cancel all jobs that are running or waiting in the queue.
    */
   def cancelAllJobs() {
-    eventQueue.put(AllJobsCancelled)
+    eventProcessActor ! AllJobsCancelled
   }
 
   /**
@@ -474,42 +491,6 @@ class DAGScheduler(
     }
   }
 
-
-  /**
-   * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
-   * events and responds by launching tasks. This runs in a dedicated thread and receives events
-   * via the eventQueue.
-   */
-  private def run() {
-    SparkEnv.set(env)
-
-    while (true) {
-      val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
-      if (event != null) {
-        logDebug("Got event of type " + event.getClass.getName)
-      }
-      this.synchronized { // needed in case other threads makes calls into methods of this class
-        if (event != null) {
-          if (processEvent(event)) {
-            return
-          }
-        }
-
-        val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
-        // Periodically resubmit failed stages if some map output fetches have failed and we have
-        // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
-        // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
-        // the same time, so we want to make sure we've identified all the reduce tasks that depend
-        // on the failed node.
-        if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
-          resubmitFailedStages()
-        } else {
-          submitWaitingStages()
-        }
-      }
-    }
-  }
-
   /**
   * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
   * We run the operation in a separate thread just in case it takes a bunch of time, so that we
@@ -909,7 +890,7 @@ class DAGScheduler(
   }
 
   def stop() {
-    eventQueue.put(StopDAGScheduler)
+    eventProcessActor ! StopDAGScheduler
     metadataCleaner.cancel()
     taskSched.stop()
   }
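
Note that processEvent's boolean return value keeps its old meaning here: true
still signals shutdown, now handled by context.stop(self) instead of returning
from run(). The old this.synchronized block is also gone, since the actor
mailbox already processes one event at a time.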

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a34c95b..2c21134 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -893,7 +893,7 @@ private[spark] object BlockManager extends Logging {
   {
     // env == null and blockManagerMaster != null is used in tests
     assert (env != null || blockManagerMaster != null)
-    val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
+    val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
       env.blockManager.getLocationBlockIds(blockIds)
     } else {
       blockManagerMaster.getLocations(blockIds)
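
Given the assertion above, the rewritten condition prefers a supplied
blockManagerMaster over env: tests that now construct the scheduler with both a
real env and a mock master (see the DAGSchedulerSuite change below) still
exercise the mock instead of the live block manager.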

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 00f2fdd..a4d41eb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -100,7 +100,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     cacheLocations.clear()
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster()
-    scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) {
+    scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
       override def runLocally(job: ActiveJob) {
         // don't bother with the thread while unit testing
         runLocallyWithinThread(job)
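
Passing sc.env instead of null is required by this commit: DAGScheduler now
creates eventProcessActor from env.actorSystem at construction time, so a null
env would fail with a NullPointerException before any test could run.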
