This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new dce7c9aa94ea [SPARK-43242][CORE][3.5] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
dce7c9aa94ea is described below

commit dce7c9aa94eadbeafb7bb1fc770991d5ec8d0b84
Author: zhangliang <[email protected]>
AuthorDate: Fri Aug 30 11:09:46 2024 +0800

    [SPARK-43242][CORE][3.5] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
    
    #### What changes were proposed in this pull request?
    Port to 3.5 of [[SPARK-43242](https://issues.apache.org/jira/browse/SPARK-43242)][CORE] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
    
    #### Why are the changes needed?
    The original PR for master conflicts with branch-3.5, so it needs a separate port; see the end of the discussion at https://github.com/apache/spark/pull/40921
    
    #### Does this PR introduce any user-facing change?
    No
    
    #### How was this patch tested?
    Existing tests
    
    Closes #47910 from CavemanIV/port3.5-SPARK-43242.
    
    Authored-by: zhangliang <[email protected]>
    Signed-off-by: Yi Wu <[email protected]>
---
 .../storage/ShuffleBlockFetcherIterator.scala      |  6 +++++
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 27 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b9365f45a11a..17407f4ee21f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1142,6 +1142,12 @@ final class ShuffleBlockFetcherIterator(
           s"diagnosis is skipped due to lack of shuffle checksum support for 
push-based shuffle."
         logWarning(diagnosisResponse)
         diagnosisResponse
+      case shuffleBlockBatch: ShuffleBlockBatchId =>
+          val diagnosisResponse = s"BlockBatch $shuffleBlockBatch is corrupted 
" +
+          s"but corruption diagnosis is skipped due to lack of shuffle 
checksum support for " +
+          s"ShuffleBlockBatchId"
+        logWarning(diagnosisResponse)
+        diagnosisResponse
       case unexpected: BlockId =>
         throw SparkException.internalError(
           s"Unexpected type of BlockId, $unexpected", category = "STORAGE")
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index f2d5f27a66cc..a9902cb4ccb4 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1941,4 +1941,31 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
 
     assert(err2.getMessage.contains("corrupt at reset"))
   }
+
+  test("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose") {
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockBatchId(0, 0, 0, 3) -> createMockManagedBuffer())
+    answerFetchBlocks { invocation => // stubbed fetch delivers a corrupt buffer for the batch
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      listener.onBlockFetchSuccess(ShuffleBlockBatchId(0, 0, 0, 3).toString, mockCorruptBuffer())
+    }
+
+    val logAppender = new LogAppender("diagnose corruption")
+    withLogAppender(logAppender) {
+      val iterator = createShuffleBlockIteratorWithDefaults(
+        Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+        streamWrapperLimitSize = Some(100)
+      )
+      intercept[FetchFailedException](iterator.next()) // must not be an internal error
+      verify(transfer, times(2)) // presumably initial fetch + one retry of the corrupt block
+        .fetchBlocks(any(), any(), any(), any(), any(), any())
+      assert(logAppender.loggingEvents.count(
+        _.getMessage.getFormattedMessage.contains("Start corruption diagnosis")) === 1)
+      assert(logAppender.loggingEvents.exists(
+        _.getMessage.getFormattedMessage.contains("shuffle_0_0_0_3 is corrupted " +
+          "but corruption diagnosis is skipped due to lack of " +
+          "shuffle checksum support for ShuffleBlockBatchId")))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to