This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e8b4b5a6116 [SPARK-51638][CORE] Fix fetching the remote disk stored RDD blocks via the external shuffle service
6e8b4b5a6116 is described below

commit 6e8b4b5a61167479b119a6710e626d7dc885fc5e
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Tue Apr 15 14:34:19 2025 -0700

    [SPARK-51638][CORE] Fix fetching the remote disk stored RDD blocks via the external shuffle service
    
    ### What changes were proposed in this pull request?
    
    Fix remote fetching of disk stored RDD blocks via the external shuffle service when `spark.shuffle.service.fetch.rdd.enabled` is set.
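
    For context, a minimal sketch (not part of this change) of the configuration under which this code path is exercised; the master URL and app name below are placeholders:

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setMaster("local-cluster[1,1,1024]")
      .setAppName("rdd-fetch-via-ess")
      // serve blocks through the external shuffle service
      .set("spark.shuffle.service.enabled", "true")
      // additionally serve disk persisted RDD blocks, not only shuffle files
      .set("spark.shuffle.service.fetch.rdd.enabled", "true")
    val sc = new SparkContext(conf)

    // Blocks of a DISK_ONLY persisted RDD registered this way remain fetchable
    // through the shuffle service even after the executor that wrote them is gone.
    sc.parallelize(0 until 100, 2).persist(StorageLevel.DISK_ONLY).count()
    ```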
    
    ### Why are the changes needed?
    
    After https://issues.apache.org/jira/browse/SPARK-43221 remote fetching was handled in `BlockManagerMasterEndpoint#getLocationsAndStatus` in a single place, where all the locations were looked up in the `blockManagerInfo` map. But that map only contains information about the active executors, i.e. the ones that have not been killed yet (for example by downscaling with dynamic allocation, or because of a failure).
    
    This PR extends the search to all the remote external shuffle services by using the `blockStatusByShuffleService` map, which contains block infos even for already killed executors.
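
    As a simplified sketch of the resulting lookup order (stand-in types; the real logic lives in `BlockManagerMasterEndpoint#getLocationsAndStatus`, and the shuffle-service branch is additionally guarded by `blockId.isRDD`):

    ```scala
    case class BmId(host: String, port: Int)
    case class Status(useDisk: Boolean)

    def locate(
        allLocations: Seq[BmId],
        requesterHost: String,
        shuffleServicePort: Int,
        rddFetchEnabled: Boolean,
        // survives executor death (stands in for blockStatusByShuffleService)
        byShuffleService: Map[BmId, Status],
        // active executors only (stands in for blockManagerInfo)
        byActiveExecutor: Map[BmId, Status]): Option[(Status, BmId)] = {
      val primary =
        if (rddFetchEnabled) {
          // 1. any external shuffle service, preferring the requester's host;
          //    with this fix the search covers killed executors too
          val services = allLocations.filter(_.port == shuffleServicePort)
          val loc = services.find(_.host == requesterHost).orElse(services.headOption)
          loc.flatMap(l => byShuffleService.get(l).map((_, l)))
        } else {
          // 2. a disk persisted copy on an active executor of the same host
          allLocations.iterator
            .filter(_.host == requesterHost)
            .flatMap(l => byActiveExecutor.get(l).map((_, l)))
            .find(_._1.useDisk)
        }
      // 3. fall back to any active executor at any storage level
      primary.orElse {
        allLocations.headOption.flatMap(l => byActiveExecutor.get(l).map((_, l)))
      }
    }
    ```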
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    An existing unit test was extended.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50439 from attilapiros/SPARK-51638.
    
    Authored-by: attilapiros <piros.attila.zs...@gmail.com>
    Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../spark/storage/BlockManagerMasterEndpoint.scala | 56 ++++++++++----------
 .../apache/spark/ExternalShuffleServiceSuite.scala | 60 +++++++++++++++++++++-
 2 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index e33161e25f08..858db498e83a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -863,36 +863,36 @@ class BlockManagerMasterEndpoint(
       blockId: BlockId,
       requesterHost: String): Option[BlockLocationsAndStatus] = {
     val allLocations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
-    val hostLocalLocations = allLocations.filter(bmId => bmId.host == requesterHost)
-
     val blockStatusWithBlockManagerId: Option[(BlockStatus, BlockManagerId)] =
-      (if (externalShuffleServiceRddFetchEnabled) {
-         // if fetching RDD is enabled from the external shuffle service then first try to find
-         // the block in the external shuffle service of the same host
-         val location = hostLocalLocations.find(_.port == externalShuffleServicePort)
-         location
-           .flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
-           .zip(location)
-       } else {
-         None
-       })
-        .orElse {
-          // if the block is not found via the external shuffle service trying to find it in the
-          // executors running on the same host and persisted on the disk
-          // using flatMap on iterators makes the transformation lazy
-          hostLocalLocations.iterator
-            .flatMap { bmId =>
-              blockManagerInfo.get(bmId).flatMap { blockInfo =>
-                blockInfo.getStatus(blockId).map((_, bmId))
-              }
-            }
-            .find(_._1.storageLevel.useDisk)
-        }
-        .orElse {
-          // if the block cannot be found in the same host search it in all the executors
-          val location = allLocations.headOption
-          location.flatMap(blockManagerInfo.get(_)).flatMap(_.getStatus(blockId)).zip(location)
+      (if (externalShuffleServiceRddFetchEnabled && blockId.isRDD) {
+        // If fetching disk persisted RDD from the external shuffle service is enabled then first
+        // try to find the block in the external shuffle service preferring the one running on
+        // the same host. This search includes blocks stored on already killed executors as well.
+        val hostLocalLocations = allLocations.find { bmId =>
+          bmId.host == requesterHost && bmId.port == externalShuffleServicePort
         }
+        val location = hostLocalLocations
+          .orElse(allLocations.find(_.port == externalShuffleServicePort))
+        location
+          .flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
+          .zip(location)
+      } else {
+        // trying to find it in the executors running on the same host and persisted on the disk
+        // Implementation detail: using flatMap on iterators makes the transformation lazy.
+        allLocations.filter(_.host == requesterHost).iterator
+          .flatMap { bmId =>
+            blockManagerInfo.get(bmId).flatMap { blockInfo =>
+              blockInfo.getStatus(blockId).map((_, bmId))
+            }
+          }
+          .find(_._1.storageLevel.useDisk)
+      })
+      .orElse {
+        // if the block cannot be found in the same host as a disk stored block then extend the
+        // search to all active (not killed) executors and to all storage levels
+        val location = allLocations.headOption
+        location.flatMap(blockManagerInfo.get(_)).flatMap(_.getStatus(blockId)).zip(location)
+      }
     logDebug(s"Identified block: $blockStatusWithBlockManagerId")
     blockStatusWithBlockManagerId
       .map { case (blockStatus: BlockStatus, bmId: BlockManagerId) =>
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index f0a63247e64b..e9a0c405b0d9 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.File
+import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission
 
@@ -35,8 +36,9 @@ import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
 import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient}
-import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
+import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
 import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * This suite creates an external shuffle server and routes all shuffle fetches through it.
@@ -117,12 +119,15 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
       .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
       .set(config.EXECUTOR_REMOVE_DELAY.key, "0s")
+      .set(config.DRIVER_BIND_ADDRESS.key, Utils.localHostName())
     sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
     try {
+      val list = List[Int](1, 2, 3, 4)
+      val broadcast = sc.broadcast(list)
       val rdd = sc.parallelize(0 until 100, 2)
-        .map { i => (i, 1) }
+        .map { i => (i, broadcast.value.size) }
         .persist(StorageLevel.DISK_ONLY)
 
       rdd.count()
@@ -173,8 +178,59 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
           "external shuffle service port should be contained")
       }
 
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locationStatusForLocalHost =
+          sc.env.blockManager.master.getLocationsAndStatus(blockId, Utils.localHostName())
+        assert(locationStatusForLocalHost.isDefined)
+        assert(locationStatusForLocalHost.get.localDirs.isDefined)
+        assert(locationStatusForLocalHost.get.locations.head.executorId == "0")
+        assert(locationStatusForLocalHost.get.locations.head.host == Utils.localHostName())
+      }
+
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locationStatusForRemoteHost =
+          sc.env.blockManager.master.getLocationsAndStatus(blockId, "<invalid-host>")
+        assert(locationStatusForRemoteHost.isDefined)
+        assert(locationStatusForRemoteHost.get.localDirs.isEmpty)
+        assert(locationStatusForRemoteHost.get.locations.head.executorId == "0")
+        assert(locationStatusForRemoteHost.get.locations.head.host == Utils.localHostName())
+      }
+
       assert(sc.env.blockManager.getRemoteValues(blockId).isDefined)
 
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val broadcastBlockId = BroadcastBlockId(broadcast.id, "piece0")
+        val locStatusForMemBroadcast =
+          sc.env.blockManager.master.getLocationsAndStatus(broadcastBlockId, Utils.localHostName())
+        assert(locStatusForMemBroadcast.isDefined)
+        assert(locStatusForMemBroadcast.get.localDirs.isEmpty)
+        assert(locStatusForMemBroadcast.get.locations.head.executorId == "driver")
+        assert(locStatusForMemBroadcast.get.locations.head.host == Utils.localHostName())
+      }
+
+      val byteBuffer = ByteBuffer.wrap(Array[Byte](7))
+      val bytes = new ChunkedByteBuffer(Array(byteBuffer))
+      val diskBroadcastId = BroadcastBlockId(Long.MaxValue, "piece0")
+      sc.env.blockManager.putBytes(diskBroadcastId, bytes, StorageLevel.DISK_ONLY,
+        tellMaster = true)
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locStatusForDiskBroadcast =
+          sc.env.blockManager.master.getLocationsAndStatus(diskBroadcastId, Utils.localHostName())
+        assert(locStatusForDiskBroadcast.isDefined)
+        assert(locStatusForDiskBroadcast.get.localDirs.isDefined)
+        assert(locStatusForDiskBroadcast.get.locations.head.executorId == "driver")
+        assert(locStatusForDiskBroadcast.get.locations.head.host == Utils.localHostName())
+      }
+
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locStatusForDiskBroadcastForFetch =
+          sc.env.blockManager.master.getLocationsAndStatus(diskBroadcastId, "<invalid-host>")
+        assert(locStatusForDiskBroadcastForFetch.isDefined)
+        assert(locStatusForDiskBroadcastForFetch.get.localDirs.isEmpty)
+        assert(locStatusForDiskBroadcastForFetch.get.locations.head.executorId == "driver")
+        assert(locStatusForDiskBroadcastForFetch.get.locations.head.host == Utils.localHostName())
+      }
+
       // test unpersist: as executors are killed the blocks will be removed via the shuffle service
       rdd.unpersist(true)
       assert(sc.env.blockManager.getRemoteValues(blockId).isEmpty)

