Repository: spark
Updated Branches:
  refs/heads/master ea054e1fc -> 4afe9a485


[SPARK-3736] Workers reconnect when disassociated from the master.

Before, if the master node was killed and restarted, the worker nodes
would not attempt to reconnect to the master. Therefore, whenever the
master node was restarted, the worker nodes had to be restarted as well.

Now, when a worker is disconnected from the master node, it will
repeatedly send registration requests in an attempt to reconnect. Once
the master node restarts, it will receive one of these registration
requests from each of its former workers. The result is that the cluster
re-enters a healthy state.

In addition, when the master stopped receiving heartbeats from a worker,
it removed that worker; if the worker later sent a heartbeat again, the
master used to ignore it. Now, a master that receives a heartbeat from a
worker that had been removed will send that worker a ReconnectWorker
message asking it to re-attempt the registration process, at which point
the worker will send a RegisterWorker request and be reconnected
accordingly.

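Re-connection attempts per worker are submitted every N seconds, where N
is configured by the property spark.worker.reconnect.interval - this has
a default of 60 seconds right now.
(Note: the diff below does not read this property. As committed, the
worker retries at a randomized interval of roughly 5-15 seconds for the
first 6 attempts and roughly 30-90 seconds for the next 10 attempts,
and exits if it is still unregistered after that.)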

Author: mcheah <[email protected]>

Closes #2828 from mccheah/reconnect-dead-workers and squashes the following 
commits:

83f8bc9 [mcheah] [SPARK-3736] More informative log message, and fixing some 
indentation.
fe0e02f [mcheah] [SPARK-3736] Moving reconnection logic to registerWithMaster().
94ddeca [mcheah] [SPARK-3736] Changing a log warning to a log info.
a698e35 [mcheah] [SPARK-3736] Addressing PR comment to make some defs private.
b9a3077 [mcheah] [SPARK-3736] Addressing PR comments related to reconnection.
2ad5ed5 [mcheah] [SPARK-3736] Cancel attempts to reconnect if the master 
changes.
b5b34af [mcheah] [SPARK-3736] Workers reconnect when disassociated from the 
master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4afe9a48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4afe9a48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4afe9a48

Branch: refs/heads/master
Commit: 4afe9a4852ebeb4cc77322a14225cd3dec165f3f
Parents: ea054e1
Author: mcheah <[email protected]>
Authored: Mon Oct 20 11:35:18 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Mon Oct 20 11:35:18 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala |  2 +
 .../org/apache/spark/deploy/master/Master.scala |  9 ++-
 .../org/apache/spark/deploy/worker/Worker.scala | 81 +++++++++++++++-----
 3 files changed, 72 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4afe9a48/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index a7368f9..b9dd855 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -71,6 +71,8 @@ private[deploy] object DeployMessages {
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage
 
+  case class ReconnectWorker(masterUrl: String) extends DeployMessage
+
   case class KillExecutor(masterUrl: String, appId: String, execId: Int) 
extends DeployMessage
 
   case class LaunchExecutor(

http://git-wip-us.apache.org/repos/asf/spark/blob/4afe9a48/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f98b531..3b6bb9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -341,7 +341,14 @@ private[spark] class Master(
         case Some(workerInfo) =>
           workerInfo.lastHeartbeat = System.currentTimeMillis()
         case None =>
-          logWarning("Got heartbeat from unregistered worker " + workerId)
+          if (workers.map(_.id).contains(workerId)) {
+            logWarning(s"Got heartbeat from unregistered worker $workerId." +
+              " Asking it to re-register.")
+            sender ! ReconnectWorker(masterUrl)
+          } else {
+            logWarning(s"Got heartbeat from unregistered worker $workerId." +
+              " This worker was never registered, so ignoring the heartbeat.")
+          }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4afe9a48/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 9b52cb0..c4a8ec2 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -20,12 +20,14 @@ package org.apache.spark.deploy.worker
 import java.io.File
 import java.io.IOException
 import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{UUID, Date}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.Random
 
 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
@@ -64,8 +66,22 @@ private[spark] class Worker(
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
   val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
 
-  val REGISTRATION_TIMEOUT = 20.seconds
-  val REGISTRATION_RETRIES = 3
+  // Model retries to connect to the master, after Hadoop's model.
+  // The first six attempts to reconnect are in shorter intervals (between 5 
and 15 seconds)
+  // Afterwards, the next 10 attempts are between 30 and 90 seconds.
+  // A bit of randomness is introduced so that not all of the workers attempt 
to reconnect at
+  // the same time.
+  val INITIAL_REGISTRATION_RETRIES = 6
+  val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
+  val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
+  val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
+    val randomNumberGenerator = new 
Random(UUID.randomUUID.getMostSignificantBits)
+    randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
+  }
+  val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
+    REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
+  val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60
+    * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
 
   val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
@@ -103,6 +119,7 @@ private[spark] class Worker(
 
   var coresUsed = 0
   var memoryUsed = 0
+  var connectionAttemptCount = 0
 
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, 
securityMgr)
   val workerSource = new WorkerSource(this)
@@ -158,7 +175,7 @@ private[spark] class Worker(
     connected = true
   }
 
-  def tryRegisterAllMasters() {
+  private def tryRegisterAllMasters() {
     for (masterUrl <- masterUrls) {
       logInfo("Connecting to master " + masterUrl + "...")
       val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
@@ -166,26 +183,47 @@ private[spark] class Worker(
     }
   }
 
-  def registerWithMaster() {
-    tryRegisterAllMasters()
-    var retries = 0
-    registrationRetryTimer = Some {
-      context.system.scheduler.schedule(REGISTRATION_TIMEOUT, 
REGISTRATION_TIMEOUT) {
-        Utils.tryOrExit {
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            System.exit(1)
-          } else {
-            tryRegisterAllMasters()
+  private def retryConnectToMaster() {
+    Utils.tryOrExit {
+      connectionAttemptCount += 1
+      logInfo(s"Attempting to connect to master (attempt # 
$connectionAttemptCount")
+      if (registered) {
+        registrationRetryTimer.foreach(_.cancel())
+        registrationRetryTimer = None
+      } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
+        tryRegisterAllMasters()
+        if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
+          registrationRetryTimer.foreach(_.cancel())
+          registrationRetryTimer = Some {
+            
context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
+              PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
           }
         }
+      } else {
+        logError("All masters are unresponsive! Giving up.")
+        System.exit(1)
       }
     }
   }
 
+  def registerWithMaster() {
+    // DisassociatedEvent may be triggered multiple times, so don't attempt 
registration
+    // if there are outstanding registration attempts scheduled.
+    registrationRetryTimer match {
+      case None =>
+        registered = false
+        tryRegisterAllMasters()
+        connectionAttemptCount = 0
+        registrationRetryTimer = Some {
+          
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
+            INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+        }
+      case Some(_) =>
+        logInfo("Not spawning another attempt to register with the master, 
since there is an" +
+          " attempt scheduled already.")
+    }
+  }
+
   override def receiveWithLogging = {
     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
       logInfo("Successfully registered with master " + masterUrl)
@@ -243,6 +281,10 @@ private[spark] class Worker(
         System.exit(1)
       }
 
+    case ReconnectWorker(masterUrl) =>
+      logInfo(s"Master with url $masterUrl requested this worker to 
reconnect.")
+      registerWithMaster()
+
     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch 
executor.")
@@ -362,9 +404,10 @@ private[spark] class Worker(
     }
   }
 
-  def masterDisconnected() {
+  private def masterDisconnected() {
     logError("Connection to master failed! Waiting for master to reconnect...")
     connected = false
+    registerWithMaster()
   }
 
   def generateWorkerId(): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to