This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 5180694705be [SPARK-46957][CORE][3.5][3.4] Decommission migrated shuffle files should be able to cleanup from executor
5180694705be is described below
commit 5180694705be3508bd21dd9b863a59b8cb8ba193
Author: Yi Wu <[email protected]>
AuthorDate: Thu Jun 27 23:54:35 2024 +0800
[SPARK-46957][CORE][3.5][3.4] Decommission migrated shuffle files should be able to cleanup from executor
### What changes were proposed in this pull request?
This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the migrated shuffle files on the destination executor.
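For orientation, here is a minimal sketch of the tracking idea, assuming the
executor-side map has the shape used by `SortShuffleManager`
(`trackMigratedBlock` is a hypothetical helper for illustration, not Spark API):

    import java.util.concurrent.ConcurrentHashMap
    import org.apache.spark.util.collection.OpenHashSet

    // shuffleId -> IDs of the map tasks whose shuffle files live on this executor
    val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()

    // Hypothetical helper: what the patch effectively does for every migrated
    // index/data block that lands on the destination executor.
    def trackMigratedBlock(shuffleId: Int, mapId: Long): Unit = {
      val mapTaskIds =
        taskIdMapsForShuffle.computeIfAbsent(shuffleId, _ => new OpenHashSet[Long](8))
      mapTaskIds.add(mapId)
    }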
### Why are the changes needed?
This fixes a long-standing decommission bug where migrated shuffle files
can't be cleaned up from the executor. Normally, shuffle files are tracked in
`taskIdMapsForShuffle` during map task execution. Upon receiving a
`RemoveShuffle(shuffleId)` request from the driver, the executor can clean up
those shuffle files by searching `taskIdMapsForShuffle`. However, shuffle files
migrated during decommission are never tracked in the destination executor's
`taskIdMapsForShuffle` [...]
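For reference, the cleanup path that depends on this tracking looks roughly
like the existing `SortShuffleManager#unregisterShuffle` logic (a simplified
sketch, not the exact code):

    override def unregisterShuffle(shuffleId: Int): Boolean = {
      // If migration never registered an entry here, the migrated files leak.
      Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
        mapTaskIds.iterator.foreach { mapTaskId =>
          shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId)
        }
      }
      true
    }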
Note this bug only affects shuffle removal on the executor. Shuffle removal
via the external shuffle service (when `spark.shuffle.service.removeShuffle` is
enabled and the executor that stored the shuffle files is gone) doesn't rely on
`taskIdMapsForShuffle` but instead uses the specific shuffle block ID to locate
the shuffle file directly, so it isn't affected.
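An illustrative snippet of why that path needs no tracking: a shuffle block ID
already encodes shuffleId/mapId/reduceId, so the file name can be derived
directly from the block ID:

    import org.apache.spark.storage.ShuffleIndexBlockId

    val block = ShuffleIndexBlockId(shuffleId = 0, mapId = 42L, reduceId = 0)
    // The block name maps one-to-one onto the on-disk file name.
    assert(block.name == "shuffle_0_42_0.index")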
### Does this PR introduce _any_ user-facing change?
No. (End users won't notice any behavioral difference.)
### How was this patch tested?
Added a unit test in `BlockManagerDecommissionIntegrationSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47122 from Ngone51/SPARK-46957-3.5.
Authored-by: Yi Wu <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit b28ddb176fd87aa0bab4afe5e0db4fc4c3ec9c59)
Signed-off-by: Kent Yao <[email protected]>
---
.../io/LocalDiskShuffleExecutorComponents.java | 6 +-
.../spark/shuffle/IndexShuffleBlockResolver.scala | 20 +++++-
.../spark/shuffle/sort/SortShuffleManager.scala | 3 +-
.../shuffle/sort/UnsafeShuffleWriterSuite.java | 6 +-
.../BlockManagerDecommissionIntegrationSuite.scala | 76 ++++++++++++++++++++++
5 files changed, 106 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index eb4d9d9abc8e..38f0a60f8b0d 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.sort.io;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
@@ -56,7 +57,10 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the SparkEnv.");
     }
-    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
+    blockResolver =
+      new IndexShuffleBlockResolver(
+        sparkConf, blockManager, Collections.emptyMap() /* Shouldn't be accessed */
+      );
}
@Override
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d41321b4597b..ec1717f13775 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,6 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files
+import java.util.{Collections, Map => JMap}
import scala.collection.mutable.ArrayBuffer
@@ -36,6 +37,7 @@ import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.OpenHashSet
/**
 * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -51,7 +53,8 @@ import org.apache.spark.util.Utils
private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
// var for testing
- var _blockManager: BlockManager = null)
+ var _blockManager: BlockManager = null,
+    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = Collections.emptyMap())
extends ShuffleBlockResolver
with Logging with MigratableResolver {
@@ -268,6 +271,21 @@ private[spark] class IndexShuffleBlockResolver(
throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
}
}
+      blockId match {
+        case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+          val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+            shuffleId, _ => new OpenHashSet[Long](8)
+          )
+          mapTaskIds.add(mapId)
+
+        case ShuffleDataBlockId(shuffleId, mapId, _) =>
+          val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+            shuffleId, _ => new OpenHashSet[Long](8)
+          )
+          mapTaskIds.add(mapId)
+
+        case _ => // Unreachable
+      }
blockManager.reportBlockStatus(blockId,
BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 79dff6f87534..4234d0ec5fd0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -87,7 +87,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
-  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  override val shuffleBlockResolver =
+    new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 1fa17b908699..472d03baeae0 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -314,7 +314,8 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
@Test
public void writeChecksumFileWithoutSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -344,7 +345,8 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
@Test
public void writeChecksumFileWithSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index d9d2e6102f12..2ba348222f7b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,12 +17,14 @@
package org.apache.spark.storage
+import java.io.File
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
+import org.apache.commons.io.FileUtils
import org.scalatest.concurrent.Eventually
import org.apache.spark._
@@ -352,4 +354,78 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
import scala.language.reflectiveCalls
assert(listener.removeReasonValidated)
}
+
+  test("SPARK-46957: Migrated shuffle files should be able to cleanup from executor") {
+
+ val sparkTempDir = System.getProperty("java.io.tmpdir")
+
+ def shuffleFiles: Seq[File] = {
+ FileUtils
+ .listFiles(new File(sparkTempDir), Array("data", "index"), true)
+ .asScala
+ .toSeq
+ }
+
+ val existingShuffleFiles = shuffleFiles
+
+ val conf = new SparkConf()
+ .setAppName("SPARK-46957")
+ .setMaster("local-cluster[2,1,1024]")
+ .set(config.DECOMMISSION_ENABLED, true)
+ .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+ .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+ sc = new SparkContext(conf)
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+ val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
+ var isDecommissionedExecutorRemoved = false
+ val execToDecommission = sc.getExecutorIds().head
+ sc.addSparkListener(new SparkListener {
+      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
+ shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
+ }
+ }
+
+      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+ assert(execToDecommission === executorRemoved.executorId)
+ isDecommissionedExecutorRemoved = true
+ }
+ })
+
+ // Run a job to create shuffle data
+ val result = sc.parallelize(1 to 1000, 10)
+ .map { i => (i % 2, i) }
+ .reduceByKey(_ + _).collect()
+
+ assert(result.head === (0, 250500))
+ assert(result.tail.head === (1, 250000))
+ sc.schedulerBackend
+ .asInstanceOf[StandaloneSchedulerBackend]
+ .decommissionExecutor(
+ execToDecommission,
+ ExecutorDecommissionInfo("test", None),
+ adjustTargetNumExecutors = true
+ )
+
+ eventually(timeout(1.minute), interval(10.milliseconds)) {
+ assert(isDecommissionedExecutorRemoved)
+      // Ensure some shuffle data has been migrated
+ assert(shuffleBlockUpdates.size >= 2)
+ }
+
+ val shuffleId = shuffleBlockUpdates
+ .find(_.isInstanceOf[ShuffleIndexBlockId])
+ .map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
+ .get
+
+ val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
+ assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
+
+ // Remove the shuffle data
+ sc.shuffleDriverComponents.removeShuffle(shuffleId, true)
+
+ eventually(timeout(1.minute), interval(10.milliseconds)) {
+ assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
+ }
+ }
}