This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 88124d763 [CELEBORN-1691][FOLLOWUP] Fix the issue that upstream tasks
don't rerun and the current task still retry when failed to deserialize in flink
88124d763 is described below
commit 88124d763a1f673a1e5d0452d088c03d44de8d76
Author: SteNicholas <[email protected]>
AuthorDate: Thu May 15 17:18:06 2025 +0800
[CELEBORN-1691][FOLLOWUP] Fix the issue that upstream tasks don't rerun and
the current task still retry when failed to deserialize in flink
### What changes were proposed in this pull request?
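Fix the issue where upstream tasks are not rerun and the current task keeps
retrying when event deserialization fails in Flink. On a deserialization failure,
`RemoteShuffleInputGateDelegation` should behave the same way as it does on a
decompression failure: log the error and throw `PartitionNotFoundException` for the
channel's result partition instead of a plain `IOException`.
This is a follow-up to #2884.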
### Why are the changes needed?
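A deserialization error must trigger a rerun of the upstream tasks. Otherwise the
failure is unrecoverable, because retrying only the current task reads the same
upstream data again.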
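The reasoning above comes down to how the failure cause is classified. The sketch below is a simplified, hypothetical model. `FailoverModel`, `tasksToRestart` and the string task/partition ids are illustrative names, and the nested exceptions are stand-ins that only mimic Flink's `PartitionException` / `PartitionNotFoundException`. It assumes, as in Flink's region-based failover, that a partition exception in the cause chain also restarts the producer of the missing partition, while any other exception restarts only the failed task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model (an assumption, not Flink's scheduler code) of how a
// region-based failover strategy picks the tasks to restart after a consumer fails.
class FailoverModel {

    // Hypothetical stand-in for Flink's PartitionException.
    static class PartitionException extends Exception {
        final String partitionId;

        PartitionException(String message, String partitionId) {
            super(message);
            this.partitionId = partitionId;
        }
    }

    // Hypothetical stand-in for Flink's PartitionNotFoundException.
    static class PartitionNotFoundException extends PartitionException {
        PartitionNotFoundException(String partitionId) {
            super("Partition " + partitionId + " not found.", partitionId);
        }
    }

    // Always restarts the failed task; additionally restarts the producer of the
    // lost partition if a PartitionException appears anywhere in the cause chain.
    static List<String> tasksToRestart(
            String failedTask, Throwable cause, Map<String, String> producerByPartition) {
        List<String> restart = new ArrayList<>();
        restart.add(failedTask);
        for (Throwable t = cause; t != null; t = t.getCause()) {
            if (t instanceof PartitionException) {
                String producer = producerByPartition.get(((PartitionException) t).partitionId);
                if (producer != null) {
                    restart.add(producer);
                }
                break;
            }
        }
        return restart;
    }
}
```

Under this model, an `IOException("Deserialize failure.")` restarts only the consumer, so it fails again on the same data. A `PartitionNotFoundException` for the consumed partition also restarts its producer, which regenerates the data.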
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual.
Closes #3255 from SteNicholas/CELEBORN-1691.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 6162d0bbc..31d61b89a 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -440,7 +440,8 @@ public class RemoteShuffleInputGateDelegation {
try {
event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
} catch (Throwable t) {
- throw new IOException("Deserialize failure.", t);
+ LOG.error("Failed to deserialize event: inputChannelInfo {}.", channelInfo, t);
+ throw new PartitionNotFoundException(channelToResultPartitionId.get(channelInfo));
} finally {
buffer.recycleBuffer();
}
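A self-contained sketch of the error-handling pattern the hunk introduces may help when reviewing it. Everything here is a hypothetical stand-in, not Celeborn or Flink code: `EventDecoder`, `PooledBuffer`, the string channel/partition ids, and the `"EVENT:"` payload format replace `Buffer`, `InputChannelInfo`, `ResultPartitionID` and `EventSerializer`. `java.util.logging` takes the place of the SLF4J-style `LOG`. The sketch shows three things: the failure is logged together with the channel, it is converted into a partition-not-found error keyed by the channel's partition, and the buffer is recycled on both paths.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of the post-change handling with hypothetical stand-in types.
class EventDecoder {
    static final Logger LOG = Logger.getLogger(EventDecoder.class.getName());

    // Stand-in for Flink's PartitionNotFoundException; carries the lost partition id.
    static class PartitionNotFoundException extends IOException {
        final String partitionId;

        PartitionNotFoundException(String partitionId) {
            super("Partition " + partitionId + " not found.");
            this.partitionId = partitionId;
        }
    }

    // Stand-in for a pooled network buffer that must always be returned to its pool.
    static class PooledBuffer {
        final byte[] data;
        boolean recycled;

        PooledBuffer(byte[] data) {
            this.data = data;
        }

        void recycleBuffer() {
            recycled = true;
        }
    }

    private final Map<String, String> channelToResultPartitionId;

    EventDecoder(Map<String, String> channelToResultPartitionId) {
        this.channelToResultPartitionId = channelToResultPartitionId;
    }

    // Hypothetical event format: a valid payload starts with "EVENT:".
    private static String deserialize(byte[] data) {
        String s = new String(data, StandardCharsets.UTF_8);
        if (!s.startsWith("EVENT:")) {
            throw new IllegalStateException("Corrupted event payload");
        }
        return s.substring("EVENT:".length());
    }

    String decode(PooledBuffer buffer, String channelInfo) throws PartitionNotFoundException {
        try {
            return deserialize(buffer.data);
        } catch (Throwable t) {
            // Log with the channel for diagnosis, then report the upstream partition
            // as lost so that its producer is rerun instead of only retrying this task.
            LOG.log(Level.SEVERE, "Failed to deserialize event: inputChannelInfo " + channelInfo + ".", t);
            throw new PartitionNotFoundException(channelToResultPartitionId.get(channelInfo));
        } finally {
            // Runs on both the success and the failure path.
            buffer.recycleBuffer();
        }
    }
}
```

Keeping `recycleBuffer()` in `finally` mirrors the original hunk. Changing the exception type in `catch` does not affect buffer ownership, so no buffer leaks when the new exception propagates.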