This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc5342314c8d [SPARK-44126][CORE] Shuffle migration failure count should not increase when target executor decommissioned
fc5342314c8d is described below
commit fc5342314c8d0890cf97a808bcf5fdf3720a5864
Author: Warren Zhu <[email protected]>
AuthorDate: Tue Sep 26 10:52:40 2023 +0800
[SPARK-44126][CORE] Shuffle migration failure count should not increase when target executor decommissioned
### What changes were proposed in this pull request?
Do not increase the shuffle migration failure count when the target executor has been decommissioned.
### Why are the changes needed?
The block manager decommissioner only syncs the list of live peers with the block manager master every `spark.storage.cachedPeersTtl` (default 60s). If a peer block manager is decommissioned within that interval, the decommissioner still tries to migrate shuffle blocks to it. Such a migration fails with a RuntimeException wrapping `BlockSavedOnDecommissionedBlockManagerException`. Detailed stack trace below:
```
org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
  at org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:122)
  at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$5(BlockManagerDecommissioner.scala:127)
  at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$5$adapted(BlockManagerDecommissioner.scala:118)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:118)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.spark.storage.BlockSavedOnDecommissionedBlockManagerException: Block shuffle_2_6429_0.data cannot be saved on decommissioned executor
  at org.apache.spark.errors.SparkCoreErrors$.cannotSaveBlockOnDecommissionedExecutorError(SparkCoreErrors.scala:238)
  at org.apache.spark.storage.BlockManager.checkShouldStore(BlockManager.scala:277)
  at org.apache.spark.storage.BlockManager.putBlockDataAsStream(BlockManager.scala:741)
  at org.apache.spark.network.netty.NettyBlockRpcServer.receiveStream(NettyBlockRpcServer.scala:174)
```
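For context, the fix classifies this failure by inspecting the cause of the `SparkException` thrown by `uploadBlockSync`; when the cause message mentions `BlockSavedOnDecommissionedBlockManagerException`, the retry count is not incremented. A minimal standalone sketch of that idea (the object and method names below are illustrative, not Spark's actual API; the real change lives in `BlockManagerDecommissioner`, see the diff further down):
```
// Illustrative sketch only: classify an upload failure as
// "target already decommissioned" from the exception cause message.
object MigrationFailureClassifier {
  // The real code derives this name via classOf[...].getSimpleName.
  private val decomExceptionName = "BlockSavedOnDecommissionedBlockManagerException"

  /** True when the failure came from a peer that is already decommissioned. */
  def isTargetDecommissioned(e: Throwable): Boolean = {
    val cause = e.getCause
    cause != null && cause.getMessage != null &&
      cause.getMessage.contains(decomExceptionName)
  }

  /** Such failures do not count toward the per-block replication failure limit. */
  def nextRetryCount(e: Throwable, retryCount: Int): Int =
    if (isTargetDecommissioned(e)) retryCount else retryCount + 1
}
```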
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test in `BlockManagerDecommissionUnitSuite`.
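The new test leans on Mockito returning stubbed values consecutively: the mocked `getPeers` yields the failing peer twice and then a healthy one, while uploads to the first peer are stubbed to fail. A stripped-down example of that consecutive-return stubbing (the trait and names here are made up for illustration, not taken from the suite):
```
import org.mockito.Mockito.{mock, when}

trait PeerProvider {
  def getPeers(forceFetch: Boolean): Seq[String]
}

object ConsecutiveStubbingExample extends App {
  val provider = mock(classOf[PeerProvider])
  // Mockito hands back the stubbed values in order on consecutive calls;
  // the last value is repeated for any further calls.
  when(provider.getPeers(true)).thenReturn(Seq("exec1"), Seq("exec1"), Seq("exec2"))

  println(provider.getPeers(true)) // List(exec1)
  println(provider.getPeers(true)) // List(exec1)
  println(provider.getPeers(true)) // List(exec2)
}
```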
Closes #41905 from warrenzhu25/migrate-decom.
Authored-by: Warren Zhu <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../spark/storage/BlockManagerDecommissioner.scala | 16 ++++++++-
.../BlockManagerDecommissionUnitSuite.scala | 40 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 59d1f3b4c4ba..cbac3fd1a994 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -43,6 +43,8 @@ private[storage] class BlockManagerDecommissioner(
private val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
private val maxReplicationFailuresForDecommission =
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+ private val blockSavedOnDecommissionedBlockManagerException =
+ classOf[BlockSavedOnDecommissionedBlockManagerException].getSimpleName
// Used for tracking if our migrations are complete. Readable for testing
@volatile private[storage] var lastRDDMigrationTime: Long = 0
@@ -101,6 +103,7 @@ private[storage] class BlockManagerDecommissioner(
try {
val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
val blocks = bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
+ var isTargetDecommissioned = false
// We only migrate a shuffle block when both index file and data file exist.
if (blocks.isEmpty) {
logInfo(s"Ignore deleted shuffle block $shuffleBlockInfo")
@@ -143,6 +146,11 @@ private[storage] class BlockManagerDecommissioner(
// have been used in the try-block above so there's no point trying again
&& peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+ } else if (e.getCause != null && e.getCause.getMessage != null
+ && e.getCause.getMessage
+ .contains(blockSavedOnDecommissionedBlockManagerException)) {
+ isTargetDecommissioned = true
+ keepRunning = false
} else {
logError(s"Error occurred during migrating
$shuffleBlockInfo", e)
keepRunning = false
@@ -156,8 +164,14 @@ private[storage] class BlockManagerDecommissioner(
numMigratedShuffles.incrementAndGet()
} else {
logWarning(s"Stop migrating shuffle blocks to $peer")
+
+ val newRetryCount = if (isTargetDecommissioned) {
+ retryCount
+ } else {
+ retryCount + 1
+ }
// Do not mark the block as migrated if it still needs retry
- if (!allowRetry(shuffleBlockInfo, retryCount + 1)) {
+ if (!allowRetry(shuffleBlockInfo, newRetryCount)) {
numMigratedShuffles.incrementAndGet()
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index 67a4514d5bda..b7ad6722faa8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -231,6 +231,46 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
numShuffles = Option(1))
}
+ test("SPARK-44126: block decom manager handles
BlockSavedOnDecommissionedBlockManagerException") {
+ // Set up the mocks so we return one shuffle block
+ val conf = sparkConf
+ .clone
+ .set(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK, 1)
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ val exe1 = BlockManagerId("exec1", "host1", 12345)
+ val exe2 = BlockManagerId("exec2", "host2", 12345)
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(exe1), Seq(exe1), Seq(exe2))
+
+ val blockTransferService = mock(classOf[BlockTransferService])
+ // Simulate BlockSavedOnDecommissionedBlockManagerException
+ when(blockTransferService.uploadBlock(
+ mc.any(), mc.any(), mc.eq(exe1.executorId), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenReturn(
+ Future.failed(new RuntimeException("BlockSavedOnDecommissionedBlockManagerException"))
+ )
+ when(blockTransferService.uploadBlockSync(
+ mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenCallRealMethod()
+
+ when(bm.blockTransferService).thenReturn(blockTransferService)
+
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ validateDecommissionTimestampsOnManager(bmDecomManager)
+ verify(blockTransferService, times(1))
+ .uploadBlock(mc.any(), mc.any(), mc.eq(exe1.executorId),
+ mc.any(), mc.any(), mc.any(), mc.isNull())
+ verify(blockTransferService, times(1))
+ .uploadBlock(mc.any(), mc.any(), mc.eq(exe2.executorId),
+ mc.any(), mc.any(), mc.any(), mc.isNull())
+ }
+
test("block decom manager handles IO failures") {
// Set up the mocks so we return one shuffle block
val bm = mock(classOf[BlockManager])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]