This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c3f46d5c6d6 [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost c3f46d5c6d6 is described below commit c3f46d5c6d69a9b21473dae6d86dee53833dfd52 Author: Yi Wu <yi...@databricks.com> AuthorDate: Mon Dec 12 13:29:37 2022 -0600 [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost ### What changes were proposed in this pull request? This PR mainly proposes to reject block manager re-registration if the scheduler backend already considers the executor lost/dead. Along with the main proposal, this PR also includes a few other changes: * Only post the `SparkListenerBlockManagerAdded` event when the registration succeeds * Return an "invalid" executor id when the re-registration fails * Do not report all blocks when the re-registration fails ### Why are the changes needed? BlockManager re-registration from a lost executor (a terminated/terminating executor or an orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and [block fetching from the dead executor](https://github.com/apache/spark/pull/32114#issuecomment-899979045). And since a lost executor itself is never re-registered, it is meaningless to re-register its BlockManager once the executor is already lost. Regarding the corner case where the re-registration event arrives before the lost executor is actually removed from the scheduler backend, I think it is not possible. This is because re-registration is only required when the driver's BlockManagerMasterEndpoint does not find the block manager in `blockManagerInfo`. And the block manager is only removed from `blockManagerInfo` either when the executor is already known to be lost or when the driver removes it proactively. So the executor should always be removed fr [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested?
Unit test Closes #38876 from Ngone51/fix-blockmanager-reregister. Authored-by: Yi Wu <yi...@databricks.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../org/apache/spark/storage/BlockManager.scala | 11 ++-- .../org/apache/spark/storage/BlockManagerId.scala | 2 + .../apache/spark/storage/BlockManagerMaster.scala | 20 ++++++-- .../spark/storage/BlockManagerMasterEndpoint.scala | 41 ++++++++++++--- .../spark/storage/BlockManagerMessages.scala | 3 +- .../apache/spark/storage/BlockManagerSuite.scala | 58 +++++++++++++++++++++- 6 files changed, 119 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d5fde96b146..1067ee15567 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -637,9 +637,14 @@ private[spark] class BlockManager( def reregister(): Unit = { // TODO: We might need to rate limit re-registering. 
logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory, - maxOffHeapMemory, storageEndpoint) - reportAllBlocks() + val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, + maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true) + if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) { + reportAllBlocks() + } else { + logError("Exiting executor due to block manager re-registration failure") + System.exit(-1) + } } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index c6a4457d8f9..12e416bbb36 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -147,4 +147,6 @@ private[spark] object BlockManagerId { } private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger" + + private[spark] val INVALID_EXECUTOR_ID = "invalid" } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 40008e6afbf..0ee3dc249d5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -74,11 +74,25 @@ class BlockManagerMaster( localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, - storageEndpoint: RpcEndpointRef): BlockManagerId = { + storageEndpoint: RpcEndpointRef, + isReRegister: Boolean = false): BlockManagerId = { logInfo(s"Registering BlockManager $id") val updatedId = driverEndpoint.askSync[BlockManagerId]( - RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint)) - logInfo(s"Registered BlockManager $updatedId") + RegisterBlockManager( + id, + localDirs, + 
maxOnHeapMemSize, + maxOffHeapMemSize, + storageEndpoint, + isReRegister + ) + ) + if (updatedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) { + assert(isReRegister, "Got invalid executor id from non re-register case") + logInfo(s"Re-register BlockManager $id failed") + } else { + logInfo(s"Registered BlockManager $updatedId") + } updatedId } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 495d91fe0e4..d30272c51be 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -117,8 +117,10 @@ class BlockManagerMasterEndpoint( RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) => - context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint)) + case RegisterBlockManager( + id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister) => + context.reply( + register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => @@ -572,7 +574,8 @@ class BlockManagerMasterEndpoint( localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, - storageEndpoint: RpcEndpointRef): BlockManagerId = { + storageEndpoint: RpcEndpointRef, + isReRegister: Boolean): BlockManagerId = { // the dummy id is not expected to contain the topology information. 
// we get that info here and respond back with a more fleshed out block manager id val id = BlockManagerId( @@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint( val time = System.currentTimeMillis() executorIdToLocalDirs.put(id.executorId, localDirs) - if (!blockManagerInfo.contains(id)) { + // SPARK-41360: For the block manager re-registration, we should only allow it when + // the executor is recognized as active by the scheduler backend. Otherwise, this kind + // of re-registration from the terminating/stopped executor is meaningless and harmful. + lazy val isExecutorAlive = + driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId)) + if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(oldId) => // A block manager of the same executor already exists, so remove it (assumed dead) @@ -616,10 +624,29 @@ class BlockManagerMasterEndpoint( if (pushBasedShuffleEnabled) { addMergerLocation(id) } + listenerBus.post(SparkListenerBlockManagerAdded(time, id, + maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) } - listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, - Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) - id + val updatedId = if (isReRegister && !isExecutorAlive) { + assert(!blockManagerInfo.contains(id), + "BlockManager re-registration shouldn't succeed when the executor is lost") + + logInfo(s"BlockManager ($id) re-registration is rejected since " + + s"the executor (${id.executorId}) has been lost") + + // Use "invalid" as the return executor id to indicate the block manager that + // re-registration failed. It's a bit hacky but fine since the returned block + // manager id won't be accessed in the case of re-registration. And we'll use + // this "invalid" executor id to print better logs and avoid blocks reporting. 
+ BlockManagerId( + BlockManagerId.INVALID_EXECUTOR_ID, + id.host, + id.port, + id.topologyInfo) + } else { + id + } + updatedId } private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index afe416a55ed..e047b61fcb1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages { localDirs: Array[String], maxOnHeapMemSize: Long, maxOffHeapMemSize: Long, - sender: RpcEndpointRef) + sender: RpcEndpointRef, + isReRegister: Boolean) extends ToBlockManagerMaster case class UpdateBlockInfo( diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index c8914761b94..842b66193f2 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -295,7 +295,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe eventually(timeout(5.seconds)) { // make sure both bm1 and bm2 are registered at driver side BlockManagerMaster verify(master, times(2)) - .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) + .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) assert(driverEndpoint.askSync[Boolean]( CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId))) assert(driverEndpoint.askSync[Boolean]( @@ -361,6 +361,44 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe master.removeShuffle(0, true) } + test("SPARK-41360: Avoid block manager re-registration if the executor has been lost") { + // Set up a DriverEndpoint which always returns 
isExecutorAlive=false + rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, + new RpcEndpoint { + override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, _, _, _, _, _) => + context.reply(true) + case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) => + // always return false + context.reply(false) + } + } + ) + + // Set up a block manager endpoint and endpoint reference + val bmRef = rpcEnv.setupEndpoint(s"bm-0", new RpcEndpoint { + override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv + + private def reply[T](context: RpcCallContext, response: T): Unit = { + context.reply(response) + } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case RemoveRdd(_) => reply(context, 1) + case RemoveBroadcast(_, _) => reply(context, 1) + case RemoveShuffle(_) => reply(context, true) + } + }) + val bmId = BlockManagerId(s"exec-0", "localhost", 1234, None) + // Register the block manager with isReRegister = true + val updatedId = master.registerBlockManager( + bmId, Array.empty, 2000, 0, bmRef, isReRegister = true) + // The re-registration should fail since the executor is considered as dead by DriverEndpoint + assert(updatedId.executorId === BlockManagerId.INVALID_EXECUTOR_ID) + } + test("StorageLevel object caching") { val level1 = StorageLevel(false, false, false, 3) // this should return the same object as level1 @@ -669,6 +707,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) + // Set up a DriverEndpoint which simulates the executor is alive (required by SPARK-41360) + rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, + new RpcEndpoint { + override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv + + override def 
receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) => + if (executorId == store.blockManagerId.executorId) { + context.reply(true) + } else { + context.reply(false) + } + } + } + ) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(master.getLocations("a1").size > 0, "master was not told about a1") @@ -2207,7 +2261,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe }.getMessage assert(e.contains("TimeoutException")) verify(master, times(0)) - .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) + .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any()) server.close() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org