Updated Branches:
  refs/heads/master 83bf1920c -> 87954d4c8

replace the thread with a Akka scheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/28115fa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/28115fa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/28115fa8

Branch: refs/heads/master
Commit: 28115fa8cb942c907a90e48ee1171f2a9b698411
Parents: dd63c54
Author: soulmachine <[email protected]>
Authored: Sat Nov 9 22:38:27 2013 +0800
Committer: soulmachine <[email protected]>
Committed: Sat Nov 9 22:38:27 2013 +0800

----------------------------------------------------------------------
 .../scheduler/cluster/ClusterScheduler.scala    | 23 +++++++-------------
 1 file changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28115fa8/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 8503395..53a5896 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
+import akka.util.duration._
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler._
@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: 
SparkContext)
     backend.start()
 
     if (System.getProperty("spark.speculation", "false").toBoolean) {
-      new Thread("ClusterScheduler speculation check") {
-        setDaemon(true)
-
-        override def run() {
-          logInfo("Starting speculative execution thread")
-          while (true) {
-            try {
-              Thread.sleep(SPECULATION_INTERVAL)
-            } catch {
-              case e: InterruptedException => {}
-            }
-            checkSpeculatableTasks()
-          }
-        }
-      }.start()
+      logInfo("Starting speculative execution thread")
+
+      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
+            SPECULATION_INTERVAL milliseconds) {
+        checkSpeculatableTasks()
+      }
     }
   }
 

Reply via email to