This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new cec1252b0a22 [SPARK-43242][CORE][3.4] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
cec1252b0a22 is described below

commit cec1252b0a2234cf64759a27758a9234de091828
Author: zhangliang <[email protected]>
AuthorDate: Fri Aug 30 11:10:56 2024 +0800

    [SPARK-43242][CORE][3.4] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose
    
    ### What changes were proposed in this pull request?
    Port "[SPARK-43242][CORE] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose" to branch-3.4.
    
    ### Why are the changes needed?
    The original change on master conflicts with branch-3.4, so a separate port is needed; see #40921.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    Closes #47909 from CavemanIV/port3.4-SPARK-43242.
    
    Authored-by: zhangliang <[email protected]>
    Signed-off-by: Yi Wu <[email protected]>
---
 .../storage/ShuffleBlockFetcherIterator.scala      |  6 +++++
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 27 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e36ebc02405f..7bcf41a73e39 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1142,6 +1142,12 @@ final class ShuffleBlockFetcherIterator(
           s"diagnosis is skipped due to lack of shuffle checksum support for 
push-based shuffle."
         logWarning(diagnosisResponse)
         diagnosisResponse
+      case shuffleBlockBatch: ShuffleBlockBatchId =>
+          val diagnosisResponse = s"BlockBatch $shuffleBlockBatch is corrupted 
" +
+          s"but corruption diagnosis is skipped due to lack of shuffle 
checksum support for " +
+          s"ShuffleBlockBatchId"
+        logWarning(diagnosisResponse)
+        diagnosisResponse
       case unexpected: BlockId =>
         throw new IllegalArgumentException(s"Unexpected type of BlockId, 
$unexpected")
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e3c47c12b8f2..138490836244 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1943,4 +1943,31 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
 
     assert(err2.getMessage.contains("corrupt at reset"))
   }
+
+  test("SPARK-43242: Fix throw 'Unexpected type of BlockId' in shuffle 
corruption diagnose") {
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockBatchId(0, 0, 0, 3) -> createMockManagedBuffer())
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      listener.onBlockFetchSuccess(ShuffleBlockBatchId(0, 0, 0, 3).toString, 
mockCorruptBuffer())
+    }
+
+    val logAppender = new LogAppender("diagnose corruption")
+    withLogAppender(logAppender) {
+      val iterator = createShuffleBlockIteratorWithDefaults(
+        Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+        streamWrapperLimitSize = Some(100)
+      )
+      intercept[FetchFailedException](iterator.next())
+      verify(transfer, times(2))
+        .fetchBlocks(any(), any(), any(), any(), any(), any())
+      assert(logAppender.loggingEvents.count(
+        _.getMessage.getFormattedMessage.contains("Start corruption 
diagnosis")) === 1)
+      assert(logAppender.loggingEvents.exists(
+        _.getMessage.getFormattedMessage.contains("shuffle_0_0_0_3 is 
corrupted " +
+          "but corruption diagnosis is skipped due to lack of " +
+          "shuffle checksum support for ShuffleBlockBatchId")))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to