Repository: spark Updated Branches: refs/heads/master eb4632f28 -> 4fb52f954
[SPARK-7624] Revert #4147 Author: Davies Liu <[email protected]> Closes #6172 from davies/revert_4147 and squashes the following commits: 3bfbbde [Davies Liu] Revert #4147 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fb52f95 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fb52f95 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fb52f95 Branch: refs/heads/master Commit: 4fb52f9545ae338fae2d3aeea4bfc35d5df44853 Parents: eb4632f Author: Davies Liu <[email protected]> Authored: Mon May 18 16:55:45 2015 -0700 Committer: Josh Rosen <[email protected]> Committed: Mon May 18 16:55:45 2015 -0700 ---------------------------------------------------------------------- .../spark/scheduler/local/LocalBackend.scala | 23 ++------------------ 1 file changed, 2 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4fb52f95/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index e64d06c..3078a1b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -18,14 +18,12 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} -import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} -import org.apache.spark.util.{ThreadUtils, Utils} private case class ReviveOffers() @@ -47,9 +45,6 @@ private[spark] class LocalEndpoint( private val totalCores: Int) extends ThreadSafeRpcEndpoint with Logging { - private val reviveThread = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread") - private var freeCores = totalCores private val localExecutorId = SparkContext.DRIVER_IDENTIFIER @@ -79,27 +74,13 @@ private[spark] class LocalEndpoint( context.reply(true) } - def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) - val tasks = scheduler.resourceOffers(offers).flatten - for (task <- tasks) { + for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } - if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) { - // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout - reviveThread.schedule(new Runnable { - override def run(): Unit = Utils.tryLogNonFatalError { - Option(self).foreach(_.send(ReviveOffers)) - } - }, 1000, TimeUnit.MILLISECONDS) - } - } - - override def onStop(): Unit = { - reviveThread.shutdownNow() } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
