This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new cec1252b0a22 [SPARK-43242][CORE][3.4] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
cec1252b0a22 is described below
commit cec1252b0a2234cf64759a27758a9234de091828
Author: zhangliang <[email protected]>
AuthorDate: Fri Aug 30 11:10:56 2024 +0800
[SPARK-43242][CORE][3.4] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
### What changes were proposed in this pull request?
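Port [SPARK-43242][CORE] "Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose" to branch-3.4.
Corruption diagnosis in `ShuffleBlockFetcherIterator` now handles `ShuffleBlockBatchId`. Previously it threw `IllegalArgumentException("Unexpected type of BlockId, ...")`. Now it logs a warning that corruption diagnosis is skipped, because shuffle checksums are not supported for `ShuffleBlockBatchId`.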
### Why are the changes needed?
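The fix merged to master (see #40921) conflicts with branch-3.4, so it is backported manually here. Without it, a corrupted `ShuffleBlockBatchId` makes corruption diagnosis fall through to the catch-all case. That case throws `IllegalArgumentException("Unexpected type of BlockId, ...")`.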
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
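Existing tests, plus a new test in `ShuffleBlockFetcherIteratorSuite` ("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose").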
Closes #47909 from CavemanIV/port3.4-SPARK-43242.
Authored-by: zhangliang <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../storage/ShuffleBlockFetcherIterator.scala | 6 +++++
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 27 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e36ebc02405f..7bcf41a73e39 100644
---
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1142,6 +1142,12 @@ final class ShuffleBlockFetcherIterator(
s"diagnosis is skipped due to lack of shuffle checksum support for
push-based shuffle."
logWarning(diagnosisResponse)
diagnosisResponse
+ case shuffleBlockBatch: ShuffleBlockBatchId =>
+ val diagnosisResponse = s"BlockBatch $shuffleBlockBatch is corrupted
" +
+ s"but corruption diagnosis is skipped due to lack of shuffle
checksum support for " +
+ s"ShuffleBlockBatchId"
+ logWarning(diagnosisResponse)
+ diagnosisResponse
case unexpected: BlockId =>
throw new IllegalArgumentException(s"Unexpected type of BlockId,
$unexpected")
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e3c47c12b8f2..138490836244 100644
---
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1943,4 +1943,31 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite with PrivateMethodT
assert(err2.getMessage.contains("corrupt at reset"))
}
+
+ test("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle
corruption diagnose") {
+ val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+ val blocks = Map[BlockId, ManagedBuffer](
+ ShuffleBlockBatchId(0, 0, 0, 3) -> createMockManagedBuffer())
+ answerFetchBlocks { invocation =>
+ val listener = invocation.getArgument[BlockFetchingListener](4)
+ listener.onBlockFetchSuccess(ShuffleBlockBatchId(0, 0, 0, 3).toString,
mockCorruptBuffer())
+ }
+
+ val logAppender = new LogAppender("diagnose corruption")
+ withLogAppender(logAppender) {
+ val iterator = createShuffleBlockIteratorWithDefaults(
+ Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+ streamWrapperLimitSize = Some(100)
+ )
+ intercept[FetchFailedException](iterator.next())
+ verify(transfer, times(2))
+ .fetchBlocks(any(), any(), any(), any(), any(), any())
+ assert(logAppender.loggingEvents.count(
+ _.getMessage.getFormattedMessage.contains("Start corruption
diagnosis")) === 1)
+ assert(logAppender.loggingEvents.exists(
+ _.getMessage.getFormattedMessage.contains("shuffle_0_0_0_3 is
corrupted " +
+ "but corruption diagnosis is skipped due to lack of " +
+ "shuffle checksum support for ShuffleBlockBatchId")))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]