This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c3f46d5c6d6 [SPARK-41360][CORE] Avoid BlockManager re-registration if 
the executor has been lost
c3f46d5c6d6 is described below

commit c3f46d5c6d69a9b21473dae6d86dee53833dfd52
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Mon Dec 12 13:29:37 2022 -0600

    [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has 
been lost
    
    ### What changes were proposed in this pull request?
    
    This PR mainly proposes to reject the block manager re-registration if the 
executor has already been considered lost/dead by the scheduler backend.
    
    Along with the major proposal, this PR also includes a few other changes:
    * Only post `SparkListenerBlockManagerAdded` event when the registration 
succeeds
    * Return an "invalid" executor id when the re-registration fails
    * Do not report all blocks when the re-registration fails
    
    ### Why are the changes needed?
    
    BlockManager re-registration from a lost executor (terminated/terminating 
executor or orphan executor) has led to some known issues, e.g., a false-active 
executor showing up in the UI (SPARK-35011) and [block fetching to the dead 
executor](https://github.com/apache/spark/pull/32114#issuecomment-899979045). 
And since there's no re-registration from the lost executor itself, it's 
meaningless to have BlockManager re-registration when the executor is already 
lost.
    
    Regarding the corner case where the re-registration event arrives 
before the lost executor is actually removed from the scheduler backend, I 
think it is not possible, because re-registration will only be required when 
the BlockManager doesn't see the block manager in `blockManagerInfo`. And the 
block manager will only be removed from `blockManagerInfo` either when the 
executor is already known to be lost or when it is removed by the driver proactively. So the 
executor should always be removed fr [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #38876 from Ngone51/fix-blockmanager-reregister.
    
    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 11 ++--
 .../org/apache/spark/storage/BlockManagerId.scala  |  2 +
 .../apache/spark/storage/BlockManagerMaster.scala  | 20 ++++++--
 .../spark/storage/BlockManagerMasterEndpoint.scala | 41 ++++++++++++---
 .../spark/storage/BlockManagerMessages.scala       |  3 +-
 .../apache/spark/storage/BlockManagerSuite.scala   | 58 +++++++++++++++++++++-
 6 files changed, 119 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d5fde96b146..1067ee15567 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -637,9 +637,14 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, 
diskBlockManager.localDirsString, maxOnHeapMemory,
-      maxOffHeapMemory, storageEndpoint)
-    reportAllBlocks()
+    val id = master.registerBlockManager(blockManagerId, 
diskBlockManager.localDirsString,
+      maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
+    if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
+      reportAllBlocks()
+    } else {
+      logError("Exiting executor due to block manager re-registration failure")
+      System.exit(-1)
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index c6a4457d8f9..12e416bbb36 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -147,4 +147,6 @@ private[spark] object BlockManagerId {
   }
 
   private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
+
+  private[spark] val INVALID_EXECUTOR_ID = "invalid"
 }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 40008e6afbf..0ee3dc249d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -74,11 +74,25 @@ class BlockManagerMaster(
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      storageEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef,
+      isReRegister: Boolean = false): BlockManagerId = {
     logInfo(s"Registering BlockManager $id")
     val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, 
storageEndpoint))
-    logInfo(s"Registered BlockManager $updatedId")
+      RegisterBlockManager(
+        id,
+        localDirs,
+        maxOnHeapMemSize,
+        maxOffHeapMemSize,
+        storageEndpoint,
+        isReRegister
+      )
+    )
+    if (updatedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) {
+      assert(isReRegister, "Got invalid executor id from non re-register case")
+      logInfo(s"Re-register BlockManager $id failed")
+    } else {
+      logInfo(s"Registered BlockManager $updatedId")
+    }
     updatedId
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 495d91fe0e4..d30272c51be 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -117,8 +117,10 @@ class BlockManagerMasterEndpoint(
     RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, 
rpcEnv)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
-    case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, 
maxOffHeapMemSize, endpoint) =>
-      context.reply(register(id, localDirs, maxOnHeapMemSize, 
maxOffHeapMemSize, endpoint))
+    case RegisterBlockManager(
+      id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, 
isReRegister) =>
+      context.reply(
+        register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, 
isReRegister))
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, 
deserializedSize, size) =>
@@ -572,7 +574,8 @@ class BlockManagerMasterEndpoint(
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      storageEndpoint: RpcEndpointRef): BlockManagerId = {
+      storageEndpoint: RpcEndpointRef,
+      isReRegister: Boolean): BlockManagerId = {
     // the dummy id is not expected to contain the topology information.
     // we get that info here and respond back with a more fleshed out block 
manager id
     val id = BlockManagerId(
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
 
     val time = System.currentTimeMillis()
     executorIdToLocalDirs.put(id.executorId, localDirs)
-    if (!blockManagerInfo.contains(id)) {
+    // SPARK-41360: For the block manager re-registration, we should only 
allow it when
+    // the executor is recognized as active by the scheduler backend. 
Otherwise, this kind
+    // of re-registration from the terminating/stopped executor is meaningless 
and harmful.
+    lazy val isExecutorAlive =
+      
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
+    if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
       blockManagerIdByExecutor.get(id.executorId) match {
         case Some(oldId) =>
           // A block manager of the same executor already exists, so remove it 
(assumed dead)
@@ -616,10 +624,29 @@ class BlockManagerMasterEndpoint(
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
+      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
+        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), 
Some(maxOffHeapMemSize)))
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize 
+ maxOffHeapMemSize,
-        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
-    id
+    val updatedId = if (isReRegister && !isExecutorAlive) {
+      assert(!blockManagerInfo.contains(id),
+        "BlockManager re-registration shouldn't succeed when the executor is 
lost")
+
+      logInfo(s"BlockManager ($id) re-registration is rejected since " +
+        s"the executor (${id.executorId}) has been lost")
+
+      // Use "invalid" as the return executor id to indicate the block manager 
that
+      // re-registration failed. It's a bit hacky but fine since the returned 
block
+      // manager id won't be accessed in the case of re-registration. And 
we'll use
+      // this "invalid" executor id to print better logs and avoid blocks 
reporting.
+      BlockManagerId(
+        BlockManagerId.INVALID_EXECUTOR_ID,
+        id.host,
+        id.port,
+        id.topologyInfo)
+    } else {
+      id
+    }
+    updatedId
   }
 
  private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: 
BlockManagerId)
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index afe416a55ed..e047b61fcb1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
       localDirs: Array[String],
       maxOnHeapMemSize: Long,
       maxOffHeapMemSize: Long,
-      sender: RpcEndpointRef)
+      sender: RpcEndpointRef,
+      isReRegister: Boolean)
     extends ToBlockManagerMaster
 
   case class UpdateBlockInfo(
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c8914761b94..842b66193f2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -295,7 +295,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with PrivateMethodTe
     eventually(timeout(5.seconds)) {
       // make sure both bm1 and bm2 are registered at driver side 
BlockManagerMaster
       verify(master, times(2))
-        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), 
mc.any(), mc.any())
       assert(driverEndpoint.askSync[Boolean](
         CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
       assert(driverEndpoint.askSync[Boolean](
@@ -361,6 +361,44 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
     master.removeShuffle(0, true)
   }
 
+  test("SPARK-41360: Avoid block manager re-registration if the executor has 
been lost") {
+    // Set up a DriverEndpoint which always returns isExecutorAlive=false
+    rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
+      new RpcEndpoint {
+        override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+
+        override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+          case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, 
_, _, _, _, _) =>
+            context.reply(true)
+          case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
+            // always return false
+            context.reply(false)
+        }
+      }
+    )
+
+    // Set up a block manager endpoint and endpoint reference
+    val bmRef = rpcEnv.setupEndpoint(s"bm-0", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+
+      private def reply[T](context: RpcCallContext, response: T): Unit = {
+        context.reply(response)
+      }
+
+      override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+        case RemoveRdd(_) => reply(context, 1)
+        case RemoveBroadcast(_, _) => reply(context, 1)
+        case RemoveShuffle(_) => reply(context, true)
+      }
+    })
+    val bmId = BlockManagerId(s"exec-0", "localhost", 1234, None)
+    // Register the block manager with isReRegister = true
+    val updatedId = master.registerBlockManager(
+      bmId, Array.empty, 2000, 0, bmRef, isReRegister = true)
+    // The re-registration should fail since the executor is considered as 
dead by DriverEndpoint
+    assert(updatedId.executorId === BlockManagerId.INVALID_EXECUTOR_ID)
+  }
+
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, 3)
     // this should return the same object as level1
@@ -669,6 +707,22 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
 
+    // Set up a DriverEndpoint which simulates the executor is alive (required 
by SPARK-41360)
+    rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
+      new RpcEndpoint {
+        override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
+
+        override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+          case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
+            if (executorId == store.blockManagerId.executorId) {
+              context.reply(true)
+            } else {
+              context.reply(false)
+            }
+        }
+      }
+    )
+
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     assert(master.getLocations("a1").size > 0, "master was not told about a1")
 
@@ -2207,7 +2261,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
       }.getMessage
       assert(e.contains("TimeoutException"))
       verify(master, times(0))
-        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
+        .registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), 
mc.any(), mc.any())
       server.close()
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to