This is an automated email from the ASF dual-hosted git repository.

csy pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 08716da1a [CELEBORN-1567] Support throwing FetchFailedException when data corruption is detected
08716da1a is described below

commit 08716da1aabd168e7b968a4dfc4069c8996a7b95
Author: sychen <[email protected]>
AuthorDate: Tue Aug 20 12:29:53 2024 +0800

    [CELEBORN-1567] Support throwing FetchFailedException when data corruption is detected
    
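    ### What changes were proposed in this pull request?
    In `CelebornInputStream`, the catch block around the fill-buffer path now
    also catches `LZ4Exception` and `ZstdException` (previously only
    `IOException`). A non-`IOException` is wrapped in a new `IOException` so it
    goes through the same handling as an I/O error, including the
    `reportShuffleFetchFailure` call when an `exceptionMaker` is set. The log
    message of the generic `Exception` catch block is also changed to
    "Failed to fill buffer from chunk".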
    
    ### Why are the changes needed?
    https://github.com/apache/celeborn/pull/2655#pullrequestreview-2213124224
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GA (GitHub Actions CI)
    
    Closes #2691 from cxzl25/CELEBORN-1567.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Shaoyun Chen <[email protected]>
    (cherry picked from commit b8f275d888cfcbe4aa036b2d0021d7117599812a)
    Signed-off-by: Shaoyun Chen <[email protected]>
---
 .../apache/celeborn/client/read/CelebornInputStream.java    | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index cd1e5d061..611aa00af 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -27,8 +27,10 @@ import java.util.concurrent.atomic.LongAdder;
 
 import scala.Tuple2;
 
+import com.github.luben.zstd.ZstdException;
 import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.buffer.ByteBuf;
+import net.jpountz.lz4.LZ4Exception;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -664,7 +666,7 @@ public abstract class CelebornInputStream extends InputStream {
         }
 
         return hasData;
-      } catch (IOException e) {
+      } catch (LZ4Exception | ZstdException | IOException e) {
         logger.error(
             "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, 
partitionId {}, location {}",
             appShuffleId,
@@ -672,7 +674,12 @@ public abstract class CelebornInputStream extends InputStream {
             partitionId,
             currentReader.getLocation(),
             e);
-        IOException ioe = e;
+        IOException ioe;
+        if (e instanceof IOException) {
+          ioe = (IOException) e;
+        } else {
+          ioe = new IOException(e);
+        }
         if (exceptionMaker != null) {
           if (shuffleClient.reportShuffleFetchFailure(appShuffleId, 
shuffleId)) {
             /*
@@ -689,7 +696,7 @@ public abstract class CelebornInputStream extends InputStream {
         throw ioe;
       } catch (Exception e) {
         logger.error(
-            "Failed to read data from chunk. AppShuffleId {}, shuffleId {}, 
partitionId {}, location {}",
+            "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, 
partitionId {}, location {}",
             appShuffleId,
             shuffleId,
             partitionId,

Reply via email to