This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 5180694705be [SPARK-46957][CORE][3.5][3.4] Decommission migrated 
shuffle files should be able to cleanup from executor
5180694705be is described below

commit 5180694705be3508bd21dd9b863a59b8cb8ba193
Author: Yi Wu <[email protected]>
AuthorDate: Thu Jun 27 23:54:35 2024 +0800

    [SPARK-46957][CORE][3.5][3.4] Decommission migrated shuffle files should be 
able to cleanup from executor
    
    ### What changes were proposed in this pull request?
    
    This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the 
migrated shuffle files on the destination executor.
    
    ### Why are the changes needed?
    
    This is a long-standing bug in decommission where the migrated shuffle 
files can't be cleaned up from the executor. Normally, the shuffle files are 
tracked by `taskIdMapsForShuffle` during the map task execution. Upon receiving 
the `RemoveShuffle(shuffleId)` request from driver, executor can clean up those 
shuffle files by searching `taskIdMapsForShuffle`. However, for the migrated 
shuffle files by decommission, they lose the track in the destination 
executor's  `taskIdMapsForShuffle` [...]
    
    Note this bug only affects shuffle removal on the executor. For shuffle 
removal on the external shuffle service (when 
`spark.shuffle.service.removeShuffle` enabled and the executor stores the 
shuffle files has gone), we don't rely on `taskIdMapsForShuffle` but using the 
specific shuffle block id to locate the shuffle file directly. So it won't be 
an issue there.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. (Common users won't see the difference underlying.)
    
    ### How was this patch tested?
    
    Add unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47122 from Ngone51/SPARK-46957-3.5.
    
    Authored-by: Yi Wu <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
    (cherry picked from commit b28ddb176fd87aa0bab4afe5e0db4fc4c3ec9c59)
    Signed-off-by: Kent Yao <[email protected]>
---
 .../io/LocalDiskShuffleExecutorComponents.java     |  6 +-
 .../spark/shuffle/IndexShuffleBlockResolver.scala  | 20 +++++-
 .../spark/shuffle/sort/SortShuffleManager.scala    |  3 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     |  6 +-
 .../BlockManagerDecommissionIntegrationSuite.scala | 76 ++++++++++++++++++++++
 5 files changed, 106 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index eb4d9d9abc8e..38f0a60f8b0d 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.sort.io;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
@@ -56,7 +57,10 @@ public class LocalDiskShuffleExecutorComponents implements 
ShuffleExecutorCompon
     if (blockManager == null) {
       throw new IllegalStateException("No blockManager available from the 
SparkEnv.");
     }
-    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
+    blockResolver =
+      new IndexShuffleBlockResolver(
+        sparkConf, blockManager, Collections.emptyMap() /* Shouldn't be 
accessed */
+      );
   }
 
   @Override
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d41321b4597b..ec1717f13775 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
+import java.util.{Collections, Map => JMap}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -36,6 +37,7 @@ import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.OpenHashSet
 
 /**
  * Create and maintain the shuffle blocks' mapping between logic block and 
physical file location.
@@ -51,7 +53,8 @@ import org.apache.spark.util.Utils
 private[spark] class IndexShuffleBlockResolver(
     conf: SparkConf,
     // var for testing
-    var _blockManager: BlockManager = null)
+    var _blockManager: BlockManager = null,
+    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = 
Collections.emptyMap())
   extends ShuffleBlockResolver
   with Logging with MigratableResolver {
 
@@ -268,6 +271,21 @@ private[spark] class IndexShuffleBlockResolver(
             throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
           }
         }
+        blockId match {
+          case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+            val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+              shuffleId, _ => new OpenHashSet[Long](8)
+            )
+            mapTaskIds.add(mapId)
+
+          case ShuffleDataBlockId(shuffleId, mapId, _) =>
+            val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
+              shuffleId, _ => new OpenHashSet[Long](8)
+            )
+            mapTaskIds.add(mapId)
+
+          case _ => // Unreachable
+        }
         blockManager.reportBlockStatus(blockId, 
BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
       }
 
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 79dff6f87534..4234d0ec5fd0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -87,7 +87,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
 
   private lazy val shuffleExecutorComponents = 
loadShuffleExecutorComponents(conf)
 
-  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  override val shuffleBlockResolver =
+    new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = 
taskIdMapsForShuffle)
 
   /**
    * Obtains a [[ShuffleHandle]] to pass to tasks.
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 1fa17b908699..472d03baeae0 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -314,7 +314,8 @@ public class UnsafeShuffleWriterSuite implements 
ShuffleChecksumTestHelper {
 
   @Test
   public void writeChecksumFileWithoutSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new 
IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, 
Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, 
IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = 
conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -344,7 +345,8 @@ public class UnsafeShuffleWriterSuite implements 
ShuffleChecksumTestHelper {
 
   @Test
   public void writeChecksumFileWithSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver = new 
IndexShuffleBlockResolver(conf, blockManager);
+    IndexShuffleBlockResolver blockResolver =
+      new IndexShuffleBlockResolver(conf, blockManager, 
Collections.emptyMap());
     ShuffleChecksumBlockId checksumBlockId =
       new ShuffleChecksumBlockId(0, 0, 
IndexShuffleBlockResolver.NOOP_REDUCE_ID());
     String checksumAlgorithm = 
conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index d9d2e6102f12..2ba348222f7b 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.storage
 
+import java.io.File
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
+import org.apache.commons.io.FileUtils
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
@@ -352,4 +354,78 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     import scala.language.reflectiveCalls
     assert(listener.removeReasonValidated)
   }
+
+  test("SPARK-46957: Migrated shuffle files should be able to cleanup from 
executor") {
+
+    val sparkTempDir = System.getProperty("java.io.tmpdir")
+
+    def shuffleFiles: Seq[File] = {
+      FileUtils
+        .listFiles(new File(sparkTempDir), Array("data", "index"), true)
+        .asScala
+        .toSeq
+    }
+
+    val existingShuffleFiles = shuffleFiles
+
+    val conf = new SparkConf()
+      .setAppName("SPARK-46957")
+      .setMaster("local-cluster[2,1,1024]")
+      .set(config.DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+    sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+    val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
+    var isDecommissionedExecutorRemoved = false
+    val execToDecommission = sc.getExecutorIds().head
+    sc.addSparkListener(new SparkListener {
+      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): 
Unit = {
+        if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
+          shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
+        }
+      }
+
+      override def onExecutorRemoved(executorRemoved: 
SparkListenerExecutorRemoved): Unit = {
+        assert(execToDecommission === executorRemoved.executorId)
+        isDecommissionedExecutorRemoved = true
+      }
+    })
+
+    // Run a job to create shuffle data
+    val result = sc.parallelize(1 to 1000, 10)
+      .map { i => (i % 2, i) }
+      .reduceByKey(_ + _).collect()
+
+    assert(result.head === (0, 250500))
+    assert(result.tail.head === (1, 250000))
+    sc.schedulerBackend
+      .asInstanceOf[StandaloneSchedulerBackend]
+      .decommissionExecutor(
+        execToDecommission,
+        ExecutorDecommissionInfo("test", None),
+        adjustTargetNumExecutors = true
+      )
+
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
+      assert(isDecommissionedExecutorRemoved)
+      // Ensure there are shuffle data have been migrated
+      assert(shuffleBlockUpdates.size >= 2)
+    }
+
+    val shuffleId = shuffleBlockUpdates
+      .find(_.isInstanceOf[ShuffleIndexBlockId])
+      .map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
+      .get
+
+    val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
+    assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
+
+    // Remove the shuffle data
+    sc.shuffleDriverComponents.removeShuffle(shuffleId, true)
+
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
+      assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to