This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new dce7c9aa94ea [SPARK-43242][CORE][3.5] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
dce7c9aa94ea is described below
commit dce7c9aa94eadbeafb7bb1fc770991d5ec8d0b84
Author: zhangliang <[email protected]>
AuthorDate: Fri Aug 30 11:09:46 2024 +0800
[SPARK-43242][CORE][3.5] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
#### What changes were proposed in this pull request?
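Port to branch-3.5 of
[[SPARK-43242](https://issues.apache.org/jira/browse/SPARK-43242)][CORE] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose.

The corruption-diagnosis `match` in `ShuffleBlockFetcherIterator` had no case for
`ShuffleBlockBatchId`. A corrupted batch block therefore fell through to the catch-all
`unexpected: BlockId` case and raised the internal error "Unexpected type of BlockId".
This change adds a `ShuffleBlockBatchId` case that logs a warning and returns a message
stating that corruption diagnosis is skipped because shuffle checksums are not supported
for `ShuffleBlockBatchId`.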
#### Why are the changes needed?
The master PR does not apply cleanly to branch-3.5 because of conflicts; see the end of the discussion at
https://github.com/apache/spark/pull/40921
#### Does this PR introduce any user-facing change?
No
#### How was this patch tested?
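Existing tests, plus a new unit test in `ShuffleBlockFetcherIteratorSuite`
("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose").
The new test fetches a corrupted `ShuffleBlockBatchId` and checks that the "diagnosis is skipped" warning is logged.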
Closes #47910 from CavemanIV/port3.5-SPARK-43242.
Authored-by: zhangliang <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../storage/ShuffleBlockFetcherIterator.scala | 6 +++++
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 27 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b9365f45a11a..17407f4ee21f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1142,6 +1142,12 @@ final class ShuffleBlockFetcherIterator(
s"diagnosis is skipped due to lack of shuffle checksum support for
push-based shuffle."
logWarning(diagnosisResponse)
diagnosisResponse
+ case shuffleBlockBatch: ShuffleBlockBatchId =>
+ val diagnosisResponse = s"BlockBatch $shuffleBlockBatch is corrupted
" +
+ s"but corruption diagnosis is skipped due to lack of shuffle
checksum support for " +
+ s"ShuffleBlockBatchId"
+ logWarning(diagnosisResponse)
+ diagnosisResponse
case unexpected: BlockId =>
throw SparkException.internalError(
s"Unexpected type of BlockId, $unexpected", category = "STORAGE")
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index f2d5f27a66cc..a9902cb4ccb4 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1941,4 +1941,31 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
assert(err2.getMessage.contains("corrupt at reset"))
}
+
+ test("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle
corruption diagnose") {
+ val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+ val blocks = Map[BlockId, ManagedBuffer](
+ ShuffleBlockBatchId(0, 0, 0, 3) -> createMockManagedBuffer())
+ answerFetchBlocks { invocation =>
+ val listener = invocation.getArgument[BlockFetchingListener](4)
+ listener.onBlockFetchSuccess(ShuffleBlockBatchId(0, 0, 0, 3).toString,
mockCorruptBuffer())
+ }
+
+ val logAppender = new LogAppender("diagnose corruption")
+ withLogAppender(logAppender) {
+ val iterator = createShuffleBlockIteratorWithDefaults(
+ Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+ streamWrapperLimitSize = Some(100)
+ )
+ intercept[FetchFailedException](iterator.next())
+ verify(transfer, times(2))
+ .fetchBlocks(any(), any(), any(), any(), any(), any())
+ assert(logAppender.loggingEvents.count(
+ _.getMessage.getFormattedMessage.contains("Start corruption
diagnosis")) === 1)
+ assert(logAppender.loggingEvents.exists(
+ _.getMessage.getFormattedMessage.contains("shuffle_0_0_0_3 is
corrupted " +
+ "but corruption diagnosis is skipped due to lack of " +
+ "shuffle checksum support for ShuffleBlockBatchId")))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]