This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6e8b4b5a6116 [SPARK-51638][CORE] Fix fetching the remote disk stored RDD blocks via the external shuffle service
6e8b4b5a6116 is described below

commit 6e8b4b5a61167479b119a6710e626d7dc885fc5e
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Tue Apr 15 14:34:19 2025 -0700

    [SPARK-51638][CORE] Fix fetching the remote disk stored RDD blocks via the external shuffle service

    ### What changes were proposed in this pull request?

    Fix remote fetching of disk-stored RDD blocks via the external shuffle service when `spark.shuffle.service.fetch.rdd.enabled` is set.

    ### Why are the changes needed?

    After https://issues.apache.org/jira/browse/SPARK-43221, remote fetching was handled in `BlockManagerMasterEndpoint#getLocationsAndStatus` in a single place, where all the locations were looked up in the `blockManagerInfo` map. That map only contains information about active executors, i.e. executors that have not already been killed (for example by downscaling under dynamic allocation, or because of failures). This PR extends the search to all the remote external shuffle services by using the `blockStatusByShuffleService` map, which contains block information even for killed executors.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    An existing unit test was extended.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #50439 from attilapiros/SPARK-51638.

    Authored-by: attilapiros <piros.attila.zs...@gmail.com>
    Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../spark/storage/BlockManagerMasterEndpoint.scala | 56 ++++++++++----------
 .../apache/spark/ExternalShuffleServiceSuite.scala | 60 +++++++++++++++++++++-
 2 files changed, 86 insertions(+), 30 deletions(-)
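The lookup order described above can be summarized with a small sketch before reading the diff. This is an illustrative model only, with made-up names (`Location`, `pickLocation`), not the real `BlockManagerMasterEndpoint` code: when RDD fetching via the external shuffle service is enabled, the shuffle service on the requester's host is preferred, then any shuffle service (those entries survive executor loss), and only then blocks reported by still-active executors.

```scala
// Illustrative only: Location and pickLocation are made-up stand-ins, not Spark classes.
case class Location(host: String, port: Int, executorId: String)

def pickLocation(
    allLocations: Seq[Location],
    requesterHost: String,
    shuffleServicePort: Int,
    activeExecutorIds: Set[String]): Option[Location] = {
  // 1. external shuffle service on the requester's host (usable even if its executor is gone)
  allLocations.find(l => l.host == requesterHost && l.port == shuffleServicePort)
    // 2. any external shuffle service on any host (also covers killed executors)
    .orElse(allLocations.find(_.port == shuffleServicePort))
    // 3. finally, any block held by a still-active executor
    .orElse(allLocations.find(l => activeExecutorIds.contains(l.executorId)))
}

// The executor on host-a is gone, but host-a's shuffle service (port 7337) still has the block.
val locations = Seq(Location("host-a", 7337, "1"), Location("host-b", 46321, "2"))
assert(pickLocation(locations, "host-a", 7337, Set("2")).exists(_.host == "host-a"))
```

The diff below adds roughly this preference for the `blockId.isRDD` case and keeps the executor-based search as the fallback.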
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index e33161e25f08..858db498e83a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -863,36 +863,36 @@ class BlockManagerMasterEndpoint(
       blockId: BlockId,
       requesterHost: String): Option[BlockLocationsAndStatus] = {
     val allLocations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
-    val hostLocalLocations = allLocations.filter(bmId => bmId.host == requesterHost)
-
     val blockStatusWithBlockManagerId: Option[(BlockStatus, BlockManagerId)] =
-      (if (externalShuffleServiceRddFetchEnabled) {
-        // if fetching RDD is enabled from the external shuffle service then first try to find
-        // the block in the external shuffle service of the same host
-        val location = hostLocalLocations.find(_.port == externalShuffleServicePort)
-        location
-          .flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
-          .zip(location)
-      } else {
-        None
-      })
-      .orElse {
-        // if the block is not found via the external shuffle service trying to find it in the
-        // executors running on the same host and persisted on the disk
-        // using flatMap on iterators makes the transformation lazy
-        hostLocalLocations.iterator
-          .flatMap { bmId =>
-            blockManagerInfo.get(bmId).flatMap { blockInfo =>
-              blockInfo.getStatus(blockId).map((_, bmId))
-            }
-          }
-          .find(_._1.storageLevel.useDisk)
-      }
-      .orElse {
-        // if the block cannot be found in the same host search it in all the executors
-        val location = allLocations.headOption
-        location.flatMap(blockManagerInfo.get(_)).flatMap(_.getStatus(blockId)).zip(location)
+      (if (externalShuffleServiceRddFetchEnabled && blockId.isRDD) {
+        // If fetching disk persisted RDD from the external shuffle service is enabled then first
+        // try to find the block in the external shuffle service preferring the one running on
+        // the same host. This search includes blocks stored on already killed executors as well.
+        val hostLocalLocations = allLocations.find { bmId =>
+          bmId.host == requesterHost && bmId.port == externalShuffleServicePort
        }
+        val location = hostLocalLocations
+          .orElse(allLocations.find(_.port == externalShuffleServicePort))
+        location
+          .flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
+          .zip(location)
+      } else {
+        // trying to find it in the executors running on the same host and persisted on the disk
+        // Implementation detail: using flatMap on iterators makes the transformation lazy.
+        allLocations.filter(_.host == requesterHost).iterator
+          .flatMap { bmId =>
+            blockManagerInfo.get(bmId).flatMap { blockInfo =>
+              blockInfo.getStatus(blockId).map((_, bmId))
+            }
+          }
+          .find(_._1.storageLevel.useDisk)
+      })
+      .orElse {
+        // if the block cannot be found in the same host as a disk stored block then extend the
+        // search to all active (not killed) executors and to all storage levels
+        val location = allLocations.headOption
+        location.flatMap(blockManagerInfo.get(_)).flatMap(_.getStatus(blockId)).zip(location)
+      }
     logDebug(s"Identified block: $blockStatusWithBlockManagerId")
     blockStatusWithBlockManagerId
       .map { case (blockStatus: BlockStatus, bmId: BlockManagerId) =>
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index f0a63247e64b..e9a0c405b0d9 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark

 import java.io.File
+import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission

@@ -35,8 +36,9 @@ import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
 import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient}
-import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
+import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
 import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer

 /**
  * This suite creates an external shuffle server and routes all shuffle fetches through it.
@@ -117,12 +119,15 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
       .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
       .set(config.EXECUTOR_REMOVE_DELAY.key, "0s")
+      .set(config.DRIVER_BIND_ADDRESS.key, Utils.localHostName())
     sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
     try {
+      val list = List[Int](1, 2, 3, 4)
+      val broadcast = sc.broadcast(list)
       val rdd = sc.parallelize(0 until 100, 2)
-        .map { i => (i, 1) }
+        .map { i => (i, broadcast.value.size) }
         .persist(StorageLevel.DISK_ONLY)

       rdd.count()
@@ -173,8 +178,59 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
           "external shuffle service port should be contained")
       }

+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locationStatusForLocalHost =
+          sc.env.blockManager.master.getLocationsAndStatus(blockId, Utils.localHostName())
+        assert(locationStatusForLocalHost.isDefined)
+        assert(locationStatusForLocalHost.get.localDirs.isDefined)
+        assert(locationStatusForLocalHost.get.locations.head.executorId == "0")
+        assert(locationStatusForLocalHost.get.locations.head.host == Utils.localHostName())
+      }
+
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locationStatusForRemoteHost =
+          sc.env.blockManager.master.getLocationsAndStatus(blockId, "<invalid-host>")
+        assert(locationStatusForRemoteHost.isDefined)
+        assert(locationStatusForRemoteHost.get.localDirs.isEmpty)
+        assert(locationStatusForRemoteHost.get.locations.head.executorId == "0")
+        assert(locationStatusForRemoteHost.get.locations.head.host == Utils.localHostName())
+      }
+
       assert(sc.env.blockManager.getRemoteValues(blockId).isDefined)

+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val broadcastBlockId = BroadcastBlockId(broadcast.id, "piece0")
+        val locStatusForMemBroadcast =
+          sc.env.blockManager.master.getLocationsAndStatus(broadcastBlockId, Utils.localHostName())
+        assert(locStatusForMemBroadcast.isDefined)
+        assert(locStatusForMemBroadcast.get.localDirs.isEmpty)
+        assert(locStatusForMemBroadcast.get.locations.head.executorId == "driver")
+        assert(locStatusForMemBroadcast.get.locations.head.host == Utils.localHostName())
+      }
+
+      val byteBuffer = ByteBuffer.wrap(Array[Byte](7))
+      val bytes = new ChunkedByteBuffer(Array(byteBuffer))
+      val diskBroadcastId = BroadcastBlockId(Long.MaxValue, "piece0")
+      sc.env.blockManager.putBytes(diskBroadcastId, bytes, StorageLevel.DISK_ONLY,
+        tellMaster = true)
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locStatusForDiskBroadcast =
+          sc.env.blockManager.master.getLocationsAndStatus(diskBroadcastId, Utils.localHostName())
+        assert(locStatusForDiskBroadcast.isDefined)
+        assert(locStatusForDiskBroadcast.get.localDirs.isDefined)
+        assert(locStatusForDiskBroadcast.get.locations.head.executorId == "driver")
+        assert(locStatusForDiskBroadcast.get.locations.head.host == Utils.localHostName())
+      }
+
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        val locStatusForDiskBroadcastForFetch =
+          sc.env.blockManager.master.getLocationsAndStatus(diskBroadcastId, "<invalid-host>")
+        assert(locStatusForDiskBroadcastForFetch.isDefined)
+        assert(locStatusForDiskBroadcastForFetch.get.localDirs.isEmpty)
+        assert(locStatusForDiskBroadcastForFetch.get.locations.head.executorId == "driver")
+        assert(locStatusForDiskBroadcastForFetch.get.locations.head.host == Utils.localHostName())
+      }
+
       // test unpersist: as executors are killed the blocks will be removed via the shuffle service
       rdd.unpersist(true)
       assert(sc.env.blockManager.getRemoteValues(blockId).isEmpty)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
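The test above forces executor removal with `EXECUTOR_REMOVE_DELAY` set to `0s`; in a real application the same situation typically comes from dynamic allocation downscaling executors that hold disk-persisted RDD blocks. A hypothetical user-side configuration for that scenario could look like the sketch below (the config keys are standard Spark settings; the app name and timeout value are made up, and the cluster master / deployment settings are omitted):

```scala
// Hypothetical configuration sketch (not part of this patch): disk-persisted RDD blocks
// remain reachable through the external shuffle service after their executors are removed.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("rdd-fetch-via-shuffle-service")                       // made-up app name
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.fetch.rdd.enabled", "true")            // the flag this fix concerns
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s")  // let executors with cached blocks be removed

val sc = new SparkContext(conf)  // master and deploy settings usually come from spark-submit
val rdd = sc.parallelize(0 until 100, 2).persist(StorageLevel.DISK_ONLY)
rdd.count()  // materialize the disk-only blocks
// After downscaling removes the executors that wrote these blocks, later actions should still
// be able to fetch them from the remote external shuffle services, which is the path fixed here.
sc.stop()
```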