Repository: spark Updated Branches: refs/heads/branch-0.9 45561cd9f -> 9e2c59efe
[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient. See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete description, but in essence: If the Worker or AppClient actor restarts before successfully registering with Master, multiple retryTimers will be running, which will lead to fewer than the full number of registration retries being attempted before the new actor is forced to give up. Author: Mark Hamstra <[email protected]> Closes #602 from markhamstra/SPARK-1685 and squashes the following commits: 11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer 69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e2c59ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e2c59ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e2c59ef Branch: refs/heads/branch-0.9 Commit: 9e2c59efe02a7ccfa9b62b1265915fa0a5cf5efd Parents: 45561cd Author: Mark Hamstra <[email protected]> Authored: Tue May 6 12:53:39 2014 -0700 Committer: Matei Zaharia <[email protected]> Committed: Tue May 6 12:56:29 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/client/AppClient.scala | 13 +++++++++---- .../scala/org/apache/spark/deploy/worker/Worker.scala | 10 ++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c59ef/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 1415e2f..7e98c7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -60,6 +60,7 @@ private[spark] class AppClient( var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times + var registrationRetryTimer: Option[Cancellable] = None override def preStart() { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -83,14 +84,13 @@ private[spark] class AppClient( def registerWithMaster() { tryRegisterAllMasters() - import context.dispatcher var retries = 0 - lazy val retryTimer: Cancellable = + registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + registrationRetryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { logError("All masters are unresponsive! Giving up.") markDead() @@ -98,7 +98,7 @@ private[spark] class AppClient( tryRegisterAllMasters() } } - retryTimer // start timer + } } def changeMaster(url: String) { @@ -178,6 +178,11 @@ private[spark] class AppClient( alreadyDead = true } } + + override def postStop() { + registrationRetryTimer.foreach(_.cancel()) + } + } def start() { http://git-wip-us.apache.org/repos/asf/spark/blob/9e2c59ef/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index cecb2c8..bc7dd7d 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -95,6 +95,8 @@ private[spark] class Worker( val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf) val workerSource = new WorkerSource(this) + var registrationRetryTimer: 
Option[Cancellable] = None + def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed @@ -158,13 +160,12 @@ private[spark] class Worker( def registerWithMaster() { tryRegisterAllMasters() - var retries = 0 - lazy val retryTimer: Cancellable = + registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + registrationRetryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { logError("All masters are unresponsive! Giving up.") System.exit(1) @@ -172,7 +173,7 @@ private[spark] class Worker( tryRegisterAllMasters() } } - retryTimer // start timer + } } override def receive = { @@ -313,6 +314,7 @@ private[spark] class Worker( } override def postStop() { + registrationRetryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop()
