Repository: spark
Updated Branches:
  refs/heads/master d6cbec759 -> 7026ee23e


[SPARK-17755][CORE] Use workerRef to send RegisterWorkerResponse to avoid the 
race condition

## What changes were proposed in this pull request?

The root cause of this issue is that RegisterWorkerResponse and LaunchExecutor 
are sent via two different channels (TCP connections), so the order in which 
they arrive at the worker is not guaranteed.

This PR changes the master and worker code to use `workerRef` to send 
RegisterWorkerResponse, so that RegisterWorkerResponse and LaunchExecutor are 
sent via the same connection. Hence `LaunchExecutor` will always arrive after 
`RegisterWorkerResponse` and will never be ignored.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #16345 from zsxwing/SPARK-17755.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7026ee23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7026ee23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7026ee23

Branch: refs/heads/master
Commit: 7026ee23e0a684e13f9d7dfbb8f85e810106d022
Parents: d6cbec7
Author: Shixiong Zhu <[email protected]>
Authored: Sun Dec 25 23:48:14 2016 -0800
Committer: Shixiong Zhu <[email protected]>
Committed: Sun Dec 25 23:48:14 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala | 47 ++++++++++----------
 .../org/apache/spark/deploy/worker/Worker.scala | 24 ++++------
 .../spark/deploy/master/MasterSuite.scala       |  2 +-
 3 files changed, 32 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7026ee23/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4618e61..c5f7c07 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -231,6 +231,29 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
 
+    case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, 
workerWebUiUrl) =>
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+      if (state == RecoveryState.STANDBY) {
+        workerRef.send(MasterInStandby)
+      } else if (idToWorker.contains(id)) {
+        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
+      } else {
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          workerRef, workerWebUiUrl)
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
+          schedule()
+        } else {
+          val workerAddress = worker.endpoint.address
+          logWarning("Worker registration failed. Attempted to re-register 
worker at same " +
+            "address: " + workerAddress)
+          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker 
at same address: "
+            + workerAddress))
+        }
+      }
+
     case RegisterApplication(description, driver) =>
       // TODO Prevent repeated registrations from some driver
       if (state == RecoveryState.STANDBY) {
@@ -386,30 +409,6 @@ private[deploy] class Master(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
-    case RegisterWorker(
-        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) 
=>
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == RecoveryState.STANDBY) {
-        context.reply(MasterInStandby)
-      } else if (idToWorker.contains(id)) {
-        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
-      } else {
-        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerWebUiUrl)
-        if (registerWorker(worker)) {
-          persistenceEngine.addWorker(worker)
-          context.reply(RegisteredWorker(self, masterWebUiUrl))
-          schedule()
-        } else {
-          val workerAddress = worker.endpoint.address
-          logWarning("Worker registration failed. Attempted to re-register 
worker at same " +
-            "address: " + workerAddress)
-          context.reply(RegisterWorkerFailed("Attempted to re-register worker 
at same address: "
-            + workerAddress))
-        }
-      }
-
     case RequestSubmitDriver(description) =>
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/7026ee23/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0940f3c..f963a46 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, 
ScheduledFuture => JScheduledFut
 
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.{Failure, Random, Success}
+import scala.util.Random
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -216,7 +216,7 @@ private[deploy] class Worker(
           try {
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, 
Master.ENDPOINT_NAME)
-            registerWithMaster(masterEndpoint)
+            sendRegisterMessageToMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master 
$masterAddress", e)
@@ -272,7 +272,7 @@ private[deploy] class Worker(
                 try {
                   logInfo("Connecting to master " + masterAddress + "...")
                   val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, 
Master.ENDPOINT_NAME)
-                  registerWithMaster(masterEndpoint)
+                  sendRegisterMessageToMaster(masterEndpoint)
                 } catch {
                   case ie: InterruptedException => // Cancelled
                   case NonFatal(e) => logWarning(s"Failed to connect to master 
$masterAddress", e)
@@ -341,19 +341,8 @@ private[deploy] class Worker(
     }
   }
 
-  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
-    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
-      workerId, host, port, self, cores, memory, workerWebUiUrl))
-      .onComplete {
-        // This is a very fast action so we can use "ThreadUtils.sameThread"
-        case Success(msg) =>
-          Utils.tryLogNonFatalError {
-            handleRegisterResponse(msg)
-          }
-        case Failure(e) =>
-          logError(s"Cannot register with master: ${masterEndpoint.address}", 
e)
-          System.exit(1)
-      }(ThreadUtils.sameThread)
+  private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): 
Unit = {
+    masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, 
memory, workerWebUiUrl))
   }
 
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = 
synchronized {
@@ -394,6 +383,9 @@ private[deploy] class Worker(
   }
 
   override def receive: PartialFunction[Any, Unit] = synchronized {
+    case msg: RegisterWorkerResponse =>
+      handleRegisterResponse(msg)
+
     case SendHeartbeat =>
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7026ee23/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 831a7bc..da7253b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -447,7 +447,7 @@ class MasterSuite extends SparkFunSuite
       }
     })
 
-    master.self.ask(
+    master.self.send(
       RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, 
"http://localhost:8080"))
     val executors = (0 until 3).map { i =>
       new ExecutorDescription(appId = i.toString, execId = i, 2, 
ExecutorState.RUNNING)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to