Repository: spark
Updated Branches:
  refs/heads/master 7b978c1ac -> fbfe69de6


[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient

See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete 
description, but in essence: If the Worker or AppClient actor restarts before 
successfully registering with Master, multiple retryTimers will be running, 
which will lead to less than the full number of registration retries being 
attempted before the new actor is forced to give up.

Author: Mark Hamstra <[email protected]>

Closes #602 from markhamstra/SPARK-1685 and squashes the following commits:

11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer
69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbfe69de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbfe69de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbfe69de

Branch: refs/heads/master
Commit: fbfe69de69aa6767d95167711622ec34c59a1f6d
Parents: 7b978c1
Author: Mark Hamstra <[email protected]>
Authored: Tue May 6 12:53:39 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Tue May 6 12:53:39 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/client/AppClient.scala     | 13 +++++++++----
 .../scala/org/apache/spark/deploy/worker/Worker.scala  | 10 ++++++----
 2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbfe69de/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 57085fc..896913d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -60,6 +60,7 @@ private[spark] class AppClient(
     var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
+    var registrationRetryTimer: Option[Cancellable] = None
 
     override def preStart() {
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -83,21 +84,20 @@ private[spark] class AppClient(
 
     def registerWithMaster() {
       tryRegisterAllMasters()
-
       import context.dispatcher
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
           retries += 1
           if (registered) {
-            retryTimer.cancel()
+            registrationRetryTimer.foreach(_.cancel())
           } else if (retries >= REGISTRATION_RETRIES) {
             markDead("All masters are unresponsive! Giving up.")
           } else {
             tryRegisterAllMasters()
           }
         }
-      retryTimer // start timer
+      }
     }
 
     def changeMaster(url: String) {
@@ -177,6 +177,11 @@ private[spark] class AppClient(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      registrationRetryTimer.foreach(_.cancel())
+    }
+
   }
 
   def start() {

http://git-wip-us.apache.org/repos/asf/spark/blob/fbfe69de/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cd6bd2c..85d25dc 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -100,6 +100,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
   val workerSource = new WorkerSource(this)
 
+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -161,13 +163,12 @@ private[spark] class Worker(
 
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
         retries += 1
         if (registered) {
-          retryTimer.cancel()
+          registrationRetryTimer.foreach(_.cancel())
         } else if (retries >= REGISTRATION_RETRIES) {
           logError("All masters are unresponsive! Giving up.")
           System.exit(1)
@@ -175,7 +176,7 @@ private[spark] class Worker(
           tryRegisterAllMasters()
         }
       }
-    retryTimer // start timer
+    }
   }
 
   override def receive = {
@@ -344,6 +345,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()

Reply via email to