This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new f8b1040ea006 Revert "[SPARK-46957][CORE] Decommission migrated shuffle files should be able to cleanup from executor"
f8b1040ea006 is described below
commit f8b1040ea006fe48df6bb52e0ace4dce54ab6d56
Author: Kent Yao <[email protected]>
AuthorDate: Thu Jun 27 19:13:12 2024 +0800
Revert "[SPARK-46957][CORE] Decommission migrated shuffle files should be
able to cleanup from executor"
This reverts commit dfd3c64db91f18df92777b368fbcda0a87b35e91.
---
.../io/LocalDiskShuffleExecutorComponents.java | 3 +-
.../spark/shuffle/IndexShuffleBlockResolver.scala | 20 +-----
.../spark/shuffle/sort/SortShuffleManager.scala | 3 +-
.../shuffle/sort/UnsafeShuffleWriterSuite.java | 6 +-
.../BlockManagerDecommissionIntegrationSuite.scala | 76 ----------------------
5 files changed, 5 insertions(+), 103 deletions(-)
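In practical terms, this revert restores the pre-SPARK-46957 two-argument constructor of IndexShuffleBlockResolver. A minimal sketch of the caller-side difference, reconstructed from the diff below (illustrative only, not part of the original patch):

    // Before this revert (SPARK-46957): SortShuffleManager passed its
    // taskIdMapsForShuffle map to the resolver so that map task ids of
    // migrated shuffle blocks could be tracked and cleaned up later:
    //   new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)
    // After this revert: the resolver is constructed without that map,
    // as on earlier 3.4 releases.
    override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)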
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index 861a8e623a6e..eb4d9d9abc8e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -56,8 +56,7 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
if (blockManager == null) {
throw new IllegalStateException("No blockManager available from the SparkEnv.");
}
- blockResolver =
- new IndexShuffleBlockResolver(sparkConf, blockManager, Map.of() /* Shouldn't be accessed */);
+ blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
}
@Override
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index b36680f0a8ca..d41321b4597b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,6 @@ import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files
-import java.util.{Map => JMap}
import scala.collection.mutable.ArrayBuffer
@@ -37,7 +36,6 @@ import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
-import org.apache.spark.util.collection.OpenHashSet
/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -53,8 +51,7 @@ import org.apache.spark.util.collection.OpenHashSet
private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
// var for testing
- var _blockManager: BlockManager = null,
- val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = JMap.of())
+ var _blockManager: BlockManager = null)
extends ShuffleBlockResolver
with Logging with MigratableResolver {
@@ -271,21 +268,6 @@ private[spark] class IndexShuffleBlockResolver(
throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
}
}
- blockId match {
- case ShuffleIndexBlockId(shuffleId, mapId, _) =>
- val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
- shuffleId, _ => new OpenHashSet[Long](8)
- )
- mapTaskIds.add(mapId)
-
- case ShuffleDataBlockId(shuffleId, mapId, _) =>
- val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
- shuffleId, _ => new OpenHashSet[Long](8)
- )
- mapTaskIds.add(mapId)
-
- case _ => // Unreachable
- }
blockManager.reportBlockStatus(blockId,
BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 4234d0ec5fd0..79dff6f87534 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -87,8 +87,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
- override val shuffleBlockResolver =
- new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)
+ override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index ed3a3b887c30..1fa17b908699 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -314,8 +314,7 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
@Test
public void writeChecksumFileWithoutSpill() throws Exception {
- IndexShuffleBlockResolver blockResolver =
- new IndexShuffleBlockResolver(conf, blockManager, Map.of());
+ IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -345,8 +344,7 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
@Test
public void writeChecksumFileWithSpill() throws Exception {
- IndexShuffleBlockResolver blockResolver =
- new IndexShuffleBlockResolver(conf, blockManager, Map.of());
+ IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 2ba348222f7b..d9d2e6102f12 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,14 +17,12 @@
package org.apache.spark.storage
-import java.io.File
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import org.apache.commons.io.FileUtils
import org.scalatest.concurrent.Eventually
import org.apache.spark._
@@ -354,78 +352,4 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
import scala.language.reflectiveCalls
assert(listener.removeReasonValidated)
}
-
- test("SPARK-46957: Migrated shuffle files should be able to cleanup from
executor") {
-
- val sparkTempDir = System.getProperty("java.io.tmpdir")
-
- def shuffleFiles: Seq[File] = {
- FileUtils
- .listFiles(new File(sparkTempDir), Array("data", "index"), true)
- .asScala
- .toSeq
- }
-
- val existingShuffleFiles = shuffleFiles
-
- val conf = new SparkConf()
- .setAppName("SPARK-46957")
- .setMaster("local-cluster[2,1,1024]")
- .set(config.DECOMMISSION_ENABLED, true)
- .set(config.STORAGE_DECOMMISSION_ENABLED, true)
- .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
- sc = new SparkContext(conf)
- TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
- val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
- var isDecommissionedExecutorRemoved = false
- val execToDecommission = sc.getExecutorIds().head
- sc.addSparkListener(new SparkListener {
- override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
- if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
- shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
- }
- }
-
- override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
- assert(execToDecommission === executorRemoved.executorId)
- isDecommissionedExecutorRemoved = true
- }
- })
-
- // Run a job to create shuffle data
- val result = sc.parallelize(1 to 1000, 10)
- .map { i => (i % 2, i) }
- .reduceByKey(_ + _).collect()
-
- assert(result.head === (0, 250500))
- assert(result.tail.head === (1, 250000))
- sc.schedulerBackend
- .asInstanceOf[StandaloneSchedulerBackend]
- .decommissionExecutor(
- execToDecommission,
- ExecutorDecommissionInfo("test", None),
- adjustTargetNumExecutors = true
- )
-
- eventually(timeout(1.minute), interval(10.milliseconds)) {
- assert(isDecommissionedExecutorRemoved)
- // Ensure there are shuffle data have been migrated
- assert(shuffleBlockUpdates.size >= 2)
- }
-
- val shuffleId = shuffleBlockUpdates
- .find(_.isInstanceOf[ShuffleIndexBlockId])
- .map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
- .get
-
- val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
- assert(newShuffleFiles.size >= shuffleBlockUpdates.size)
-
- // Remove the shuffle data
- sc.shuffleDriverComponents.removeShuffle(shuffleId, true)
-
- eventually(timeout(1.minute), interval(10.milliseconds)) {
- assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]