This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c316fdbdf Revert "[CELEBORN-1376] Push data failed should always
release request body"
c316fdbdf is described below
commit c316fdbdfb8f3bf28545815088ea163b7aff1a0b
Author: zhengtao <[email protected]>
AuthorDate: Fri Dec 13 11:20:11 2024 +0800
Revert "[CELEBORN-1376] Push data failed should always release request body"
This reverts commit b65b5433dceaa19dc22f6a878e5160751a7ef036.
<!--
Thanks for sending a pull request! Here are some tips for you:
- Make sure the PR title starts w/ a JIRA ticket, e.g. '[CELEBORN-XXXX]
Your PR title ...'.
- Be sure to keep the PR description updated to reflect all changes.
- Please write your PR title to summarize what this PR proposes.
- If possible, provide a concise example to reproduce the issue for a
faster review.
-->
### What changes were proposed in this pull request?
Revert [CELEBORN-1376](https://github.com/apache/celeborn/pull/2449)
The reverted PR introduces a reference count error when replication is
enabled and workers are randomly terminated.
### Why are the changes needed?
When data replication is enabled and workers are randomly terminated, the
reverted change causes an IllegalReferenceCountException
(`refCnt: 0, decrement: 1`), which fails the task.

### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster testing.
Closes #2992 from zaynt4606/clbr1376.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../plugin/flink/FlinkShuffleClientImplSuiteJ.java | 2 +-
.../common/network/client/TransportClient.java | 54 ++--------------------
2 files changed, 5 insertions(+), 51 deletions(-)
diff --git
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
index cb15c1e15..d0e0dee28 100644
---
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
+++
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
@@ -111,7 +111,7 @@ public class FlinkShuffleClientImplSuiteJ {
@Test
public void testPushDataByteBufFail() throws IOException {
ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
- when(client.pushData(any(), anyLong(), any(), any(), any()))
+ when(client.pushData(any(), anyLong(), any(), any()))
.thenAnswer(
t -> {
RpcResponseCallback rpcResponseCallback = t.getArgument(1,
RpcResponseCallback.class);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 3fc7c43b2..4987ca38c 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -216,23 +216,6 @@ public class TransportClient implements Closeable {
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback) {
- Runnable rpcFailureCallback =
- () -> {
- try {
- pushData.body().release();
- } catch (Throwable e) {
- logger.error("Error release buffer for PUSH_DATA request {}",
pushData.requestId, e);
- }
- };
- return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback,
rpcFailureCallback);
- }
-
- public ChannelFuture pushData(
- PushData pushData,
- long pushDataTimeout,
- RpcResponseCallback callback,
- Runnable rpcSendoutCallback,
- Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
}
@@ -242,8 +225,7 @@ public class TransportClient implements Closeable {
PushRequestInfo info = new PushRequestInfo(dueTime, callback);
handler.addPushRequest(requestId, info);
pushData.requestId = requestId;
- PushChannelListener listener =
- new PushChannelListener(requestId, rpcSendoutCallback,
rpcFailureCallback);
+ PushChannelListener listener = new PushChannelListener(requestId,
rpcSendoutCallback);
ChannelFuture channelFuture =
channel.writeAndFlush(pushData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@@ -251,26 +233,6 @@ public class TransportClient implements Closeable {
public ChannelFuture pushMergedData(
PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback
callback) {
- Runnable rpcFailureCallback =
- () -> {
- try {
- pushMergedData.body().release();
- } catch (Throwable e) {
- logger.error(
- "Error release buffer for PUSH_MERGED_DATA request {}",
- pushMergedData.requestId,
- e);
- }
- };
- return pushMergedData(pushMergedData, pushDataTimeout, callback, null,
rpcFailureCallback);
- }
-
- public ChannelFuture pushMergedData(
- PushMergedData pushMergedData,
- long pushDataTimeout,
- RpcResponseCallback callback,
- Runnable rpcSendoutCallback,
- Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing merged data to {}",
NettyUtils.getRemoteAddress(channel));
}
@@ -281,8 +243,7 @@ public class TransportClient implements Closeable {
handler.addPushRequest(requestId, info);
pushMergedData.requestId = requestId;
- PushChannelListener listener =
- new PushChannelListener(requestId, rpcSendoutCallback,
rpcFailureCallback);
+ PushChannelListener listener = new PushChannelListener(requestId);
ChannelFuture channelFuture =
channel.writeAndFlush(pushMergedData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
@@ -498,18 +459,14 @@ public class TransportClient implements Closeable {
final long pushRequestId;
Runnable rpcSendOutCallback;
- Runnable rpcFailureCallback;
-
PushChannelListener(long pushRequestId) {
- this(pushRequestId, null, null);
+ this(pushRequestId, null);
}
- PushChannelListener(
- long pushRequestId, Runnable rpcSendOutCallback, Runnable
rpcFailureCallback) {
+ PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
super("PUSH " + pushRequestId);
this.pushRequestId = pushRequestId;
this.rpcSendOutCallback = rpcSendOutCallback;
- this.rpcFailureCallback = rpcFailureCallback;
}
@Override
@@ -523,9 +480,6 @@ public class TransportClient implements Closeable {
@Override
protected void handleFailure(String errorMsg, Throwable cause) {
handler.handlePushFailure(pushRequestId, errorMsg, cause);
- if (rpcFailureCallback != null) {
- rpcFailureCallback.run();
- }
}
}
}