szetszwo commented on code in PR #1179:
URL: https://github.com/apache/ratis/pull/1179#discussion_r1855522106


##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -390,8 +396,7 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
   void cleanUp(Set<ClientInvocationId> ids) {
     for (ClientInvocationId clientInvocationId : ids) {
       Optional.ofNullable(streams.remove(clientInvocationId))
-          .map(StreamInfo::getLocal)
-          .ifPresent(LocalStream::cleanUp);
+          .ifPresent(streamInfo -> streamInfo.cleanUp(clientInvocationId));

Review Comment:
   We should call `removeDataStream` here, i.e.
   ```diff
      void cleanUp(Set<ClientInvocationId> ids) {
        for (ClientInvocationId clientInvocationId : ids) {
   -      Optional.ofNullable(streams.remove(clientInvocationId))
   -          .map(StreamInfo::getLocal)
   -          .ifPresent(LocalStream::cleanUp);
   +      removeDataStream(clientInvocationId);
        }
      }
   ```
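
   As a rough illustration of why a single removal path helps, here is a minimal, self-contained sketch. It is not the Ratis code: `SingleRemovalPathSketch`, `Info`, and the `String` ids are hypothetical stand-ins, and it assumes that `removeDataStream` both removes the entry and cleans it up.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   /** Toy stand-in for the stream map; every name here is hypothetical. */
   class SingleRemovalPathSketch {
     /** Hypothetical stand-in for StreamInfo: it only records clean-up calls. */
     static final class Info {
       final List<String> cleaned = new ArrayList<>();

       void cleanUp(String id) {
         cleaned.add(id);
       }
     }

     private final Map<String, Info> streams = new ConcurrentHashMap<>();

     void add(String id, Info info) {
       streams.put(id, info);
     }

     /** Assumed contract: the one place where an entry is removed and cleaned up. */
     Info removeDataStream(String id) {
       final Info removed = streams.remove(id);
       if (removed != null) {
         removed.cleanUp(id);
       }
       return removed;
     }

     /** Bulk clean-up delegates to the single removal path instead of repeating it. */
     void cleanUp(Set<String> ids) {
       for (String id : ids) {
         removeDataStream(id);
       }
     }

     int size() {
       return streams.size();
     }
   }
   ```

   With this shape, any later change to what "remove" means (extra logging, closing remote streams, metrics) only has to be made in `removeDataStream`.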



##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -479,7 +485,12 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
       try {
         if (exception != null) {
           replyDataStreamException(server, exception, info.getRequest(), request, ctx);
-          removeDataStream(key, info);
+        }
+        if (close || exception != null) {

Review Comment:
   For the normal close case:
   - The local stream (to a local file) is a `DataStream` created by an application. It should be removed from the `DataStreamMap` and closed during `link(..)`.
   - Remote streams (server to server) should be closed here. Leaving them open is the leak.
   
   To play it safe, call `info.cleanUp(key)` when `removed == null`. The code becomes
   ```java
           if (exception != null) {
             replyDataStreamException(server, exception, info.getRequest(), request, ctx);
             final StreamInfo removed = removeDataStream(key);
             if (removed != null) {
               Preconditions.assertSame(info, removed, "removed");
             } else {
               info.cleanUp(key);
             }
           } else if (close) {
             info.applyToRemotes(remote -> remote.out.closeAsync());
           }
   ```
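
   The branching above can be tried out in isolation with the following sketch. It is only a toy model under stated assumptions: `CloseOrFailSketch`, `Info`, `closeRemotes()`, and `onWriteDone(..)` are hypothetical names, `removeDataStream` is assumed to clean up the entry it removes, and a plain identity check stands in for `Preconditions.assertSame`.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Toy model of the failure/close handling; every name here is hypothetical. */
   class CloseOrFailSketch {
     /** Hypothetical stand-in for StreamInfo that counts what was done to it. */
     static final class Info {
       final AtomicInteger cleanUps = new AtomicInteger();
       final AtomicInteger remoteCloses = new AtomicInteger();

       void cleanUp() {
         cleanUps.incrementAndGet();
       }

       void closeRemotes() {
         remoteCloses.incrementAndGet();
       }
     }

     final Map<String, Info> streams = new ConcurrentHashMap<>();

     /** Assumed contract: removes the entry and cleans it up if it was still registered. */
     Info removeDataStream(String key) {
       final Info removed = streams.remove(key);
       if (removed != null) {
         removed.cleanUp();
       }
       return removed;
     }

     void onWriteDone(String key, Info info, Throwable exception, boolean close) {
       if (exception != null) {
         final Info removed = removeDataStream(key);
         if (removed != null) {
           // Stand-in for Preconditions.assertSame(info, removed, "removed").
           if (removed != info) {
             throw new IllegalStateException("removed != info");
           }
         } else {
           // Someone else already removed the entry: still release this stream.
           info.cleanUp();
         }
       } else if (close) {
         // Normal close: only the remote (server-to-server) streams are closed here.
         info.closeRemotes();
       }
     }
   }
   ```

   In this model a failed request cleans up the stream exactly once whether or not the entry is still in the map, while a normal close leaves the map entry alone and only closes the remote side.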



##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -459,7 +461,11 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
-      localWrite = info.getLocal().write(request.slice(), request.getWriteOptionList(), writeExecutor);
+      if (close && request.getDataLength() == 0) {
+        localWrite = CompletableFuture.completedFuture(0L);
+      } else {
+        localWrite = info.getLocal().write(request.slice(), request.getWriteOptionList(), writeExecutor);
+      }

Review Comment:
   This part is an optimization that avoids calling `getLocal().write(..)`. However, `getLocal().write(..)` also takes care of the `WriteOption`s such as `SYNC` and `FLUSH`, so we cannot skip it here. We could skip the empty write later, in
   ```java
   //writeTo(..)
   @@ -301,6 +307,9 @@ public class DataStreamManagement {
        final DataChannel channel = stream.getDataChannel();
        long byteWritten = 0;
        for (ByteBuffer buffer : buf.nioBuffers()) {
   +      if (buffer.remaining() == 0) {
   +        continue;
   +      }
          final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(
              buffer, buf::retain, ignored -> buf.release());
          try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
   ```
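
   To make the point concrete, here is a small stand-alone sketch using only `java.nio`. It is not the `writeTo(..)` method itself: `SkipEmptyBufferSketch`, `CountingChannel`, its `force()` method, and the `sync` flag are hypothetical stand-ins for the data channel and the `SYNC` option. It shows empty buffers being skipped inside the loop while the sync step still runs, even when nothing was written.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.channels.WritableByteChannel;

   /** Toy write loop; every name here is hypothetical. */
   class SkipEmptyBufferSketch {
     /** In-memory channel that counts calls so the effect of skipping is observable. */
     static final class CountingChannel implements WritableByteChannel {
       int writeCalls;
       int forceCalls;
       long bytes;

       @Override
       public int write(ByteBuffer src) {
         writeCalls++;
         final int n = src.remaining();
         src.position(src.limit()); // consume the whole buffer
         bytes += n;
         return n;
       }

       /** Hypothetical stand-in for syncing the channel to storage. */
       void force() {
         forceCalls++;
       }

       @Override
       public boolean isOpen() {
         return true;
       }

       @Override
       public void close() {
       }
     }

     static long writeAll(CountingChannel channel, boolean sync, ByteBuffer... buffers) {
       long written = 0;
       for (ByteBuffer buffer : buffers) {
         if (buffer.remaining() == 0) {
           continue; // nothing to write; skip the channel call only
         }
         while (buffer.hasRemaining()) {
           written += channel.write(buffer);
         }
       }
       if (sync) {
         channel.force(); // option handling still runs for an empty request
       }
       return written;
     }
   }
   ```

   The design choice is that the skip lives at the lowest level, next to the actual channel write, so the option handling above it does not need a special case for zero-length requests.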


