This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 57e73dafeb9 [SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException
57e73dafeb9 is described below

commit 57e73dafeb9ffd288e0ed4ad76b9549e7da33ad7
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Fri Oct 27 19:21:04 2023 -0700

    [SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to wrap `BufferReleasingInputStream.available/reset` under `tryOrFetchFailedException`, so that an `IOException` thrown during an `available`/`reset` call is rethrown as a `FetchFailedException`.
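
    For illustration, here is a minimal, hypothetical sketch of the guarding pattern (a simplified stand-alone class; the real helper lives in `BufferReleasingInputStream`, also closes the stream quietly, and raises Spark's `FetchFailedException` for the affected block):

    ```scala
    import java.io.{IOException, InputStream}

    // Simplified stand-in for BufferReleasingInputStream: every delegated call
    // that may decompress data goes through the same IOException guard.
    class GuardedStream(delegate: InputStream, detectCorruption: Boolean) extends InputStream {

      override def read(): Int = tryOrFetchFailedException(delegate.read())

      // Covered by this patch: available() and reset() now use the same guard.
      override def available(): Int = tryOrFetchFailedException(delegate.available())

      override def reset(): Unit = tryOrFetchFailedException(delegate.reset())

      private def tryOrFetchFailedException[T](block: => T): T = {
        try {
          block
        } catch {
          case e: IOException if detectCorruption =>
            // In Spark this is rethrown as a FetchFailedException so the shuffle
            // task is retried; a RuntimeException stands in for it in this sketch.
            throw new RuntimeException("fetch failed: corrupt shuffle block", e)
        }
      }
    }
    ```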
    
    ### Why are the changes needed?
    
    We have encountered a shuffle data corruption issue:
    
    ```
    Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
    at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
    at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)
    ```
    
    Spark shuffle can detect corruption for a few stream operations such as `read` and `skip`: an `IOException` thrown from them is rethrown as a `FetchFailedException`, which causes the failed shuffle task to be retried. But in the stack trace above the exception comes from `available`, which is not covered by this mechanism, so no retry happened and the Spark application simply failed.
    
    Since the `available`/`reset` operations can also involve data decompression and throw `IOException`, we should guard them the same way `read` and `skip` are guarded.
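
    For example, a compressed stream over corrupt data can fail inside `available()` before any `read`/`skip` is issued, just as `SnappyInputStream` does in the stack trace above. A hypothetical stand-in:

    ```scala
    import java.io.{IOException, InputStream}

    // Hypothetical stand-in for a compressed stream over corrupt data: like
    // SnappyInputStream in the trace above, available() has to decompress the
    // next chunk and fails before any read()/skip() call is involved.
    class CorruptCompressedStream extends InputStream {
      override def read(): Int = -1

      override def available(): Int =
        throw new IOException("FAILED_TO_UNCOMPRESS(5)")
    }
    ```

    Before this patch, `BufferReleasingInputStream` delegated `available()` directly (`override def available(): Int = delegate.available()`), so such an `IOException` escaped to the caller and failed the task without triggering a shuffle retry.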
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Data corruption detected during an `available`/`reset` call now results in a `FetchFailedException`, like `read` and `skip`, which can be retried, instead of a plain `IOException`.
    
    ### How was this patch tested?
    
    Added test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43543 from viirya/add_available.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../storage/ShuffleBlockFetcherIterator.scala      |  8 ++-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 64 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index d539611271c..6144cc0e6a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1354,7 +1354,8 @@ private class BufferReleasingInputStream(
     }
   }
 
-  override def available(): Int = delegate.available()
+  override def available(): Int =
+    tryOrFetchFailedException(delegate.available())
 
   override def mark(readlimit: Int): Unit = delegate.mark(readlimit)
 
@@ -1369,12 +1370,13 @@ private class BufferReleasingInputStream(
   override def read(b: Array[Byte], off: Int, len: Int): Int =
     tryOrFetchFailedException(delegate.read(b, off, len))
 
-  override def reset(): Unit = delegate.reset()
+  override def reset(): Unit = tryOrFetchFailedException(delegate.reset())
 
   /**
    * Execute a block of code that returns a value, close this stream quietly and re-throwing
    * IOException as FetchFailedException when detectCorruption is true. This method is only
-   * used by the `read` and `skip` methods inside `BufferReleasingInputStream` currently.
+   * used by the `available`, `read` and `skip` methods inside `BufferReleasingInputStream`
+   * currently.
    */
   private def tryOrFetchFailedException[T](block: => T): T = {
     try {
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index cf883567fe2..c16dae77b83 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -182,6 +182,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blocksByAddress: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
       taskContext: Option[TaskContext] = None,
       streamWrapperLimitSize: Option[Long] = None,
+      corruptAtAvailableReset: Boolean = false,
       blockManager: Option[BlockManager] = None,
       maxBytesInFlight: Long = Long.MaxValue,
       maxReqsInFlight: Int = Int.MaxValue,
@@ -201,7 +202,14 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager.getOrElse(createMockBlockManager()),
       mapOutputTracker,
       blocksByAddress.iterator,
-      (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
+      (_, in) => {
+        val limited = streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in)
+        if (corruptAtAvailableReset) {
+          new CorruptAvailableResetStream(limited)
+        } else {
+          limited
+        }
+      },
       maxBytesInFlight,
       maxReqsInFlight,
       maxBlocksInFlightPerAddress,
@@ -712,6 +720,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     corruptBuffer
   }
 
+  private class CorruptAvailableResetStream(in: InputStream) extends InputStream {
+    override def read(): Int = in.read()
+
+    override def read(dest: Array[Byte], off: Int, len: Int): Int = in.read(dest, off, len)
+
+    override def available(): Int = throw new IOException("corrupt at available")
+
+    override def reset(): Unit = throw new IOException("corrupt at reset")
+  }
+
   private class CorruptStream(corruptAt: Long = 0L) extends InputStream {
     var pos = 0
     var closed = false
@@ -1879,4 +1897,48 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
     verifyLocalBlocksFromFallback(iterator)
   }
+
+  test("SPARK-45678: retry corrupt blocks on available() and reset()") {
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer()
+    )
+
+    // Semaphore to coordinate event sequence in two different threads.
+    val sem = new Semaphore(0)
+
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, createMockManagedBuffer())
+        sem.release()
+      }
+    }
+
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+      streamWrapperLimitSize = Some(100),
+      detectCorruptUseExtraMemory = false, // Don't use `ChunkedByteBufferInputStream`.
+      corruptAtAvailableReset = true,
+      checksumEnabled = false
+    )
+
+    sem.acquire()
+
+    val (id1, stream) = iterator.next()
+    assert(id1 === ShuffleBlockId(0, 0, 0))
+
+    val err1 = intercept[FetchFailedException] {
+      stream.available()
+    }
+
+    assert(err1.getMessage.contains("corrupt at available"))
+
+    val err2 = intercept[FetchFailedException] {
+      stream.reset()
+    }
+
+    assert(err2.getMessage.contains("corrupt at reset"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
