This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 08716da1a [CELEBORN-1567] Support throw FetchFailedException when Data
corruption detected
08716da1a is described below
commit 08716da1aabd168e7b968a4dfc4069c8996a7b95
Author: sychen <[email protected]>
AuthorDate: Tue Aug 20 12:29:53 2024 +0800
[CELEBORN-1567] Support throw FetchFailedException when Data corruption
detected
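### What changes were proposed in this pull request?
In `CelebornInputStream`, catch `LZ4Exception` and `ZstdException` together with `IOException` when filling the buffer from a chunk. An exception that is not already an `IOException` is wrapped in a new `IOException` (keeping the original as its cause), so it goes through the same handling path, including the `exceptionMaker` / `reportShuffleFetchFailure` branch. The log message of the generic `Exception` handler is also changed to "Failed to fill buffer from chunk".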
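### Why are the changes needed?
`LZ4Exception` and `ZstdException` are not `IOException`s, so before this change they were not handled by the `IOException` catch block that reports the shuffle fetch failure. See the review discussion:
https://github.com/apache/celeborn/pull/2655#pullrequestreview-2213124224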
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA (GitHub Actions CI)
Closes #2691 from cxzl25/CELEBORN-1567.
Authored-by: sychen <[email protected]>
Signed-off-by: Shaoyun Chen <[email protected]>
(cherry picked from commit b8f275d888cfcbe4aa036b2d0021d7117599812a)
Signed-off-by: Shaoyun Chen <[email protected]>
---
.../apache/celeborn/client/read/CelebornInputStream.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index cd1e5d061..611aa00af 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -27,8 +27,10 @@ import java.util.concurrent.atomic.LongAdder;
import scala.Tuple2;
+import com.github.luben.zstd.ZstdException;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.buffer.ByteBuf;
+import net.jpountz.lz4.LZ4Exception;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -664,7 +666,7 @@ public abstract class CelebornInputStream extends
InputStream {
}
return hasData;
- } catch (IOException e) {
+ } catch (LZ4Exception | ZstdException | IOException e) {
logger.error(
"Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {},
partitionId {}, location {}",
appShuffleId,
@@ -672,7 +674,12 @@ public abstract class CelebornInputStream extends
InputStream {
partitionId,
currentReader.getLocation(),
e);
- IOException ioe = e;
+ IOException ioe;
+ if (e instanceof IOException) {
+ ioe = (IOException) e;
+ } else {
+ ioe = new IOException(e);
+ }
if (exceptionMaker != null) {
if (shuffleClient.reportShuffleFetchFailure(appShuffleId,
shuffleId)) {
/*
@@ -689,7 +696,7 @@ public abstract class CelebornInputStream extends
InputStream {
throw ioe;
} catch (Exception e) {
logger.error(
- "Failed to read data from chunk. AppShuffleId {}, shuffleId {},
partitionId {}, location {}",
+ "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {},
partitionId {}, location {}",
appShuffleId,
shuffleId,
partitionId,