This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 855de101c [CELEBORN-1691] Fix the issue that upstream tasks don't 
rerun and the current task still retry when failed to decompress in flink
855de101c is described below

commit 855de101c260aae91670cf9875d819e776bd0156
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 22:43:26 2024 +0800

    [CELEBORN-1691] Fix the issue that upstream tasks don't rerun and the 
current task still retry when failed to decompress in flink
    
    ### What changes were proposed in this pull request?
    
    Fix the issue where, when buffer decompression fails in Flink, the upstream tasks are not rerun and only the current task is retried.
    
    ### Why are the changes needed?
    A decompression error should cause the upstream tasks to be rerun; otherwise the failure is unrecoverable.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually
    
    Closes #2884 from reswqa/fix_decompress.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../flink/RemoteShuffleInputGateDelegation.java       | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 4192b0e65..95ad20495 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -98,6 +98,8 @@ public class RemoteShuffleInputGateDelegation {
   /** Received buffers from remote shuffle worker. It's consumed by upper 
computing task. */
   private Queue<Pair<Buffer, InputChannelInfo>> receivedBuffers = new 
LinkedList<>();
 
+  private Map<InputChannelInfo, ResultPartitionID> channelToResultPartitionId;
+
   /** {@link Throwable} when reading failure. */
   private Throwable cause;
 
@@ -171,7 +173,7 @@ public class RemoteShuffleInputGateDelegation {
     this.endSubIndex = endSubIndex;
 
     initShuffleReadClients();
-
+    channelToResultPartitionId = new HashMap<>();
     channelsInfo = createChannelInfos();
     this.numConcurrentReading = numConcurrentReading;
     this.availabilityHelper = availabilityHelper;
@@ -220,7 +222,13 @@ public class RemoteShuffleInputGateDelegation {
 
   private List<InputChannelInfo> createChannelInfos() {
     return IntStream.range(0, gateDescriptor.getShuffleDescriptors().length)
-        .mapToObj(i -> new InputChannelInfo(gateIndex, i))
+        .mapToObj(
+            i -> {
+              InputChannelInfo channelInfo = new InputChannelInfo(gateIndex, 
i);
+              channelToResultPartitionId.put(
+                  channelInfo, 
gateDescriptor.getShuffleDescriptors()[i].getResultPartitionID());
+              return channelInfo;
+            })
         .collect(Collectors.toList());
   }
 
@@ -319,13 +327,14 @@ public class RemoteShuffleInputGateDelegation {
     }
   }
 
-  private Buffer decompressBufferIfNeeded(Buffer buffer) throws IOException {
+  private Buffer decompressBufferIfNeeded(Buffer buffer, InputChannelInfo 
info) throws IOException {
     if (buffer.isCompressed()) {
       try {
         checkState(bufferDecompressor != null, "Buffer decompressor not set.");
         return bufferDecompressor.decompressToIntermediateBuffer(buffer);
       } catch (Throwable t) {
-        throw new IOException("Decompress failure", t);
+        LOG.error("Decompress buffer error: inputChannelInfo: {}", info, t);
+        throw new 
PartitionNotFoundException(channelToResultPartitionId.get(info));
       } finally {
         buffer.recycleBuffer();
       }
@@ -422,7 +431,7 @@ public class RemoteShuffleInputGateDelegation {
   private Optional<BufferOrEvent> transformBuffer(Buffer buf, InputChannelInfo 
info)
       throws IOException {
     return Optional.of(
-        new BufferOrEvent(decompressBufferIfNeeded(buf), info, !isFinished(), 
false));
+        new BufferOrEvent(decompressBufferIfNeeded(buf, info), info, 
!isFinished(), false));
   }
 
   private Optional<BufferOrEvent> transformEvent(Buffer buffer, 
InputChannelInfo channelInfo)

Reply via email to