This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new a44072aee Revert "[CELEBORN-1376] Push data failed should always release request body"
a44072aee is described below

commit a44072aee4b7a71106c08930e0ffb099379786dc
Author: zhengtao <[email protected]>
AuthorDate: Fri Dec 13 11:20:11 2024 +0800

    Revert "[CELEBORN-1376] Push data failed should always release request body"
    
    This reverts commit b65b5433dceaa19dc22f6a878e5160751a7ef036.
    
    <!--
    Thanks for sending a pull request!  Here are some tips for you:
      - Make sure the PR title starts with a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
      - Be sure to keep the PR description updated to reflect all changes.
      - Please write your PR title to summarize what this PR proposes.
      - If possible, provide a concise example to reproduce the issue for a faster review.
    -->
    
    ### What changes were proposed in this pull request?
    Revert [CELEBORN-1376](https://github.com/apache/celeborn/pull/2449)
    CELEBORN-1376 introduced a reference-count error that occurs when replication is enabled and workers are randomly terminated.
    
    ### Why are the changes needed?
    When data replication is enabled and workers are randomly terminated, an IllegalReferenceCountException (`refCnt: 0, decrement: 1`) is thrown, which fails the task.
    
![image](https://github.com/user-attachments/assets/bb4965e4-5fa2-44ad-bb88-36bd475a6b5f)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Cluster testing.
    
    Closes #2992 from zaynt4606/clbr1376.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit c316fdbdfb8f3bf28545815088ea163b7aff1a0b)
    Signed-off-by: Shuang <[email protected]>
---
 .../plugin/flink/FlinkShuffleClientImplSuiteJ.java |  2 +-
 .../common/network/client/TransportClient.java     | 54 ++--------------------
 2 files changed, 5 insertions(+), 51 deletions(-)

diff --git 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
index 824e0f0d7..161e728d9 100644
--- 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
+++ 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
@@ -110,7 +110,7 @@ public class FlinkShuffleClientImplSuiteJ {
   @Test
   public void testPushDataByteBufFail() throws IOException {
     ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
-    when(client.pushData(any(), anyLong(), any(), any(), any()))
+    when(client.pushData(any(), anyLong(), any(), any()))
         .thenAnswer(
             t -> {
               RpcResponseCallback rpcResponseCallback = t.getArgument(1, 
RpcResponseCallback.class);
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index a9670b05c..5482bdf6f 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -216,23 +216,6 @@ public class TransportClient implements Closeable {
       long pushDataTimeout,
       RpcResponseCallback callback,
       Runnable rpcSendoutCallback) {
-    Runnable rpcFailureCallback =
-        () -> {
-          try {
-            pushData.body().release();
-          } catch (Throwable e) {
-            logger.error("Error release buffer for PUSH_DATA request {}", 
pushData.requestId, e);
-          }
-        };
-    return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback, 
rpcFailureCallback);
-  }
-
-  public ChannelFuture pushData(
-      PushData pushData,
-      long pushDataTimeout,
-      RpcResponseCallback callback,
-      Runnable rpcSendoutCallback,
-      Runnable rpcFailureCallback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
     }
@@ -242,8 +225,7 @@ public class TransportClient implements Closeable {
     PushRequestInfo info = new PushRequestInfo(dueTime, callback);
     handler.addPushRequest(requestId, info);
     pushData.requestId = requestId;
-    PushChannelListener listener =
-        new PushChannelListener(requestId, rpcSendoutCallback, 
rpcFailureCallback);
+    PushChannelListener listener = new PushChannelListener(requestId, 
rpcSendoutCallback);
     ChannelFuture channelFuture = 
channel.writeAndFlush(pushData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -251,26 +233,6 @@ public class TransportClient implements Closeable {
 
   public ChannelFuture pushMergedData(
       PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback 
callback) {
-    Runnable rpcFailureCallback =
-        () -> {
-          try {
-            pushMergedData.body().release();
-          } catch (Throwable e) {
-            logger.error(
-                "Error release buffer for PUSH_MERGED_DATA request {}",
-                pushMergedData.requestId,
-                e);
-          }
-        };
-    return pushMergedData(pushMergedData, pushDataTimeout, callback, null, 
rpcFailureCallback);
-  }
-
-  public ChannelFuture pushMergedData(
-      PushMergedData pushMergedData,
-      long pushDataTimeout,
-      RpcResponseCallback callback,
-      Runnable rpcSendoutCallback,
-      Runnable rpcFailureCallback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing merged data to {}", 
NettyUtils.getRemoteAddress(channel));
     }
@@ -281,8 +243,7 @@ public class TransportClient implements Closeable {
     handler.addPushRequest(requestId, info);
     pushMergedData.requestId = requestId;
 
-    PushChannelListener listener =
-        new PushChannelListener(requestId, rpcSendoutCallback, 
rpcFailureCallback);
+    PushChannelListener listener = new PushChannelListener(requestId);
     ChannelFuture channelFuture = 
channel.writeAndFlush(pushMergedData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -455,18 +416,14 @@ public class TransportClient implements Closeable {
     final long pushRequestId;
     Runnable rpcSendOutCallback;
 
-    Runnable rpcFailureCallback;
-
     PushChannelListener(long pushRequestId) {
-      this(pushRequestId, null, null);
+      this(pushRequestId, null);
     }
 
-    PushChannelListener(
-        long pushRequestId, Runnable rpcSendOutCallback, Runnable 
rpcFailureCallback) {
+    PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
       super("PUSH " + pushRequestId);
       this.pushRequestId = pushRequestId;
       this.rpcSendOutCallback = rpcSendOutCallback;
-      this.rpcFailureCallback = rpcFailureCallback;
     }
 
     @Override
@@ -480,9 +437,6 @@ public class TransportClient implements Closeable {
     @Override
     protected void handleFailure(String errorMsg, Throwable cause) {
       handler.handlePushFailure(pushRequestId, errorMsg, cause);
-      if (rpcFailureCallback != null) {
-        rpcFailureCallback.run();
-      }
     }
   }
 }

Reply via email to