Repository: spark
Updated Branches:
  refs/heads/master eb4632f28 -> 4fb52f954


[SPARK-7624] Revert #4147

Author: Davies Liu <[email protected]>

Closes #6172 from davies/revert_4147 and squashes the following commits:

3bfbbde [Davies Liu] Revert #4147


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fb52f95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fb52f95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fb52f95

Branch: refs/heads/master
Commit: 4fb52f9545ae338fae2d3aeea4bfc35d5df44853
Parents: eb4632f
Author: Davies Liu <[email protected]>
Authored: Mon May 18 16:55:45 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Mon May 18 16:55:45 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/local/LocalBackend.scala    | 23 ++------------------
 1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4fb52f95/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e64d06c..3078a1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,12 @@
 package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, 
RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, 
WorkerOffer}
-import org.apache.spark.util.{ThreadUtils, Utils}
 
 private case class ReviveOffers()
 
@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
     private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {
 
-  private val reviveThread =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
-
   private var freeCores = totalCores
 
   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
       context.reply(true)
   }
 
-
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, 
freeCores))
-    val tasks = scheduler.resourceOffers(offers).flatten
-    for (task <- tasks) {
+    for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber 
= task.attemptNumber,
         task.name, task.serializedTask)
     }
-    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
-      // Try to reviveOffer after 1 second, because scheduler may wait for 
locality timeout
-      reviveThread.schedule(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          Option(self).foreach(_.send(ReviveOffers))
-        }
-      }, 1000, TimeUnit.MILLISECONDS)
-    }
-  }
-
-  override def onStop(): Unit = {
-    reviveThread.shutdownNow()
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to