This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b49a79317 RATIS-1847. Stream has memory leak. (#884)
b49a79317 is described below

commit b49a79317fbc1fcb68448bd603df6f6bf9d18808
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 19 23:33:46 2023 +0800

    RATIS-1847. Stream has memory leak. (#884)
---
 .../ratis/netty/server/DataStreamManagement.java   | 30 ++++++++++++----------
 .../netty/server/DataStreamRequestByteBuf.java     | 22 +++++++++++++---
 2 files changed, 35 insertions(+), 17 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 362199323..b3f42bf32 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -96,6 +96,10 @@ public class DataStreamManagement {
           n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, 
stream, executor)
               .whenComplete((l, e) -> metrics.stop(context, e == null))));
     }
+
+    void cleanUp() {
+      streamFuture.thenAccept(DataStream::cleanUp);
+    }
   }
 
   static class RemoteStream {
@@ -374,31 +378,33 @@ public class DataStreamManagement {
       ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
     } catch (Throwable t) {
       LOG.warn("Failed to sendDataStreamException {} for {}", throwable, 
request, t);
+    } finally {
+      request.release();
     }
   }
 
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
       CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
-    final ByteBuf buf = request.slice();
     try {
-      readImpl(request, ctx, buf, getStreams);
+      readImpl(request, ctx, getStreams);
     } catch (Throwable t) {
       replyDataStreamException(t, request, ctx);
-      buf.release();
     }
   }
 
-  private void readImpl(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx, ByteBuf buf,
+  private void readImpl(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx,
       CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, 
Set<DataStreamOutputRpc>, IOException> getStreams) {
     final boolean close = 
request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
     ClientInvocationId key =  
ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
-      final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> 
newStreamInfo(buf, getStreams));
+      final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
+          () -> newStreamInfo(request.slice(), getStreams));
       info = streams.computeIfAbsent(key, id -> supplier.get());
       if (!supplier.isInitialized()) {
-        streams.remove(key);
+        final StreamInfo removed = streams.remove(key);
+        removed.getLocal().cleanUp();
         throw new IllegalStateException("Failed to create a new stream for " + 
request
             + " since a stream already exists Key: " + key + " StreamInfo:" + 
info);
       }
@@ -408,10 +414,7 @@ public class DataStreamManagement {
           () -> new IllegalStateException("Failed to remove StreamInfo for " + 
request));
     } else {
       info = Optional.ofNullable(streams.get(key)).orElseThrow(
-          () -> {
-            streams.remove(key);
-            return new IllegalStateException("Failed to get StreamInfo for " + 
request);
-          });
+          () -> new IllegalStateException("Failed to get StreamInfo for " + 
request));
     }
 
     final CompletableFuture<Long> localWrite;
@@ -420,7 +423,7 @@ public class DataStreamManagement {
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
-      localWrite = info.getLocal().write(buf, request.getWriteOptionList(), 
writeExecutor);
+      localWrite = info.getLocal().write(request.slice(), 
request.getWriteOptionList(), writeExecutor);
       remoteWrites = info.applyToRemotes(out -> out.write(request, 
requestExecutor));
     } else {
       throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
@@ -439,11 +442,12 @@ public class DataStreamManagement {
         }, requestExecutor)).whenComplete((v, exception) -> {
       try {
         if (exception != null) {
-          streams.remove(key);
+          final StreamInfo removed = streams.remove(key);
           replyDataStreamException(server, exception, info.getRequest(), 
request, ctx);
+          removed.getLocal().cleanUp();
         }
       } finally {
-        buf.release();
+        request.release();
       }
     });
   }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 0dcd46e02..2542b1ec6 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -30,6 +30,8 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Implements {@link DataStreamRequest} with {@link ByteBuf}.
@@ -37,13 +39,13 @@ import java.util.List;
  * This class is immutable.
  */
 public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements 
DataStreamRequest {
-  private final ByteBuf buf;
+  private final AtomicReference<ByteBuf> buf;
   private final List<WriteOption> options;
 
   public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, 
long streamOffset,
                                   Iterable<WriteOption> options, ByteBuf buf) {
     super(clientId, type, streamId, streamOffset);
-    this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
+    this.buf = new AtomicReference<>(buf != null? buf.asReadOnly(): 
Unpooled.EMPTY_BUFFER);
     this.options = Collections.unmodifiableList(Lists.newArrayList(options));
   }
 
@@ -52,13 +54,25 @@ public class DataStreamRequestByteBuf extends 
DataStreamPacketImpl implements Da
          header.getWriteOptionList(), buf);
   }
 
+  ByteBuf getBuf() {
+    return Optional.ofNullable(buf.get()).orElseThrow(
+        () -> new IllegalStateException("buf is already released in " + this));
+  }
+
   @Override
   public long getDataLength() {
-    return buf.readableBytes();
+    return getBuf().readableBytes();
   }
 
   public ByteBuf slice() {
-    return buf.slice();
+    return getBuf().slice();
+  }
+
+  public void release() {
+    final ByteBuf got = buf.getAndSet(null);
+    if (got != null) {
+      got.release();
+    }
   }
 
   @Override

Reply via email to