Repository: spark
Updated Branches:
  refs/heads/master c4bb02abf -> f1def573f


[SPARK-13112][CORE] Make sure RegisterExecutorResponse arrives before LaunchTask

## What changes were proposed in this pull request?

Send `RegisterExecutorResponse` via `executorRef` so that `RegisterExecutorResponse` 
and `LaunchTask` travel over the same channel. Because messages on a single channel 
are delivered in order, `RegisterExecutorResponse` will always arrive before 
`LaunchTask`. A simplified sketch of the driver-side handler after this change 
is shown below.
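
For illustration, a minimal sketch of the driver-side handler after this change 
(simplified from the `CoarseGrainedSchedulerBackend` diff below; the enclosing 
`receiveAndReply` and the duplicate-ID branch are elided):

```scala
case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
  // Sent through executorRef, the same channel later used for LaunchTask,
  // so the executor is guaranteed to see RegisteredExecutor first.
  executorRef.send(RegisteredExecutor(executorAddress.host))
  // The ask() reply no longer carries the registration response; it is just
  // an acknowledgement, which the executor ignores.
  context.reply(true)
  makeOffers() // may send LaunchTask(...) through executorRef
```

On the executor side, `RegisteredExecutor` (or `RegisterExecutorFailed`) now 
arrives through `receive` rather than as the `ask` result, so the executor asks 
for a plain `Boolean` and discards it.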

## How was this patch tested?

Existing unit tests

Closes #12078

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12211 from zsxwing/SPARK-13112.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1def573
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1def573
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1def573

Branch: refs/heads/master
Commit: f1def573f4c1c757f727476521a1509b5285051d
Parents: c4bb02a
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Apr 6 16:18:04 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Apr 6 16:18:04 2016 -0700

----------------------------------------------------------------------
 .../spark/executor/CoarseGrainedExecutorBackend.scala    |  7 +++----
 .../cluster/CoarseGrainedSchedulerBackend.scala          |  6 ++++--
 .../scala/org/apache/spark/HeartbeatReceiverSuite.scala  | 11 ++++++++---
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala  |  2 +-
 4 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 81e41e6..d4ed584 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,12 +57,11 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
-      }
+      case Success(msg) =>
+        // Always receive `true`. Just ignore it
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)
         System.exit(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e5abf0e..8896391 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -150,7 +150,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
         if (executorDataMap.contains(executorId)) {
-          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+          context.reply(true)
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -177,8 +178,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               logDebug(s"Decremented number of pending executors 
($numPendingExecutors left)")
             }
           }
+          executorRef.send(RegisteredExecutor(executorAddress.host))
          // Note: some tests expect the reply to come after we put the executor in the map
-          context.reply(RegisteredExecutor(executorAddress.host))
+          context.reply(true)
           listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
           makeOffers()

http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 3777d77..713d5e5 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
    val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
    val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
@@ -255,7 +255,12 @@ class HeartbeatReceiverSuite
 /**
  * Dummy RPC endpoint to simulate executors.
  */
-private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint
+private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case _ =>
+  }
+}
 
 /**
 * Dummy scheduler backend to simulate executor allocation requests to the cluster manager.

http://git-wip-us.apache.org/repos/asf/spark/blob/f1def573/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d2e2491..3d39bd4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -561,7 +561,7 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
      val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
-      backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
+      backend.driverEndpoint.askWithRetry[Boolean](message)
     }
   }
 

