This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 6a0dd5334 [CELEBORN-1652] Throw TransportableError for failure of
sending PbReadAddCredit to avoid flink task get stuck
6a0dd5334 is described below
commit 6a0dd5334f493be6b21b6d5d931c592b4d7656cf
Author: codenohup <[email protected]>
AuthorDate: Tue Oct 15 22:01:25 2024 +0800
[CELEBORN-1652] Throw TransportableError for failure of sending
PbReadAddCredit to avoid flink task get stuck
### What changes were proposed in this pull request?
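Throw `TransportableError` when sending `PbReadAddCredit` fails, so that
the Flink task does not get stuck. The failure is also logged at error
level instead of warn, and the log message now includes the `streamId`.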
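
The pattern behind this change can be sketched in isolation: a failure callback that forwards the error to the stream's message consumer instead of only logging it. This is a minimal, self-contained sketch, not Celeborn code. `RpcCallback`, `StreamError`, and `creditCallback` are hypothetical stand-ins for the real callback interface, `TransportableError`, and the code in `CelebornBufferStream`. The sketch assumes the task got stuck because the reader was never told that the credit message had not been delivered.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CreditFailureSketch {
  /** Hypothetical stand-in for an RPC response callback. */
  interface RpcCallback {
    void onSuccess();

    void onFailure(Throwable e);
  }

  /** Hypothetical stand-in for the error message handed to the reader. */
  static final class StreamError {
    final long streamId;
    final Throwable cause;

    StreamError(long streamId, Throwable cause) {
      this.streamId = streamId;
      this.cause = cause;
    }
  }

  /** Builds a callback that logs the failure and also forwards it to the consumer. */
  static RpcCallback creditCallback(long streamId, Consumer<Object> messageConsumer) {
    return new RpcCallback() {
      @Override
      public void onSuccess() {
        // Nothing to do when the credit message is delivered.
      }

      @Override
      public void onFailure(Throwable e) {
        // Logging alone leaves the reader unaware of the failure.
        System.err.println("Send credit failed, streamId " + streamId + ", detail " + e);
        // Forwarding the error lets the reader fail fast instead of waiting.
        messageConsumer.accept(new StreamError(streamId, e));
      }
    };
  }

  public static void main(String[] args) {
    List<Object> received = new ArrayList<>();
    RpcCallback callback = creditCallback(42L, received::add);
    callback.onFailure(new IOException("connection reset"));
    StreamError error = (StreamError) received.get(0);
    System.out.println("reader saw error for stream " + error.streamId);
  }
}
```

In this sketch the consumer is a plain list. In the actual patch the consumer is the existing `messageConsumer` of the buffer stream, as the diff below shows.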
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual verification.
Closes #2811 from codenohup/fix-send-credit.
Authored-by: codenohup <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit af930e7f6788cb040578884526c37241affd8005)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/plugin/flink/readclient/CelebornBufferStream.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
index d0571a14a..43d3a61a5 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
@@ -100,10 +100,12 @@ public class CelebornBufferStream {
           @Override
           public void onFailure(Throwable e) {
-            logger.warn(
-                "Send PbReadAddCredit to {} failed, detail {}",
+            logger.error(
+                "Send PbReadAddCredit to {} failed, streamId {}, detail {}",
                 NettyUtils.getRemoteAddress(client.getChannel()),
+                streamId,
                 e.getCause());
+            messageConsumer.accept(new TransportableError(streamId, e));
           }
         });
   }