Repository: spark
Updated Branches:
  refs/heads/master c4bb02abf -> f1def573f
[SPARK-13112][CORE] Make sure RegisterExecutorResponse arrives before LaunchTask

## What changes were proposed in this pull request?

Send `RegisterExecutorResponse` via `executorRef`, so that `RegisterExecutorResponse` and `LaunchTask` travel over the same channel. Because a single channel delivers messages in the order they were sent, `RegisterExecutorResponse` will always arrive before `LaunchTask`.

## How was this patch tested?

Existing unit tests.

Closes #12078

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12211 from zsxwing/SPARK-13112.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1def573
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1def573
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1def573

Branch: refs/heads/master
Commit: f1def573f4c1c757f727476521a1509b5285051d
Parents: c4bb02a
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Apr 6 16:18:04 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Apr 6 16:18:04 2016 -0700

----------------------------------------------------------------------
 .../spark/executor/CoarseGrainedExecutorBackend.scala    |  7 +++----
 .../cluster/CoarseGrainedSchedulerBackend.scala          |  6 ++++--
 .../scala/org/apache/spark/HeartbeatReceiverSuite.scala  | 11 ++++++++---
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala  |  2 +-
 4 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
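The ordering guarantee here comes from the transport, not the scheduler: one-way messages pushed through a single RPC connection are delivered FIFO, whereas the old code routed the response back on the ask channel and then re-sent it to the executor's own endpoint, where it could race with a `LaunchTask` arriving on the driver connection. A minimal, self-contained sketch of the FIFO property (plain Scala; `ChannelOrdering`, the queue, and the string messages are illustrative stand-ins, not Spark's RPC classes):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Illustrative only: a FIFO queue stands in for the single connection behind
// `executorRef.send(...)`. Whatever the driver enqueues first, the executor
// dequeues first; that is the property this commit relies on.
object ChannelOrdering {
  def main(args: Array[String]): Unit = {
    val driverToExecutor = new LinkedBlockingQueue[String]()

    // Driver side: the registration response and the first task now share
    // one channel, sent one after the other.
    driverToExecutor.put("RegisteredExecutor")
    driverToExecutor.put("LaunchTask")

    // Executor side: FIFO delivery means registration is always seen first.
    assert(driverToExecutor.take() == "RegisteredExecutor")
    assert(driverToExecutor.take() == "LaunchTask")
    println("RegisteredExecutor delivered before LaunchTask")
  }
}
```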
http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 81e41e6..d4ed584 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,12 +57,11 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
-      }
+      case Success(msg) =>
+        // Always receive `true`. Just ignore it
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)
         System.exit(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e5abf0e..8896391 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -150,7 +150,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

       case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
         if (executorDataMap.contains(executorId)) {
-          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+          context.reply(true)
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -177,8 +178,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
           }
         }
+        executorRef.send(RegisteredExecutor(executorAddress.host))
         // Note: some tests expect the reply to come after we put the executor in the map
-        context.reply(RegisteredExecutor(executorAddress.host))
+        context.reply(true)
         listenerBus.post(
           SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
         makeOffers()
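For context, a paraphrased sketch (not the exact Spark source) of the executor-side shape these one-way messages feed into: both now arrive through the same `receive` handler, so the registration-before-launch order the driver establishes above is exactly what the executor observes. `Message`, `ExecutorInbox`, and the case classes below are illustrative stand-ins for Spark's CoarseGrainedClusterMessages:

```scala
// Illustrative stand-ins; the real handler is CoarseGrainedExecutorBackend.receive.
sealed trait Message
case class RegisteredExecutor(hostname: String) extends Message
case class LaunchTask(taskId: Long) extends Message

class ExecutorInbox {
  private var registered = false

  // Messages arrive in the order the driver sent them on this connection,
  // so registration is always observed before any task.
  def receive(msg: Message): Unit = msg match {
    case RegisteredExecutor(_) =>
      registered = true
    case LaunchTask(taskId) =>
      require(registered, s"LaunchTask($taskId) arrived before registration")
  }
}
```

This is also why `FakeExecutorEndpoint` in the test changes below gains an empty `receive`: the driver now `send`s one-way messages at the registering endpoint, and `RpcEndpoint`'s default `receive` throws for any unhandled message.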
http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 3777d77..713d5e5 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
@@ -255,7 +255,12 @@ class HeartbeatReceiverSuite
 /**
  * Dummy RPC endpoint to simulate executors.
  */
-private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint
+private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case _ =>
+  }
+}

 /**
  * Dummy scheduler backend to simulate executor allocation requests to the cluster manager.

http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d2e2491..3d39bd4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -561,7 +561,7 @@ class StandaloneDynamicAllocationSuite
     when(endpointRef.address).thenReturn(mockAddress)
     val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
     val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
-    backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
+    backend.driverEndpoint.askWithRetry[Boolean](message)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org