This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b6b11b514c4 [SPARK-40168][CORE] Handle `SparkException` during shuffle
block migration
b6b11b514c4 is described below
commit b6b11b514c46189594a89b2b5607a5016c84d97f
Author: Warren Zhu <[email protected]>
AuthorDate: Fri Sep 9 14:22:14 2022 -0700
[SPARK-40168][CORE] Handle `SparkException` during shuffle block migration
### What changes were proposed in this pull request?
Explicitly handle a FileNotFoundException wrapped inside a SparkException by
marking the block as deleted, which avoids unnecessary retries of the block
and prevents the current migration thread from stopping
### Why are the changes needed?
A FileNotFoundException wrapped inside a SparkException is not handled
correctly, causing unnecessary retries and stopping the current migration thread
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added test in BlockManagerDecommissionUnitSuite
Closes #37603 from warrenzhu25/deco-error.
Authored-by: Warren Zhu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/storage/BlockManagerDecommissioner.scala | 2 +-
.../BlockManagerDecommissionUnitSuite.scala | 45 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 82450dd2651..6e3cf9c9b41 100644
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -127,7 +127,7 @@ private[storage] class BlockManagerDecommissioner(
}
logInfo(s"Migrated $shuffleBlockInfo to $peer")
} catch {
- case e: IOException =>
+ case e @ ( _ : IOException | _ : SparkException) =>
// If a block got deleted before netty opened the file handle, then trying to
// load the blocks now will fail. This is most likely to occur if we start
// migrating blocks and then the shuffle TTL cleaner kicks in. However this
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ac378b4c6..df4f256afb6 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.storage
+import java.io.FileNotFoundException
+
+import scala.concurrent.Future
import scala.concurrent.duration._
import org.mockito.{ArgumentMatchers => mc}
@@ -186,6 +189,48 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
validateDecommissionTimestampsOnManager(bmDecomManager, fail = false,
assertDone = false)
}
+ test("SPARK-40168: block decom manager handles shuffle file not found") {
+ // Set up the mocks so we return one shuffle block
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ // First call get blocks, then empty list simulating a delete.
+ when(migratableShuffleBlockResolver.getStoredShuffles())
+ .thenReturn(Seq(ShuffleBlockInfo(1, 1)))
+ .thenReturn(Seq())
+ when(migratableShuffleBlockResolver.getMigrationBlocks(mc.any()))
+ .thenReturn(
+ List(
+ (ShuffleIndexBlockId(1, 1, 1), mock(classOf[ManagedBuffer])),
+ (ShuffleDataBlockId(1, 1, 1), mock(classOf[ManagedBuffer]))))
+ .thenReturn(List())
+
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+ val blockTransferService = mock(classOf[BlockTransferService])
+ // Simulate FileNotFoundException wrap inside SparkException
+ when(
+ blockTransferService
+        .uploadBlock(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenReturn(Future.failed(
+        new java.io.IOException("boop", new FileNotFoundException("file not found"))))
+ when(
+ blockTransferService
+        .uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenCallRealMethod()
+
+ when(bm.blockTransferService).thenReturn(blockTransferService)
+
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+ validateDecommissionTimestampsOnManager(
+ bmDecomManager,
+ numShuffles = Option(1))
+ }
+
test("block decom manager handles IO failures") {
// Set up the mocks so we return one shuffle block
val bm = mock(classOf[BlockManager])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]