This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 7aa12b6cd01d Revert "[SPARK-46957][CORE] Decommission migrated shuffle files should be able to cleanup from executor"
7aa12b6cd01d is described below

commit 7aa12b6cd01da88cbbb3e8c6e50863e6139315b7
Author: Kent Yao <[email protected]>
AuthorDate: Thu Jun 27 19:11:39 2024 +0800

    Revert "[SPARK-46957][CORE] Decommission migrated shuffle files should be able to cleanup from executor"
    
    This reverts commit 789ac5b5c3f4ad1df06808c82545ff69b302490d.
---
 .../io/LocalDiskShuffleExecutorComponents.java     |  3 +-
 .../spark/shuffle/IndexShuffleBlockResolver.scala  | 20 +-----
 .../spark/shuffle/sort/SortShuffleManager.scala    |  3 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     |  6 +-
 .../BlockManagerDecommissionIntegrationSuite.scala | 76 ----------------------
 5 files changed, 5 insertions(+), 103 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index 861a8e623a6e..eb4d9d9abc8e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -56,8 +56,7 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
     if (blockManager == null) {
      throw new IllegalStateException("No blockManager available from the SparkEnv.");
     }
-    blockResolver =
-      new IndexShuffleBlockResolver(sparkConf, blockManager, Map.of() /* Shouldn't be accessed */);
+    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
   }
 
   @Override
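
For context, the Map.of() placeholder removed above was marked "Shouldn't be
accessed" because java.util.Map.of() returns an immutable map: any mutation,
including the computeIfAbsent tracking call shown in the next file, fails at
runtime. A minimal standalone Scala sketch of that pitfall (demo names only,
not Spark code):

    import java.util.{Map => JMap}

    object ImmutableMapDemo {
      def main(args: Array[String]): Unit = {
        val placeholder = JMap.of[String, String]()
        try {
          // Any mutation of a Map.of() instance throws at runtime.
          placeholder.computeIfAbsent("k", _ => "v")
        } catch {
          case e: UnsupportedOperationException =>
            println(s"mutation rejected: $e")
        }
      }
    }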
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 299f299249b9..919b0f5f7c13 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,6 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
-import java.util.{Map => JMap}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -38,7 +37,6 @@ import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
-import org.apache.spark.util.collection.OpenHashSet
 
 /**
 * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -54,8 +52,7 @@ import org.apache.spark.util.collection.OpenHashSet
 private[spark] class IndexShuffleBlockResolver(
     conf: SparkConf,
     // var for testing
-    var _blockManager: BlockManager = null,
-    val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = JMap.of())
+    var _blockManager: BlockManager = null)
   extends ShuffleBlockResolver
   with Logging with MigratableResolver {
 
@@ -273,21 +270,6 @@ private[spark] class IndexShuffleBlockResolver(
             throw SparkCoreErrors.failedRenameTempFileError(fileTmp, file)
           }
         }
-        blockId match {
-          case ShuffleIndexBlockId(shuffleId, mapId, _) =>
-            val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
-              shuffleId, _ => new OpenHashSet[Long](8)
-            )
-            mapTaskIds.add(mapId)
-
-          case ShuffleDataBlockId(shuffleId, mapId, _) =>
-            val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
-              shuffleId, _ => new OpenHashSet[Long](8)
-            )
-            mapTaskIds.add(mapId)
-
-          case _ => // Unreachable
-        }
        blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
       }
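
For context, the hunk above removes the bookkeeping SPARK-46957 added: when a
migrated shuffle index or data block was written, the resolver recorded the
map task ID under its shuffle ID (in a map shared with SortShuffleManager, as
the next file shows) so the executor could later clean those files up. A
self-contained sketch of the pattern, with scala.collection.mutable.HashSet
standing in for Spark's private[spark] OpenHashSet:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable

    object ShuffleTaskTracking {
      // shuffleId -> map task IDs whose shuffle blocks live on this executor
      private val taskIdMapsForShuffle =
        new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

      def recordBlock(shuffleId: Int, mapId: Long): Unit = {
        val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
          shuffleId, _ => new mutable.HashSet[Long])
        mapTaskIds.synchronized { mapTaskIds += mapId }
      }

      def main(args: Array[String]): Unit = {
        recordBlock(shuffleId = 0, mapId = 3L)
        recordBlock(shuffleId = 0, mapId = 7L)
        println(taskIdMapsForShuffle.get(0)) // e.g. HashSet(3, 7)
      }
    }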
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 4234d0ec5fd0..79dff6f87534 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -87,8 +87,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
 
  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
 
-  override val shuffleBlockResolver =
-    new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)
+  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**
    * Obtains a [[ShuffleHandle]] to pass to tasks.
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index ed3a3b887c30..1fa17b908699 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -314,8 +314,7 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
 
   @Test
   public void writeChecksumFileWithoutSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver =
-      new IndexShuffleBlockResolver(conf, blockManager, Map.of());
+    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
     ShuffleChecksumBlockId checksumBlockId =
      new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
    String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -345,8 +344,7 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
 
   @Test
   public void writeChecksumFileWithSpill() throws Exception {
-    IndexShuffleBlockResolver blockResolver =
-      new IndexShuffleBlockResolver(conf, blockManager, Map.of());
+    IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
     ShuffleChecksumBlockId checksumBlockId =
      new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
    String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 2ba348222f7b..d9d2e6102f12 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.storage
 
-import java.io.File
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
-import org.apache.commons.io.FileUtils
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
@@ -354,78 +352,4 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     import scala.language.reflectiveCalls
     assert(listener.removeReasonValidated)
   }
-
-  test("SPARK-46957: Migrated shuffle files should be able to cleanup from executor") {
-
-    val sparkTempDir = System.getProperty("java.io.tmpdir")
-
-    def shuffleFiles: Seq[File] = {
-      FileUtils
-        .listFiles(new File(sparkTempDir), Array("data", "index"), true)
-        .asScala
-        .toSeq
-    }
-
-    val existingShuffleFiles = shuffleFiles
-
-    val conf = new SparkConf()
-      .setAppName("SPARK-46957")
-      .setMaster("local-cluster[2,1,1024]")
-      .set(config.DECOMMISSION_ENABLED, true)
-      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
-      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
-    sc = new SparkContext(conf)
-    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
-    val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
-    var isDecommissionedExecutorRemoved = false
-    val execToDecommission = sc.getExecutorIds().head
-    sc.addSparkListener(new SparkListener {
-      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
-        if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
-          shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
-        }
-      }
-
-      override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
-        assert(execToDecommission === executorRemoved.executorId)
-        isDecommissionedExecutorRemoved = true
-      }
-    })
-
-    // Run a job to create shuffle data
-    val result = sc.parallelize(1 to 1000, 10)
-      .map { i => (i % 2, i) }
-      .reduceByKey(_ + _).collect()
-
-    assert(result.head === (0, 250500))
-    assert(result.tail.head === (1, 250000))
-    sc.schedulerBackend
-      .asInstanceOf[StandaloneSchedulerBackend]
-      .decommissionExecutor(
-        execToDecommission,
-        ExecutorDecommissionInfo("test", None),
-        adjustTargetNumExecutors = true
-      )
-
-    eventually(timeout(1.minute), interval(10.milliseconds)) {
-      assert(isDecommissionedExecutorRemoved)
-      // Ensure some shuffle data has been migrated
-      assert(shuffleBlockUpdates.size >= 2)
-    }
-
-    val shuffleId = shuffleBlockUpdates
-      .find(_.isInstanceOf[ShuffleIndexBlockId])
-      .map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
-      .get
-
-    val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
-    assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
-
-    // Remove the shuffle data
-    sc.shuffleDriverComponents.removeShuffle(shuffleId, true)
-
-    eventually(timeout(1.minute), interval(10.milliseconds)) {
-      assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
-    }
-  }
 }
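
The test removed above covered the end-to-end path: run a shuffle job,
decommission one executor so its shuffle files migrate, then call
removeShuffle and verify the migrated files disappear from disk. Its
file-snapshot helper stands alone; a runnable sketch (assumes commons-io on
the classpath, as in Spark's test environment):

    import java.io.File
    import scala.collection.JavaConverters._
    import org.apache.commons.io.FileUtils

    object ShuffleFileScan {
      // Recursively list .data/.index files under java.io.tmpdir, where
      // local-cluster executors write their shuffle output.
      def shuffleFiles: Seq[File] = {
        val sparkTempDir = System.getProperty("java.io.tmpdir")
        FileUtils
          .listFiles(new File(sparkTempDir), Array("data", "index"), true)
          .asScala
          .toSeq
      }

      def main(args: Array[String]): Unit =
        shuffleFiles.foreach(f => println(f.getAbsolutePath))
    }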


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
