This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 855de101c [CELEBORN-1691] Fix the issue that upstream tasks don't
rerun and the current task still retry when failed to decompress in flink
855de101c is described below
commit 855de101c260aae91670cf9875d819e776bd0156
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 22:43:26 2024 +0800
[CELEBORN-1691] Fix the issue that upstream tasks don't rerun and the
current task still retry when failed to decompress in flink
### What changes were proposed in this pull request?
Fix the issue where upstream tasks are not rerun and the current task keeps
retrying when buffer decompression fails in Flink.
### Why are the changes needed?
A decompression error should cause the upstream task to be rerun; otherwise the failure is unrecoverable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually
Closes #2884 from reswqa/fix_decompress.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../flink/RemoteShuffleInputGateDelegation.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index 4192b0e65..95ad20495 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -98,6 +98,8 @@ public class RemoteShuffleInputGateDelegation {
/** Received buffers from remote shuffle worker. It's consumed by upper
computing task. */
private Queue<Pair<Buffer, InputChannelInfo>> receivedBuffers = new
LinkedList<>();
+ private Map<InputChannelInfo, ResultPartitionID> channelToResultPartitionId;
+
/** {@link Throwable} when reading failure. */
private Throwable cause;
@@ -171,7 +173,7 @@ public class RemoteShuffleInputGateDelegation {
this.endSubIndex = endSubIndex;
initShuffleReadClients();
-
+ channelToResultPartitionId = new HashMap<>();
channelsInfo = createChannelInfos();
this.numConcurrentReading = numConcurrentReading;
this.availabilityHelper = availabilityHelper;
@@ -220,7 +222,13 @@ public class RemoteShuffleInputGateDelegation {
private List<InputChannelInfo> createChannelInfos() {
return IntStream.range(0, gateDescriptor.getShuffleDescriptors().length)
- .mapToObj(i -> new InputChannelInfo(gateIndex, i))
+ .mapToObj(
+ i -> {
+ InputChannelInfo channelInfo = new InputChannelInfo(gateIndex,
i);
+ channelToResultPartitionId.put(
+ channelInfo,
gateDescriptor.getShuffleDescriptors()[i].getResultPartitionID());
+ return channelInfo;
+ })
.collect(Collectors.toList());
}
@@ -319,13 +327,14 @@ public class RemoteShuffleInputGateDelegation {
}
}
- private Buffer decompressBufferIfNeeded(Buffer buffer) throws IOException {
+ private Buffer decompressBufferIfNeeded(Buffer buffer, InputChannelInfo
info) throws IOException {
if (buffer.isCompressed()) {
try {
checkState(bufferDecompressor != null, "Buffer decompressor not set.");
return bufferDecompressor.decompressToIntermediateBuffer(buffer);
} catch (Throwable t) {
- throw new IOException("Decompress failure", t);
+ LOG.error("Decompress buffer error: inputChannelInfo: {}", info, t);
+ throw new
PartitionNotFoundException(channelToResultPartitionId.get(info));
} finally {
buffer.recycleBuffer();
}
@@ -422,7 +431,7 @@ public class RemoteShuffleInputGateDelegation {
private Optional<BufferOrEvent> transformBuffer(Buffer buf, InputChannelInfo
info)
throws IOException {
return Optional.of(
- new BufferOrEvent(decompressBufferIfNeeded(buf), info, !isFinished(),
false));
+ new BufferOrEvent(decompressBufferIfNeeded(buf, info), info,
!isFinished(), false));
}
private Optional<BufferOrEvent> transformEvent(Buffer buffer,
InputChannelInfo channelInfo)