This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 88124d763 [CELEBORN-1691][FOLLOWUP] Fix the issue that upstream tasks don't rerun and the current task still retry when failed to deserialize in flink
88124d763 is described below

commit 88124d763a1f673a1e5d0452d088c03d44de8d76
Author: SteNicholas <[email protected]>
AuthorDate: Thu May 15 17:18:06 2025 +0800

    [CELEBORN-1691][FOLLOWUP] Fix the issue that upstream tasks don't rerun and the current task still retry when failed to deserialize in flink
    
    ### What changes were proposed in this pull request?
    
    Fix the issue that upstream tasks don't rerun and the current task still 
retry when failed to deserialize in flink. `RemoteShuffleInputGateDelegation` 
should keep the same behavior when failed to decompress.
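    The change replaces a generic `IOException` with Flink's
    `PartitionNotFoundException`, keyed by the result partition that the failing
    channel reads from. A minimal, self-contained sketch of that error-translation
    pattern follows. All names in it (`DeserializeFailureSketch`,
    `PartitionNotFoundSignal`, `EventDeserializer`) are hypothetical stand-ins with
    no Flink or Celeborn dependency, and the sketch chains the original cause where
    the real patch logs it instead.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: these types are illustrative stand-ins,
// not Celeborn or Flink classes.
public class DeserializeFailureSketch {

    /** Stand-in for Flink's PartitionNotFoundException: "the upstream data is gone". */
    static class PartitionNotFoundSignal extends IOException {
        final String partitionId;

        PartitionNotFoundSignal(String partitionId) {
            super("Partition " + partitionId + " not found.");
            this.partitionId = partitionId;
        }
    }

    /** Stand-in for EventSerializer.fromBuffer(buffer, classLoader). */
    interface EventDeserializer {
        Object deserialize(byte[] buffer) throws Exception;
    }

    // Maps an input channel index to the id of the result partition it reads from.
    private final Map<Integer, String> channelToPartition = new HashMap<>();

    // Counts released buffers, mirroring buffer.recycleBuffer() in the finally block.
    int recycledBuffers = 0;

    void registerChannel(int channel, String partitionId) {
        channelToPartition.put(channel, partitionId);
    }

    Object readEvent(int channel, byte[] buffer, EventDeserializer deserializer)
            throws IOException {
        try {
            return deserializer.deserialize(buffer);
        } catch (Throwable t) {
            // Report the producing partition as lost instead of throwing a generic
            // IOException, so the failure is attributed to the upstream data.
            PartitionNotFoundSignal signal =
                    new PartitionNotFoundSignal(channelToPartition.get(channel));
            signal.initCause(t); // keep the original error for diagnostics
            throw signal;
        } finally {
            // The buffer is released whether or not deserialization succeeded.
            recycledBuffers++;
        }
    }
}
```

A successful read returns the event; a corrupt buffer surfaces as a partition-level failure naming the upstream partition, and in both cases the buffer is released.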
    
    Follow up #2884.
    
    ### Why are the changes needed?
    
    A deserialization error should trigger a rerun of the upstream tasks;
    otherwise, the failure is unrecoverable.
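    Why the exception type matters can be sketched with a simplified failover
    model. As we understand it, Flink's region failover strategy also restarts the
    producer when it finds a partition-related exception (such as
    `PartitionNotFoundException`) in the failure's cause chain, while other failures
    restart only the failed task, which would presumably re-read the same
    undeserializable data. The sketch below is a hypothetical model of that
    decision, not Flink's scheduler code; `FailoverSketch` and
    `PartitionLostException` are invented names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a restart decision; not Flink's actual scheduler.
public class FailoverSketch {

    /** Invented marker type standing in for Flink's PartitionException hierarchy. */
    static class PartitionLostException extends Exception {
        PartitionLostException(String message) {
            super(message);
        }
    }

    /** Walks the cause chain looking for a partition-loss signal. */
    static boolean isPartitionLoss(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof PartitionLostException) {
                return true;
            }
        }
        return false;
    }

    /** Returns the tasks to restart, upstream first when its output must be regenerated. */
    static List<String> tasksToRestart(String failedTask, String upstreamTask, Throwable failure) {
        List<String> tasks = new ArrayList<>();
        if (isPartitionLoss(failure)) {
            // The consumer cannot progress on the existing data, so rerun the producer.
            tasks.add(upstreamTask);
        }
        // The failed consumer is always restarted.
        tasks.add(failedTask);
        return tasks;
    }
}
```

With a plain `IOException` the model restarts only `consumer`; when the failure wraps a `PartitionLostException`, it restarts `producer` and then `consumer`.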
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual.
    
    Closes #3255 from SteNicholas/CELEBORN-1691.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 6162d0bbc..31d61b89a 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -440,7 +440,8 @@ public class RemoteShuffleInputGateDelegation {
     try {
       event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
     } catch (Throwable t) {
-      throw new IOException("Deserialize failure.", t);
+      LOG.error("Failed to deserialize event: inputChannelInfo {}.", channelInfo, t);
+      throw new PartitionNotFoundException(channelToResultPartitionId.get(channelInfo));
     } finally {
       buffer.recycleBuffer();
     }
