This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b6b11b514c4 [SPARK-40168][CORE] Handle `SparkException` during shuffle block migration b6b11b514c4 is described below commit b6b11b514c46189594a89b2b5607a5016c84d97f Author: Warren Zhu <warren.zh...@gmail.com> AuthorDate: Fri Sep 9 14:22:14 2022 -0700 [SPARK-40168][CORE] Handle `SparkException` during shuffle block migration ### What changes were proposed in this pull request? Explicitly handle FileNotFoundException wrapped inside SparkException, then mark this block as deleted, further avoid retry of this block and stop of current migration thread ### Why are the changes needed? FileNotFoundException wrapped inside SparkException is not handled correctly, causing unnecessary retry and stop of current migration thread ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added test in BlockManagerDecommissionUnitSuite Closes #37603 from warrenzhu25/deco-error. Authored-by: Warren Zhu <warren.zh...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../spark/storage/BlockManagerDecommissioner.scala | 2 +- .../BlockManagerDecommissionUnitSuite.scala | 45 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 82450dd2651..6e3cf9c9b41 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -127,7 +127,7 @@ private[storage] class BlockManagerDecommissioner( } logInfo(s"Migrated $shuffleBlockInfo to $peer") } catch { - case e: IOException => + case e @ ( _ : IOException | _ : SparkException) => // If a block got deleted before netty opened the file handle, then trying to // load the blocks now will fail. 
This is most likely to occur if we start // migrating blocks and then the shuffle TTL cleaner kicks in. However this diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index b7ac378b4c6..df4f256afb6 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.storage +import java.io.FileNotFoundException + +import scala.concurrent.Future import scala.concurrent.duration._ import org.mockito.{ArgumentMatchers => mc} @@ -186,6 +189,48 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { validateDecommissionTimestampsOnManager(bmDecomManager, fail = false, assertDone = false) } + test("SPARK-40168: block decom manager handles shuffle file not found") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + // First call get blocks, then empty list simulating a delete. 
+ when(migratableShuffleBlockResolver.getStoredShuffles())
+ .thenReturn(Seq(ShuffleBlockInfo(1, 1)))
+ .thenReturn(Seq())
+ when(migratableShuffleBlockResolver.getMigrationBlocks(mc.any()))
+ .thenReturn(
+ List(
+ (ShuffleIndexBlockId(1, 1, 1), mock(classOf[ManagedBuffer])),
+ (ShuffleDataBlockId(1, 1, 1), mock(classOf[ManagedBuffer]))))
+ .thenReturn(List())
+
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+ val blockTransferService = mock(classOf[BlockTransferService])
+ // Simulate FileNotFoundException wrapped inside SparkException
+ when(
+ blockTransferService
+ .uploadBlock(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenReturn(Future.failed(
+ new java.io.IOException("boop", new FileNotFoundException("file not found"))))
+ when(
+ blockTransferService
+ .uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenCallRealMethod()
+
+ when(bm.blockTransferService).thenReturn(blockTransferService)
+
+ // Verify the decom manager handles this correctly
+ val bmDecomManager = new BlockManagerDecommissioner(sparkConf, bm)
+ validateDecommissionTimestampsOnManager(
+ bmDecomManager,
+ numShuffles = Option(1))
+ }
+ test("block decom manager handles IO failures") { // Set up the mocks so we return one shuffle block val bm = mock(classOf[BlockManager]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org