This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new acf8f66650a [SPARK-39647][CORE] Register the executor with ESS before 
registering the BlockManager
acf8f66650a is described below

commit acf8f66650af53718b08f3778c2a2a3a5d10a88f
Author: Chandni Singh <[email protected]>
AuthorDate: Tue Jul 12 00:20:43 2022 -0500

    [SPARK-39647][CORE] Register the executor with ESS before registering the 
BlockManager
    
    ### What changes were proposed in this pull request?
    Currently the executors register with the ESS after the `BlockManager` 
registration with the `BlockManagerMaster`.  This order creates a problem with 
the push-based shuffle. A registered BlockManager node is picked up by the 
driver as a merger but the shuffle service on that node is not yet ready to 
merge the data which causes block pushes to fail until the local executor 
registers with it. This fix is to reverse the order, that is, register with the 
ESS before registering the `BlockManager`.
    
    ### Why are the changes needed?
    They are needed to fix the issue which causes block pushes to fail.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a UT.
    
    Closes #37052 from otterc/SPARK-39647.
    
    Authored-by: Chandni Singh <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 79ba2890f51c5f676b9cd6e3a6682c7969462999)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 30 ++++++++++++------
 .../apache/spark/storage/BlockManagerSuite.scala   | 36 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d5901888d1a..53d2d054121 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -516,9 +516,27 @@ private[spark] class BlockManager(
       ret
     }
 
+    // Register Executors' configuration with the local shuffle service, if 
one should exist.
+    // Registration with the ESS should happen before registering the block 
manager with the
+    // BlockManagerMaster. In push-based shuffle, the registered BM is 
selected by the driver
+    // as a merger. However, for the ESS on this host to be able to merge 
blocks successfully,
+    // it needs the merge directories metadata which is provided by the local 
executor during
+    // the registration with the ESS. Therefore, this registration should be 
prior to
+    // the BlockManager registration. See SPARK-39647.
+    if (externalShuffleServiceEnabled) {
+      logInfo(s"external shuffle service port = $externalShuffleServicePort")
+      shuffleServerId = BlockManagerId(executorId, 
blockTransferService.hostName,
+        externalShuffleServicePort)
+      if (!isDriver) {
+        registerWithExternalShuffleServer()
+      }
+    }
+
     val id =
       BlockManagerId(executorId, blockTransferService.hostName, 
blockTransferService.port, None)
 
+    // The idFromMaster has just additional topology information. Otherwise, 
it has the same
+    // executor id/host/port of idWithoutTopologyInfo which is not expected to 
be changed.
     val idFromMaster = master.registerBlockManager(
       id,
       diskBlockManager.localDirsString,
@@ -528,16 +546,8 @@ private[spark] class BlockManager(
 
     blockManagerId = if (idFromMaster != null) idFromMaster else id
 
-    shuffleServerId = if (externalShuffleServiceEnabled) {
-      logInfo(s"external shuffle service port = $externalShuffleServicePort")
-      BlockManagerId(executorId, blockTransferService.hostName, 
externalShuffleServicePort)
-    } else {
-      blockManagerId
-    }
-
-    // Register Executors' configuration with the local shuffle service, if 
one should exist.
-    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
-      registerWithExternalShuffleServer()
+    if (!externalShuffleServiceEnabled) {
+      shuffleServerId = blockManagerId
     }
 
     hostLocalDirManager = {
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 45e05b2cc2d..874b2b4f005 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -2175,6 +2175,42 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(kryoException.getMessage === "java.io.IOException: Input/output 
error")
   }
 
+  test("SPARK-39647: Failure to register with ESS should prevent registering 
the BM") {
+    val handler = new NoOpRpcHandler {
+      override def receive(
+          client: TransportClient,
+          message: ByteBuffer,
+          callback: RpcResponseCallback): Unit = {
+        val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+        msgObj match {
+          case _: RegisterExecutor => () // No reply to generate client-side 
timeout
+        }
+      }
+    }
+    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
+    Utils.tryWithResource(new TransportContext(transConf, handler, true)) { 
transCtx =>
+      def newShuffleServer(port: Int): (TransportServer, Int) = {
+        (transCtx.createServer(port, 
Seq.empty[TransportServerBootstrap].asJava), port)
+      }
+
+      val candidatePort = RandomUtils.nextInt(1024, 65536)
+      val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
+        newShuffleServer, conf, "ShuffleServer")
+
+      conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+      conf.set(SHUFFLE_SERVICE_PORT.key, shufflePort.toString)
+      conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+      conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+      val e = intercept[SparkException] {
+        makeBlockManager(8000, "timeoutExec")
+      }.getMessage
+      assert(e.contains("TimeoutException"))
+      verify(master, times(0))
+        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+      server.close()
+    }
+  }
+
   private def createKryoSerializerWithDiskCorruptedInputStream(): 
KryoSerializer = {
     class TestDiskCorruptedInputStream extends InputStream {
       override def read(): Int = throw new IOException("Input/output error")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to