Repository: spark
Updated Branches:
  refs/heads/branch-1.0 1083f2bde -> 0aaa2c6d0


[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient

See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete 
description, but in essence: If the Worker or AppClient actor restarts before 
successfully registering with Master, multiple retryTimers will be running, 
which will lead to fewer than the full number of registration retries being 
attempted before the new actor is forced to give up.

Author: Mark Hamstra <[email protected]>

Closes #602 from markhamstra/SPARK-1685 and squashes the following commits:

11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer
69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient

(cherry picked from commit fbfe69de69aa6767d95167711622ec34c59a1f6d)
Signed-off-by: Matei Zaharia <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0aaa2c6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0aaa2c6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0aaa2c6d

Branch: refs/heads/branch-1.0
Commit: 0aaa2c6d050a48f00fe4c0f8e30f6be599cb09bc
Parents: 1083f2b
Author: Mark Hamstra <[email protected]>
Authored: Tue May 6 12:53:39 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Tue May 6 12:53:48 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/client/AppClient.scala     | 13 +++++++++----
 .../scala/org/apache/spark/deploy/worker/Worker.scala  | 10 ++++++----
 2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0aaa2c6d/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 57085fc..896913d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -60,6 +60,7 @@ private[spark] class AppClient(
     var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling 
listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
+    var registrationRetryTimer: Option[Cancellable] = None
 
     override def preStart() {
       context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
@@ -83,21 +84,20 @@ private[spark] class AppClient(
 
     def registerWithMaster() {
       tryRegisterAllMasters()
-
       import context.dispatcher
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, 
REGISTRATION_TIMEOUT) {
           retries += 1
           if (registered) {
-            retryTimer.cancel()
+            registrationRetryTimer.foreach(_.cancel())
           } else if (retries >= REGISTRATION_RETRIES) {
             markDead("All masters are unresponsive! Giving up.")
           } else {
             tryRegisterAllMasters()
           }
         }
-      retryTimer // start timer
+      }
     }
 
     def changeMaster(url: String) {
@@ -177,6 +177,11 @@ private[spark] class AppClient(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      registrationRetryTimer.foreach(_.cancel())
+    }
+
   }
 
   def start() {

http://git-wip-us.apache.org/repos/asf/spark/blob/0aaa2c6d/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cd6bd2c..85d25dc 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -100,6 +100,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, 
securityMgr)
   val workerSource = new WorkerSource(this)
 
+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -161,13 +163,12 @@ private[spark] class Worker(
 
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, 
REGISTRATION_TIMEOUT) {
         retries += 1
         if (registered) {
-          retryTimer.cancel()
+          registrationRetryTimer.foreach(_.cancel())
         } else if (retries >= REGISTRATION_RETRIES) {
           logError("All masters are unresponsive! Giving up.")
           System.exit(1)
@@ -175,7 +176,7 @@ private[spark] class Worker(
           tryRegisterAllMasters()
         }
       }
-    retryTimer // start timer
+    }
   }
 
   override def receive = {
@@ -344,6 +345,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()

Reply via email to