This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 01f7cccef [CELEBORN-1376] Push data failed should always release 
request body
01f7cccef is described below

commit 01f7cccef6a538c26c06fc519aac6a52d6596b1a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Wed Apr 10 19:42:14 2024 +0800

    [CELEBORN-1376] Push data failed should always release request body
    
    ### What changes were proposed in this pull request?
    Worker Netty memory is not released:
    <img width="1729" alt="截屏2024-04-07 17 26 40" src="https://github.com/apache/celeborn/assets/46485123/5774f735-570b-448e-ab94-4c78661717f5">
    
    Many pushes failed:
    <img width="767" alt="截屏2024-04-07 17 27 46" src="https://github.com/apache/celeborn/assets/46485123/41866bd0-d634-4dbf-8518-b474c8d1faad">
    
    1. For the Spark shuffle client, release the push data body when the RPC
    fails.
    2. For the Flink client, since it uses a wrapped ByteBuf, we need to
    release the push data body when the RPC fails and release the original
    body when the RPC completes.
    3. For worker replication, release the push data body when the RPC fails.
    
    ### Why are the changes needed?
    To avoid a Netty memory leak on the worker.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2449 from AngersZhuuuu/CELEBORN-1376.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit b65b5433dceaa19dc22f6a878e5160751a7ef036)
    Signed-off-by: mingji <[email protected]>
---
 .../plugin/flink/FlinkShuffleClientImplSuiteJ.java |  2 +-
 .../common/network/client/TransportClient.java     | 54 ++++++++++++++++++++--
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
index 161e728d9..824e0f0d7 100644
--- 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
+++ 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
@@ -110,7 +110,7 @@ public class FlinkShuffleClientImplSuiteJ {
   @Test
   public void testPushDataByteBufFail() throws IOException {
     ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
-    when(client.pushData(any(), anyLong(), any(), any()))
+    when(client.pushData(any(), anyLong(), any(), any(), any()))
         .thenAnswer(
             t -> {
               RpcResponseCallback rpcResponseCallback = t.getArgument(1, 
RpcResponseCallback.class);
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index 5482bdf6f..a9670b05c 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -216,6 +216,23 @@ public class TransportClient implements Closeable {
       long pushDataTimeout,
       RpcResponseCallback callback,
       Runnable rpcSendoutCallback) {
+    Runnable rpcFailureCallback =
+        () -> {
+          try {
+            pushData.body().release();
+          } catch (Throwable e) {
+            logger.error("Error release buffer for PUSH_DATA request {}", 
pushData.requestId, e);
+          }
+        };
+    return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback, 
rpcFailureCallback);
+  }
+
+  public ChannelFuture pushData(
+      PushData pushData,
+      long pushDataTimeout,
+      RpcResponseCallback callback,
+      Runnable rpcSendoutCallback,
+      Runnable rpcFailureCallback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
     }
@@ -225,7 +242,8 @@ public class TransportClient implements Closeable {
     PushRequestInfo info = new PushRequestInfo(dueTime, callback);
     handler.addPushRequest(requestId, info);
     pushData.requestId = requestId;
-    PushChannelListener listener = new PushChannelListener(requestId, 
rpcSendoutCallback);
+    PushChannelListener listener =
+        new PushChannelListener(requestId, rpcSendoutCallback, 
rpcFailureCallback);
     ChannelFuture channelFuture = 
channel.writeAndFlush(pushData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -233,6 +251,26 @@ public class TransportClient implements Closeable {
 
   public ChannelFuture pushMergedData(
       PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback 
callback) {
+    Runnable rpcFailureCallback =
+        () -> {
+          try {
+            pushMergedData.body().release();
+          } catch (Throwable e) {
+            logger.error(
+                "Error release buffer for PUSH_MERGED_DATA request {}",
+                pushMergedData.requestId,
+                e);
+          }
+        };
+    return pushMergedData(pushMergedData, pushDataTimeout, callback, null, 
rpcFailureCallback);
+  }
+
+  public ChannelFuture pushMergedData(
+      PushMergedData pushMergedData,
+      long pushDataTimeout,
+      RpcResponseCallback callback,
+      Runnable rpcSendoutCallback,
+      Runnable rpcFailureCallback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing merged data to {}", 
NettyUtils.getRemoteAddress(channel));
     }
@@ -243,7 +281,8 @@ public class TransportClient implements Closeable {
     handler.addPushRequest(requestId, info);
     pushMergedData.requestId = requestId;
 
-    PushChannelListener listener = new PushChannelListener(requestId);
+    PushChannelListener listener =
+        new PushChannelListener(requestId, rpcSendoutCallback, 
rpcFailureCallback);
     ChannelFuture channelFuture = 
channel.writeAndFlush(pushMergedData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -416,14 +455,18 @@ public class TransportClient implements Closeable {
     final long pushRequestId;
     Runnable rpcSendOutCallback;
 
+    Runnable rpcFailureCallback;
+
     PushChannelListener(long pushRequestId) {
-      this(pushRequestId, null);
+      this(pushRequestId, null, null);
     }
 
-    PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
+    PushChannelListener(
+        long pushRequestId, Runnable rpcSendOutCallback, Runnable 
rpcFailureCallback) {
       super("PUSH " + pushRequestId);
       this.pushRequestId = pushRequestId;
       this.rpcSendOutCallback = rpcSendOutCallback;
+      this.rpcFailureCallback = rpcFailureCallback;
     }
 
     @Override
@@ -437,6 +480,9 @@ public class TransportClient implements Closeable {
     @Override
     protected void handleFailure(String errorMsg, Throwable cause) {
       handler.handlePushFailure(pushRequestId, errorMsg, cause);
+      if (rpcFailureCallback != null) {
+        rpcFailureCallback.run();
+      }
     }
   }
 }

Reply via email to