This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch release-3.1.3
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8c7fbb6c03f9249244a38e4d4a6143bf8f5e5f69
Author: Symious <[email protected]>
AuthorDate: Thu Dec 26 02:57:36 2024 +0800

    RATIS-2197. Clean remote stream to resolve direct memory leak (#1179)
---
 .../ratis/client/impl/DataStreamClientImpl.java    |  3 ++
 .../ratis/netty/server/DataStreamManagement.java   | 35 ++++++++++++++--------
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index ba91866d7..313131cbd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -153,6 +153,9 @@ public class DataStreamClientImpl implements DataStreamClient {
 
     private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, Iterable<WriteOption> options) {
       if (isClosed()) {
+        if (data instanceof ByteBuf) {
+          ((ByteBuf) data).release();
+        }
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             clientId + ": stream already closed, request=" + header));
       }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 74d5cd7fd..711063185 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -211,6 +211,12 @@ public class DataStreamManagement {
 
       return Collections.emptySet();
     }
+
+    void cleanUp(ClientInvocationId invocationId) {
+      getDivision().getDataStreamMap().remove(invocationId);
+      getLocal().cleanUp();
+      applyToRemotes(remote -> remote.out.closeAsync());
+    }
   }
 
   private final RaftServer server;
@@ -301,6 +307,9 @@ public class DataStreamManagement {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
+      if (buffer.remaining() == 0) {
+        continue;
+      }
       final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(
           buffer, buf::retain, ignored -> buf.release());
       try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
@@ -389,9 +398,7 @@ public class DataStreamManagement {
 
   void cleanUp(Set<ClientInvocationId> ids) {
     for (ClientInvocationId clientInvocationId : ids) {
-      Optional.ofNullable(streams.remove(clientInvocationId))
-          .map(StreamInfo::getLocal)
-          .ifPresent(LocalStream::cleanUp);
+      removeDataStream(clientInvocationId);
     }
   }
 
@@ -411,19 +418,16 @@ public class DataStreamManagement {
       readImpl(request, ctx, getStreams);
     } catch (Throwable t) {
       replyDataStreamException(t, request, ctx);
-      removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()), null);
+      removeDataStream(ClientInvocationId.valueOf(request.getClientId(), request.getStreamId()));
     }
   }
 
-  private void removeDataStream(ClientInvocationId invocationId, StreamInfo info) {
+  private StreamInfo removeDataStream(ClientInvocationId invocationId) {
     final StreamInfo removed = streams.remove(invocationId);
-    if (info == null) {
-      info = removed;
-    }
-    if (info != null) {
-      info.getDivision().getDataStreamMap().remove(invocationId);
-      info.getLocal().cleanUp();
+    if (removed != null) {
+      removed.cleanUp(invocationId);
     }
+    return removed;
   }
 
   private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
@@ -479,7 +483,14 @@ public class DataStreamManagement {
       try {
         if (exception != null) {
           replyDataStreamException(server, exception, info.getRequest(), request, ctx);
-          removeDataStream(key, info);
+          final StreamInfo removed = removeDataStream(key);
+          if (removed != null) {
+            Preconditions.assertSame(info, removed, "removed");
+          } else {
+            info.cleanUp(key);
+          }
+        } else if (close) {
+          info.applyToRemotes(remote -> remote.out.closeAsync());
         }
       } finally {
         request.release();

Reply via email to