Repository: spark Updated Branches: refs/heads/master d6cbec759 -> 7026ee23e
[SPARK-17755][CORE] Use workerRef to send RegisterWorkerResponse to avoid the race condition ## What changes were proposed in this pull request? The root cause of this issue is that RegisterWorkerResponse and LaunchExecutor are sent via two different channels (TCP connections) and their order is not guaranteed. This PR changes the master and worker code to use `workerRef` to send RegisterWorkerResponse, so that RegisterWorkerResponse and LaunchExecutor are sent via the same connection. Hence `LaunchExecutor` will always arrive after `RegisterWorkerResponse` and will never be ignored. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16345 from zsxwing/SPARK-17755. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7026ee23 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7026ee23 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7026ee23 Branch: refs/heads/master Commit: 7026ee23e0a684e13f9d7dfbb8f85e810106d022 Parents: d6cbec7 Author: Shixiong Zhu <[email protected]> Authored: Sun Dec 25 23:48:14 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Sun Dec 25 23:48:14 2016 -0800 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 47 ++++++++++---------- .../org/apache/spark/deploy/worker/Worker.scala | 24 ++++------ .../spark/deploy/master/MasterSuite.scala | 2 +- 3 files changed, 32 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7026ee23/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4618e61..c5f7c07 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -231,6 +231,29 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) + case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => + logInfo("Registering worker %s:%d with %d cores, %s RAM".format( + workerHost, workerPort, cores, Utils.megabytesToString(memory))) + if (state == RecoveryState.STANDBY) { + workerRef.send(MasterInStandby) + } else if (idToWorker.contains(id)) { + workerRef.send(RegisterWorkerFailed("Duplicate worker ID")) + } else { + val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, + workerRef, workerWebUiUrl) + if (registerWorker(worker)) { + persistenceEngine.addWorker(worker) + workerRef.send(RegisteredWorker(self, masterWebUiUrl)) + schedule() + } else { + val workerAddress = worker.endpoint.address + logWarning("Worker registration failed. 
Attempted to re-register worker at same " + + "address: " + workerAddress) + workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: " + + workerAddress)) + } + } + case RegisterApplication(description, driver) => // TODO Prevent repeated registrations from some driver if (state == RecoveryState.STANDBY) { @@ -386,30 +409,6 @@ private[deploy] class Master( } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterWorker( - id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => - logInfo("Registering worker %s:%d with %d cores, %s RAM".format( - workerHost, workerPort, cores, Utils.megabytesToString(memory))) - if (state == RecoveryState.STANDBY) { - context.reply(MasterInStandby) - } else if (idToWorker.contains(id)) { - context.reply(RegisterWorkerFailed("Duplicate worker ID")) - } else { - val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - workerRef, workerWebUiUrl) - if (registerWorker(worker)) { - persistenceEngine.addWorker(worker) - context.reply(RegisteredWorker(self, masterWebUiUrl)) - schedule() - } else { - val workerAddress = worker.endpoint.address - logWarning("Worker registration failed. Attempted to re-register worker at same " + - "address: " + workerAddress) - context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " - + workerAddress)) - } - } - case RequestSubmitDriver(description) => if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. 
" + http://git-wip-us.apache.org/repos/asf/spark/blob/7026ee23/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 0940f3c..f963a46 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap} import scala.concurrent.ExecutionContext -import scala.util.{Failure, Random, Success} +import scala.util.Random import scala.util.control.NonFatal import org.apache.spark.{SecurityManager, SparkConf} @@ -216,7 +216,7 @@ private[deploy] class Worker( try { logInfo("Connecting to master " + masterAddress + "...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) - registerWithMaster(masterEndpoint) + sendRegisterMessageToMaster(masterEndpoint) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) @@ -272,7 +272,7 @@ private[deploy] class Worker( try { logInfo("Connecting to master " + masterAddress + "...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) - registerWithMaster(masterEndpoint) + sendRegisterMessageToMaster(masterEndpoint) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) @@ -341,19 +341,8 @@ private[deploy] class Worker( } } - private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { - masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( - workerId, host, port, self, cores, memory, workerWebUiUrl)) - .onComplete { - // This is a very 
fast action so we can use "ThreadUtils.sameThread" - case Success(msg) => - Utils.tryLogNonFatalError { - handleRegisterResponse(msg) - } - case Failure(e) => - logError(s"Cannot register with master: ${masterEndpoint.address}", e) - System.exit(1) - }(ThreadUtils.sameThread) + private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = { + masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)) } private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { @@ -394,6 +383,9 @@ private[deploy] class Worker( } override def receive: PartialFunction[Any, Unit] = synchronized { + case msg: RegisterWorkerResponse => + handleRegisterResponse(msg) + case SendHeartbeat => if (connected) { sendToMaster(Heartbeat(workerId, self)) } http://git-wip-us.apache.org/repos/asf/spark/blob/7026ee23/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 831a7bc..da7253b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -447,7 +447,7 @@ class MasterSuite extends SparkFunSuite } }) - master.self.ask( + master.self.send( RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080")) val executors = (0 until 3).map { i => new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
