szetszwo commented on code in PR #1179:
URL: https://github.com/apache/ratis/pull/1179#discussion_r1852553011


##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -390,8 +390,13 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
   void cleanUp(Set<ClientInvocationId> ids) {
     for (ClientInvocationId clientInvocationId : ids) {
       Optional.ofNullable(streams.remove(clientInvocationId))
-          .map(StreamInfo::getLocal)
-          .ifPresent(LocalStream::cleanUp);
+          .ifPresent(streamInfo -> {
+            streamInfo.getDivision()
+                .getDataStreamMap()
+                .remove(clientInvocationId);
+            streamInfo.getLocal().cleanUp();
+            streamInfo.applyToRemotes(out -> out.out.closeAsync());
+          });

Review Comment:
   Let's use the `removeDataStream` method here.
   
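   BTW, we should update it so that it no longer takes a `StreamInfo` parameter
   and instead returns the removed `StreamInfo`.  The caller can then check
   whether the removed stream is the same as its own `info`.  If not, it calls
   `info.cleanUp(..)`.  See below.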
   ```java
   -  private void removeDataStream(ClientInvocationId invocationId, StreamInfo info) {
   +  private StreamInfo removeDataStream(ClientInvocationId invocationId) {
        final StreamInfo removed = streams.remove(invocationId);
   -    if (info == null) {
   -      info = removed;
   -    }
   -    if (info != null) {
   -      info.getDivision().getDataStreamMap().remove(invocationId);
   -      info.getLocal().cleanUp();
   +    if (removed != null) {
   +      removed.cleanUp(invocationId);
        }
   +    return removed;
      }
    
      private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
   @@ -479,7 +480,10 @@ public class DataStreamManagement {
          try {
            if (exception != null) {
              replyDataStreamException(server, exception, info.getRequest(), request, ctx);
   -          removeDataStream(key, info);
   +          final StreamInfo removed = removeDataStream(key);
   +          if (removed != info) {
   +            info.cleanUp(key);
   +          }
            }
          } finally {
            request.release();
   ```
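
To make the intent of the identity check concrete, here is a minimal, self-contained sketch of the remove-then-compare pattern. It is not the Ratis code: `StreamCleanupSketch`, `Info`, `Registry`, `put` and `onException` are hypothetical stand-ins, a plain `String` replaces `ClientInvocationId`, and `cleanUp` only counts calls instead of touching the division map, the local stream or the remote streams.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the remove-then-compare cleanup pattern. */
final class StreamCleanupSketch {

  /** Stand-in for StreamInfo; it only records how often cleanUp ran. */
  static final class Info {
    private int cleanUpCount = 0;

    void cleanUp(String invocationId) {
      // Assumption: the real method would remove the entry from the division's
      // data stream map, clean up the local stream and close the remote streams.
      cleanUpCount++;
    }

    int getCleanUpCount() {
      return cleanUpCount;
    }
  }

  /** Stand-in for the class that owns the streams map. */
  static final class Registry {
    private final Map<String, Info> streams = new ConcurrentHashMap<>();

    void put(String invocationId, Info info) {
      streams.put(invocationId, info);
    }

    /** Removes the mapped entry, cleans it up if present, and returns it (or null). */
    Info removeDataStream(String invocationId) {
      final Info removed = streams.remove(invocationId);
      if (removed != null) {
        removed.cleanUp(invocationId);
      }
      return removed;
    }

    /** Error path: clean up the mapped entry and, if it differs, the caller's own info. */
    void onException(String key, Info info) {
      final Info removed = removeDataStream(key);
      if (removed != info) {
        // The map held nothing or a different object, so info was not cleaned above.
        info.cleanUp(key);
      }
    }
  }

  public static void main(String[] args) {
    final Registry registry = new Registry();

    // Case 1: the caller's info is the mapped one, so it is cleaned exactly once.
    final Info mapped = new Info();
    registry.put("id-1", mapped);
    registry.onException("id-1", mapped);
    System.out.println("mapped cleanUp calls: " + mapped.getCleanUpCount());

    // Case 2: the map holds a different object, so both objects are cleaned once each.
    final Info current = new Info();
    final Info stale = new Info();
    registry.put("id-2", current);
    registry.onException("id-2", stale);
    System.out.println("current cleanUp calls: " + current.getCleanUpCount());
    System.out.println("stale cleanUp calls: " + stale.getCleanUpCount());
  }
}
```

The comparison uses reference identity (`!=`) on purpose: the question is whether the very object the caller holds has already been cleaned up, not whether two entries are equal.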



##########
ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java:
##########
@@ -423,6 +428,7 @@ private void removeDataStream(ClientInvocationId invocationId, StreamInfo info)
     if (info != null) {
       info.getDivision().getDataStreamMap().remove(invocationId);
       info.getLocal().cleanUp();
+      info.applyToRemotes(out -> out.out.closeAsync());

Review Comment:
   Let's add a `cleanUp(ClientInvocationId)` method to `StreamInfo`.
   ```java
       void cleanUp(ClientInvocationId invocationId) {
         getDivision().getDataStreamMap().remove(invocationId);
         getLocal().cleanUp();
         applyToRemotes(remote -> remote.out.closeAsync());
       }
   ```


