This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new acf8f66650a [SPARK-39647][CORE] Register the executor with ESS before
registering the BlockManager
acf8f66650a is described below
commit acf8f66650af53718b08f3778c2a2a3a5d10a88f
Author: Chandni Singh <[email protected]>
AuthorDate: Tue Jul 12 00:20:43 2022 -0500
[SPARK-39647][CORE] Register the executor with ESS before registering the
BlockManager
### What changes were proposed in this pull request?
Currently the executors register with the ESS after the `BlockManager`
registration with the `BlockManagerMaster`. This order creates a problem with
the push-based shuffle. A registered BlockManager node is picked up by the
driver as a merger but the shuffle service on that node is not yet ready to
merge the data which causes block pushes to fail until the local executor
registers with it. This fix is to reverse the order, that is, register with the
ESS before registering the `BlockManager`.
### Why are the changes needed?
They are needed to fix the issue which causes block pushes to fail.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a UT.
Closes #37052 from otterc/SPARK-39647.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 79ba2890f51c5f676b9cd6e3a6682c7969462999)
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/storage/BlockManager.scala | 30 ++++++++++++------
.../apache/spark/storage/BlockManagerSuite.scala | 36 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d5901888d1a..53d2d054121 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -516,9 +516,27 @@ private[spark] class BlockManager(
ret
}
+ // Register Executors' configuration with the local shuffle service, if
one should exist.
+ // Registration with the ESS should happen before registering the block
manager with the
+ // BlockManagerMaster. In push-based shuffle, the registered BM is
selected by the driver
+ // as a merger. However, for the ESS on this host to be able to merge
blocks successfully,
+ // it needs the merge directories metadata which is provided by the local
executor during
+ // the registration with the ESS. Therefore, this registration should be
prior to
+ // the BlockManager registration. See SPARK-39647.
+ if (externalShuffleServiceEnabled) {
+ logInfo(s"external shuffle service port = $externalShuffleServicePort")
+ shuffleServerId = BlockManagerId(executorId,
blockTransferService.hostName,
+ externalShuffleServicePort)
+ if (!isDriver) {
+ registerWithExternalShuffleServer()
+ }
+ }
+
val id =
BlockManagerId(executorId, blockTransferService.hostName,
blockTransferService.port, None)
+ // The idFromMaster has just additional topology information. Otherwise,
it has the same
+ // executor id/host/port of idWithoutTopologyInfo which is not expected to
be changed.
val idFromMaster = master.registerBlockManager(
id,
diskBlockManager.localDirsString,
@@ -528,16 +546,8 @@ private[spark] class BlockManager(
blockManagerId = if (idFromMaster != null) idFromMaster else id
- shuffleServerId = if (externalShuffleServiceEnabled) {
- logInfo(s"external shuffle service port = $externalShuffleServicePort")
- BlockManagerId(executorId, blockTransferService.hostName,
externalShuffleServicePort)
- } else {
- blockManagerId
- }
-
- // Register Executors' configuration with the local shuffle service, if
one should exist.
- if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
- registerWithExternalShuffleServer()
+ if (!externalShuffleServiceEnabled) {
+ shuffleServerId = blockManagerId
}
hostLocalDirManager = {
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 45e05b2cc2d..874b2b4f005 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -2175,6 +2175,42 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
assert(kryoException.getMessage === "java.io.IOException: Input/output
error")
}
+ test("SPARK-39647: Failure to register with ESS should prevent registering
the BM") {
+ val handler = new NoOpRpcHandler {
+ override def receive(
+ client: TransportClient,
+ message: ByteBuffer,
+ callback: RpcResponseCallback): Unit = {
+ val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+ msgObj match {
+ case _: RegisterExecutor => () // No reply to generate client-side
timeout
+ }
+ }
+ }
+ val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
+ Utils.tryWithResource(new TransportContext(transConf, handler, true)) {
transCtx =>
+ def newShuffleServer(port: Int): (TransportServer, Int) = {
+ (transCtx.createServer(port,
Seq.empty[TransportServerBootstrap].asJava), port)
+ }
+
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
+ newShuffleServer, conf, "ShuffleServer")
+
+ conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+ conf.set(SHUFFLE_SERVICE_PORT.key, shufflePort.toString)
+ conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+ conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+ val e = intercept[SparkException] {
+ makeBlockManager(8000, "timeoutExec")
+ }.getMessage
+ assert(e.contains("TimeoutException"))
+ verify(master, times(0))
+ .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+ server.close()
+ }
+ }
+
private def createKryoSerializerWithDiskCorruptedInputStream():
KryoSerializer = {
class TestDiskCorruptedInputStream extends InputStream {
override def read(): Int = throw new IOException("Input/output error")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]