Repository: spark Updated Branches: refs/heads/branch-1.0 1083f2bde -> 0aaa2c6d0
[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete description, but in essence: If the Worker or AppClient actor restarts before successfully registering with Master, multiple retryTimers will be running, which will lead to less than the full number of registration retries being attempted before the new actor is forced to give up. Author: Mark Hamstra <[email protected]> Closes #602 from markhamstra/SPARK-1685 and squashes the following commits: 11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer 69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient (cherry picked from commit fbfe69de69aa6767d95167711622ec34c59a1f6d) Signed-off-by: Matei Zaharia <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0aaa2c6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0aaa2c6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0aaa2c6d Branch: refs/heads/branch-1.0 Commit: 0aaa2c6d050a48f00fe4c0f8e30f6be599cb09bc Parents: 1083f2b Author: Mark Hamstra <[email protected]> Authored: Tue May 6 12:53:39 2014 -0700 Committer: Matei Zaharia <[email protected]> Committed: Tue May 6 12:53:48 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/client/AppClient.scala | 13 +++++++++---- .../scala/org/apache/spark/deploy/worker/Worker.scala | 10 ++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0aaa2c6d/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 57085fc..896913d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -60,6 +60,7 @@ private[spark] class AppClient( var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times + var registrationRetryTimer: Option[Cancellable] = None override def preStart() { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -83,21 +84,20 @@ private[spark] class AppClient( def registerWithMaster() { tryRegisterAllMasters() - import context.dispatcher var retries = 0 - lazy val retryTimer: Cancellable = + registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + registrationRetryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! Giving up.") } else { tryRegisterAllMasters() } } - retryTimer // start timer + } } def changeMaster(url: String) { @@ -177,6 +177,11 @@ private[spark] class AppClient( alreadyDead = true } } + + override def postStop() { + registrationRetryTimer.foreach(_.cancel()) + } + } def start() { http://git-wip-us.apache.org/repos/asf/spark/blob/0aaa2c6d/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index cd6bd2c..85d25dc 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -100,6 +100,8 @@ private[spark] class Worker( val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) val workerSource = new WorkerSource(this) + var registrationRetryTimer: Option[Cancellable] = None + def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed @@ -161,13 +163,12 @@ private[spark] class Worker( def registerWithMaster() { tryRegisterAllMasters() - var retries = 0 - lazy val retryTimer: Cancellable = + registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + registrationRetryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { logError("All masters are unresponsive! Giving up.") System.exit(1) @@ -175,7 +176,7 @@ private[spark] class Worker( tryRegisterAllMasters() } } - retryTimer // start timer + } } override def receive = { @@ -344,6 +345,7 @@ private[spark] class Worker( } override def postStop() { + registrationRetryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop()
