This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new ca8407b97338 [SPARK-47702][CORE][3.5] Remove Shuffle service endpoint 
from the locations list when RDD block is removed from a node
ca8407b97338 is described below

commit ca8407b9733882fd28a6f94fd88aef27f3b648d0
Author: maheshbehera <[email protected]>
AuthorDate: Sat Oct 5 12:56:52 2024 +0800

    [SPARK-47702][CORE][3.5] Remove Shuffle service endpoint from the locations 
list when RDD block is removed from a node
    
    Credit to maheshk114 for the initial investigation and the fix.
    
    This PR fixes a bug where the shuffle service's ID is kept among the block 
location list at the removal of an RDD block from a node. Before this change 
`StorageLevel.NONE` is used to notify about the block removal, which causes the 
block manager master to ignore the update of the locations for shuffle service's 
IDs (for details please see the method 
`BlockManagerMasterEndpoint#updateBlockInfo()` and keep in mind 
`StorageLevel.NONE.useDisk` is `false`). But after this change only the replic 
[...]
    
    If the block location is not updated properly, then tasks fail with a fetch 
failed exception. The tasks will try to read the RDD blocks from a node using 
the external shuffle service. The read will fail if the node is already 
decommissioned.
    
    ```
    WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0 
(TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4, 
vm-92303839, 7337, None) (failed attempt 1)
    org.apache.spark.SparkException: Exception thrown in awaitResult:
            at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
            at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
            at 
org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
            at 
org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
            at scala.Option.orElse(Option.scala:447)
            at 
org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
            at 
org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
            at 
org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
            at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)
    ```
    
    No
    
    Added a new UT.
    
    No
    
    Closes #48357 from attilapiros/SPARK-47702-Spark3.5.
    
    Authored-by: maheshbehera <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../org/apache/spark/storage/BlockManager.scala    |  6 ++--
 .../storage/BlockManagerReplicationSuite.scala     | 36 ++++++++++++++++++++++
 .../apache/spark/storage/BlockManagerSuite.scala   | 13 +++++---
 3 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6de6069d2fea..1b56aa7ade12 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2082,8 +2082,10 @@ private[spark] class BlockManager(
       hasRemoveBlock = true
       if (tellMaster) {
         // Only update storage level from the captured block status before 
deleting, so that
-        // memory size and disk size are being kept for calculating delta.
-        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = 
StorageLevel.NONE))
+        // memory size and disk size are being kept for calculating delta. 
Reset the replica
+        // count 0 in storage level to notify that it is a remove operation.
+        val storageLevel = StorageLevel(blockStatus.get.storageLevel.toInt, 0)
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = 
storageLevel))
       }
     } finally {
       if (!hasRemoveBlock) {
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 38a669bc8574..29526684c3e9 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,6 +38,8 @@ import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.shuffle.ExternalBlockStoreClient
+import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
@@ -295,6 +297,40 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
+  test("Test block location after replication with 
SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
+    val newConf = conf.clone()
+    newConf.set(SHUFFLE_SERVICE_ENABLED, true)
+    newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]()
+    val shuffleClient = Some(new ExternalBlockStoreClient(
+        new TransportConf("shuffle", MapConfigProvider.EMPTY),
+        null, false, 5000))
+    master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2",
+      new BlockManagerMasterEndpoint(rpcEnv, true, newConf,
+        new LiveListenerBus(newConf), shuffleClient, blockManagerInfo, 
mapOutputTracker,
+        sc.env.shuffleManager, isDriver = true)),
+      rpcEnv.setupEndpoint("blockmanagerHeartbeat-2",
+      new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, 
blockManagerInfo)), newConf, true)
+
+    val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT)
+    val store1 = makeBlockManager(10000, "host-1")
+    val store2 = makeBlockManager(10000, "host-2")
+    assert(master.getPeers(store1.blockManagerId).toSet === 
Set(store2.blockManagerId))
+
+    val blockId = RDDBlockId(1, 2)
+    val message = new Array[Byte](1000)
+
+    // if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port 
should be present.
+    store1.putSingle(blockId, message, StorageLevel.DISK_ONLY)
+    assert(master.getLocations(blockId).contains(
+      BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
+
+    // after block is removed, shuffle port should be removed.
+    store1.removeBlock(blockId, true)
+    assert(!master.getLocations(blockId).contains(
+      BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
+  }
+
   test("block replication - addition and deletion of block managers") {
     val blockSize = 1000
     val storeSize = 10000
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ecd66dc2c5fb..728e3a252b7a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -33,7 +33,7 @@ import scala.reflect.classTag
 import com.esotericsoftware.kryo.KryoException
 import org.apache.commons.lang3.RandomUtils
 import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
-import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
+import org.mockito.Mockito.{atLeastOnce, doAnswer, mock, never, spy, times, 
verify, when}
 import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
@@ -666,7 +666,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with PrivateMethodTe
       removedFromMemory: Boolean,
       removedFromDisk: Boolean): Unit = {
     def assertSizeReported(captor: ArgumentCaptor[Long], expectRemoved: 
Boolean): Unit = {
-      assert(captor.getAllValues().size() === 1)
+      assert(captor.getAllValues().size() >= 1)
       if (expectRemoved) {
         assert(captor.getValue() > 0)
       } else {
@@ -676,15 +676,18 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
 
     val memSizeCaptor = 
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
     val diskSizeCaptor = 
ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
-    verify(master).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
-      mc.eq(StorageLevel.NONE), memSizeCaptor.capture(), 
diskSizeCaptor.capture())
+    val storageLevelCaptor =
+      
ArgumentCaptor.forClass(classOf[StorageLevel]).asInstanceOf[ArgumentCaptor[StorageLevel]]
+    verify(master, atLeastOnce()).updateBlockInfo(mc.eq(store.blockManagerId), 
mc.eq(blockId),
+      storageLevelCaptor.capture(), memSizeCaptor.capture(), 
diskSizeCaptor.capture())
     assertSizeReported(memSizeCaptor, removedFromMemory)
     assertSizeReported(diskSizeCaptor, removedFromDisk)
+    assert(storageLevelCaptor.getValue.replication == 0)
   }
 
   private def assertUpdateBlockInfoNotReported(store: BlockManager, blockId: 
BlockId): Unit = {
     verify(master, never()).updateBlockInfo(mc.eq(store.blockManagerId), 
mc.eq(blockId),
-      mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
+      mc.any[StorageLevel](), mc.anyInt(), mc.anyInt())
   }
 
   test("reregistration on heart beat") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to