Updated Branches: refs/heads/master 83bf1920c -> 87954d4c8
replace the thread with a Akka scheduler Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/28115fa8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/28115fa8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/28115fa8 Branch: refs/heads/master Commit: 28115fa8cb942c907a90e48ee1171f2a9b698411 Parents: dd63c54 Author: soulmachine <[email protected]> Authored: Sat Nov 9 22:38:27 2013 +0800 Committer: soulmachine <[email protected]> Committed: Sat Nov 9 22:38:27 2013 +0800 ---------------------------------------------------------------------- .../scheduler/cluster/ClusterScheduler.scala | 23 +++++++------------- 1 file changed, 8 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28115fa8/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 8503395..53a5896 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +import akka.util.duration._ + import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler._ @@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext) backend.start() if (System.getProperty("spark.speculation", "false").toBoolean) { - new Thread("ClusterScheduler speculation check") { - setDaemon(true) - - override def run() { - logInfo("Starting speculative execution thread") - while (true) { - try { - Thread.sleep(SPECULATION_INTERVAL) - } catch { - case e: InterruptedException => {} - } - checkSpeculatableTasks() - } - } - }.start() + logInfo("Starting speculative execution thread") + + sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, + SPECULATION_INTERVAL milliseconds) { + checkSpeculatableTasks() + } } }
