Repository: spark
Updated Branches:
  refs/heads/branch-0.9 45561cd9f -> 9e2c59efe


[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient

See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete 
description, but in essence: If the Worker or AppClient actor restarts before 
successfully registering with Master, multiple retryTimers will be running, 
which will lead to fewer than the full number of registration retries being 
attempted before the new actor is forced to give up.

Author: Mark Hamstra <[email protected]>

Closes #602 from markhamstra/SPARK-1685 and squashes the following commits:

11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer
69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e2c59ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e2c59ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e2c59ef

Branch: refs/heads/branch-0.9
Commit: 9e2c59efe02a7ccfa9b62b1265915fa0a5cf5efd
Parents: 45561cd
Author: Mark Hamstra <[email protected]>
Authored: Tue May 6 12:53:39 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Tue May 6 12:56:29 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/client/AppClient.scala     | 13 +++++++++----
 .../scala/org/apache/spark/deploy/worker/Worker.scala  | 10 ++++++----
 2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c59ef/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1415e2f..7e98c7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -60,6 +60,7 @@ private[spark] class AppClient(
     var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling 
listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
+    var registrationRetryTimer: Option[Cancellable] = None
 
     override def preStart() {
       context.system.eventStream.subscribe(self, 
classOf[RemotingLifecycleEvent])
@@ -83,14 +84,13 @@ private[spark] class AppClient(
 
     def registerWithMaster() {
       tryRegisterAllMasters()
-
       import context.dispatcher
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, 
REGISTRATION_TIMEOUT) {
           retries += 1
           if (registered) {
-            retryTimer.cancel()
+            registrationRetryTimer.foreach(_.cancel())
           } else if (retries >= REGISTRATION_RETRIES) {
             logError("All masters are unresponsive! Giving up.")
             markDead()
@@ -98,7 +98,7 @@ private[spark] class AppClient(
             tryRegisterAllMasters()
           }
         }
-      retryTimer // start timer
+      }
     }
 
     def changeMaster(url: String) {
@@ -178,6 +178,11 @@ private[spark] class AppClient(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      registrationRetryTimer.foreach(_.cancel())
+    }
+
   }
 
   def start() {

http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c59ef/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cecb2c8..bc7dd7d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -95,6 +95,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
   val workerSource = new WorkerSource(this)
 
+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -158,13 +160,12 @@ private[spark] class Worker(
 
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, 
REGISTRATION_TIMEOUT) {
         retries += 1
         if (registered) {
-          retryTimer.cancel()
+          registrationRetryTimer.foreach(_.cancel())
         } else if (retries >= REGISTRATION_RETRIES) {
           logError("All masters are unresponsive! Giving up.")
           System.exit(1)
@@ -172,7 +173,7 @@ private[spark] class Worker(
           tryRegisterAllMasters()
         }
       }
-    retryTimer // start timer
+    }
   }
 
   override def receive = {
@@ -313,6 +314,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()

Reply via email to