This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 57e73dafeb9 [SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException
57e73dafeb9 is described below
commit 57e73dafeb9ffd288e0ed4ad76b9549e7da33ad7
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Oct 27 19:21:04 2023 -0700
[SPARK-45678][CORE] Cover BufferReleasingInputStream.available/reset under tryOrFetchFailedException
### What changes were proposed in this pull request?
This patch proposes to wrap `BufferReleasingInputStream.available`/`reset` under `tryOrFetchFailedException`, so that an `IOException` thrown during an `available`/`reset` call is rethrown as a `FetchFailedException`.
### Why are the changes needed?
We have encountered shuffle data corruption issue:
```
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:112)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:504)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:543)
at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:450)
at org.xerial.snappy.SnappyInputStream.available(SnappyInputStream.java:497)
at org.apache.spark.storage.BufferReleasingInputStream.available(ShuffleBlockFetcherIterator.scala:1356)
```
Spark shuffle can detect corruption for a few stream operations like `read` and `skip`: an `IOException` thrown in those calls is rethrown as a `FetchFailedException`, which causes the failed shuffle task to be retried. However, the stack trace above comes from `available`, which is not covered by that mechanism, so no retry happened and the Spark application simply failed.
Since the `available`/`reset` operations can also involve data decompression and throw `IOException`, we should guard them the same way `read` and `skip` are guarded.
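For illustration, here is a minimal, self-contained sketch of the wrapping pattern (not the actual Spark implementation; the class and field names below are simplified, and the real code throws `FetchFailedException` rather than the placeholder used here):
```scala
import java.io.{IOException, InputStream}

// Hypothetical simplified wrapper: any IOException thrown by the delegate stream is
// rethrown as a "fetch failed" style exception so the shuffle task can be retried,
// instead of surfacing as a plain IOException that fails the application.
class CorruptionDetectingStream(delegate: InputStream, detectCorruption: Boolean)
  extends InputStream {

  override def read(): Int = tryOrFetchFailedException(delegate.read())

  // With SPARK-45678, available() and reset() get the same protection as read()/skip().
  override def available(): Int = tryOrFetchFailedException(delegate.available())
  override def reset(): Unit = tryOrFetchFailedException(delegate.reset())

  private def tryOrFetchFailedException[T](block: => T): T = {
    try {
      block
    } catch {
      case e: IOException if detectCorruption =>
        // In Spark this would be converted to a FetchFailedException, which triggers
        // a retry of the shuffle fetch; a RuntimeException stands in for it here.
        throw new RuntimeException("fetch failed, shuffle block may be corrupt", e)
    }
  }
}
```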
### Does this PR introduce _any_ user-facing change?
Yes. Data corruption detected during an `available`/`reset` call now results in a `FetchFailedException` (as it already does for `read` and `skip`), which can be retried, instead of surfacing as a plain `IOException`.
### How was this patch tested?
Added test.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43543 from viirya/add_available.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../storage/ShuffleBlockFetcherIterator.scala | 8 ++-
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 64 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index d539611271c..6144cc0e6a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1354,7 +1354,8 @@ private class BufferReleasingInputStream(
     }
   }
 
-  override def available(): Int = delegate.available()
+  override def available(): Int =
+    tryOrFetchFailedException(delegate.available())
 
   override def mark(readlimit: Int): Unit = delegate.mark(readlimit)
 
@@ -1369,12 +1370,13 @@ private class BufferReleasingInputStream(
   override def read(b: Array[Byte], off: Int, len: Int): Int =
     tryOrFetchFailedException(delegate.read(b, off, len))
 
-  override def reset(): Unit = delegate.reset()
+  override def reset(): Unit = tryOrFetchFailedException(delegate.reset())
 
   /**
    * Execute a block of code that returns a value, close this stream quietly and re-throwing
    * IOException as FetchFailedException when detectCorruption is true. This method is only
-   * used by the `read` and `skip` methods inside `BufferReleasingInputStream` currently.
+   * used by the `available`, `read` and `skip` methods inside `BufferReleasingInputStream`
+   * currently.
    */
   private def tryOrFetchFailedException[T](block: => T): T = {
     try {
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index cf883567fe2..c16dae77b83 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -182,6 +182,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blocksByAddress: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
       taskContext: Option[TaskContext] = None,
       streamWrapperLimitSize: Option[Long] = None,
+      corruptAtAvailableReset: Boolean = false,
       blockManager: Option[BlockManager] = None,
       maxBytesInFlight: Long = Long.MaxValue,
       maxReqsInFlight: Int = Int.MaxValue,
@@ -201,7 +202,14 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager.getOrElse(createMockBlockManager()),
       mapOutputTracker,
       blocksByAddress.iterator,
-      (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
+      (_, in) => {
+        val limited = streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in)
+        if (corruptAtAvailableReset) {
+          new CorruptAvailableResetStream(limited)
+        } else {
+          limited
+        }
+      },
       maxBytesInFlight,
       maxReqsInFlight,
       maxBlocksInFlightPerAddress,
@@ -712,6 +720,16 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     corruptBuffer
   }
 
+  private class CorruptAvailableResetStream(in: InputStream) extends InputStream {
+    override def read(): Int = in.read()
+
+    override def read(dest: Array[Byte], off: Int, len: Int): Int = in.read(dest, off, len)
+
+    override def available(): Int = throw new IOException("corrupt at available")
+
+    override def reset(): Unit = throw new IOException("corrupt at reset")
+  }
+
   private class CorruptStream(corruptAt: Long = 0L) extends InputStream {
     var pos = 0
     var closed = false
@@ -1879,4 +1897,48 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       blockManager = Some(blockManager), streamWrapperLimitSize = Some(100))
     verifyLocalBlocksFromFallback(iterator)
   }
+
+  test("SPARK-45678: retry corrupt blocks on available() and reset()") {
+    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+    val blocks = Map[BlockId, ManagedBuffer](
+      ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer()
+    )
+
+    // Semaphore to coordinate event sequence in two different threads.
+    val sem = new Semaphore(0)
+
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, createMockManagedBuffer())
+        sem.release()
+      }
+    }
+
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)),
+      streamWrapperLimitSize = Some(100),
+      detectCorruptUseExtraMemory = false, // Don't use `ChunkedByteBufferInputStream`.
+      corruptAtAvailableReset = true,
+      checksumEnabled = false
+    )
+
+    sem.acquire()
+
+    val (id1, stream) = iterator.next()
+    assert(id1 === ShuffleBlockId(0, 0, 0))
+
+    val err1 = intercept[FetchFailedException] {
+      stream.available()
+    }
+
+    assert(err1.getMessage.contains("corrupt at available"))
+
+    val err2 = intercept[FetchFailedException] {
+      stream.reset()
+    }
+
+    assert(err2.getMessage.contains("corrupt at reset"))
+  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]