This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 63d1209bb [CELEBORN-1567] Support throw FetchFailedException when Data
corruption detected
63d1209bb is described below
commit 63d1209bb913597dda8b199efb2837495d9e8028
Author: sychen <[email protected]>
AuthorDate: Tue Aug 20 12:29:53 2024 +0800
[CELEBORN-1567] Support throw FetchFailedException when Data corruption
detected
### What changes were proposed in this pull request?
### Why are the changes needed?
https://github.com/apache/celeborn/pull/2655#pullrequestreview-2213124224
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2691 from cxzl25/CELEBORN-1567.
Authored-by: sychen <[email protected]>
Signed-off-by: Shaoyun Chen <[email protected]>
(cherry picked from commit b8f275d888cfcbe4aa036b2d0021d7117599812a)
---
.../apache/celeborn/client/read/CelebornInputStream.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 1ffca2954..4c517eebc 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -25,8 +25,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
+import com.github.luben.zstd.ZstdException;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.buffer.ByteBuf;
+import net.jpountz.lz4.LZ4Exception;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -644,7 +646,7 @@ public abstract class CelebornInputStream extends InputStream {
}
return hasData;
- } catch (IOException e) {
+ } catch (LZ4Exception | ZstdException | IOException e) {
logger.error(
"Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {},
partitionId {}, location {}",
appShuffleId,
@@ -652,7 +654,12 @@ public abstract class CelebornInputStream extends InputStream {
partitionId,
currentReader.getLocation(),
e);
- IOException ioe = e;
+ IOException ioe;
+ if (e instanceof IOException) {
+ ioe = (IOException) e;
+ } else {
+ ioe = new IOException(e);
+ }
if (exceptionMaker != null) {
if (shuffleClient.reportShuffleFetchFailure(appShuffleId,
shuffleId)) {
/*
@@ -669,7 +676,7 @@ public abstract class CelebornInputStream extends InputStream {
throw ioe;
} catch (Exception e) {
logger.error(
- "Failed to read data from chunk. AppShuffleId {}, shuffleId {},
partitionId {}, location {}",
+ "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {},
partitionId {}, location {}",
appShuffleId,
shuffleId,
partitionId,