szetszwo commented on code in PR #1007:
URL: https://github.com/apache/ratis/pull/1007#discussion_r1450731169


##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -39,25 +42,32 @@ private static class PendingStream {
     private final ClientInvocationId key;
     private long nextId = -1;
     private ByteString bytes = ByteString.EMPTY;
+    private List<ReferenceCountedObject<Message>> pendingRefs = new 
LinkedList<>();
 
     PendingStream(ClientInvocationId key) {
       this.key = key;
     }
 
-    synchronized CompletableFuture<ByteString> append(long messageId, Message 
message) {
+    synchronized CompletableFuture<ReferenceCountedObject<ByteString>> 
append(long messageId,
+        ReferenceCountedObject<Message> messageRef) {
       if (nextId == -1) {
         nextId = messageId;
       } else if (messageId != nextId) {
         return JavaUtils.completeExceptionally(new StreamException(
             "Unexpected message id in " + key + ": messageId = " + messageId + 
" != nextId = " + nextId));
       }
       nextId++;
+      Message message = messageRef.retain();
+      pendingRefs.add(messageRef);
       bytes = bytes.concat(message.getContent());
-      return CompletableFuture.completedFuture(bytes);
+
+      ReferenceCountedObject<ByteString> bytesRef = 
ReferenceCountedObject.delegateFrom(pendingRefs, bytes);
+      return CompletableFuture.completedFuture(bytesRef);
     }
 
-    synchronized CompletableFuture<ByteString> getBytes(long messageId, 
Message message) {
-      return append(messageId, message);
+    synchronized CompletableFuture<ReferenceCountedObject<ByteString>> 
getBytes(long messageId,
+        ReferenceCountedObject<Message> messageRef) {
+      return append(messageId, messageRef);
     }

Review Comment:
   Make it return 
`CompletableFuture<ReferenceCountedObject<RaftClientRequest>>`.  The code will 
become simpler
   ```java
       synchronized 
CompletableFuture<ReferenceCountedObject<RaftClientRequest>> 
getWriteRequest(long messageId,
           ReferenceCountedObject<RaftClientRequest> requestRef) {
         return append(messageId, requestRef)
             .thenApply(bytes -> 
RaftClientRequest.toWriteRequest(requestRef.get(), () -> bytes))
             .thenApply(write -> 
ReferenceCountedObject.delegateFrom(pendingRefs, write));
       }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -39,25 +42,32 @@ private static class PendingStream {
     private final ClientInvocationId key;
     private long nextId = -1;
     private ByteString bytes = ByteString.EMPTY;
+    private List<ReferenceCountedObject<Message>> pendingRefs = new 
LinkedList<>();
 
     PendingStream(ClientInvocationId key) {
       this.key = key;
     }
 
-    synchronized CompletableFuture<ByteString> append(long messageId, Message 
message) {
+    synchronized CompletableFuture<ReferenceCountedObject<ByteString>> 
append(long messageId,
+        ReferenceCountedObject<Message> messageRef) {

Review Comment:
   Pass `ReferenceCountedObject<RaftClientRequest> requestRef` instead.



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java:
##########
@@ -534,15 +535,21 @@ PendingRequest addPendingRequest(PendingRequests.Permit 
permit, RaftClientReques
     return pendingRequests.add(permit, request, entry);
   }
 
-  CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
-    return messageStreamRequests.streamAsync(request)
+  CompletableFuture<RaftClientReply> 
streamAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
+    RaftClientRequest request = requestRef.get();
+    return messageStreamRequests.streamAsync(requestRef)
         .thenApply(dummy -> server.newSuccessReply(request))
         .exceptionally(e -> exception2RaftClientReply(request, e));
   }
 
-  CompletableFuture<RaftClientRequest> 
streamEndOfRequestAsync(RaftClientRequest request) {
-    return messageStreamRequests.streamEndOfRequestAsync(request)
-        .thenApply(bytes -> RaftClientRequest.toWriteRequest(request, 
Message.valueOf(bytes)));
+  CompletableFuture<ReferenceCountedObject<RaftClientRequest>> 
streamEndOfRequestAsync(
+      ReferenceCountedObject<RaftClientRequest> requestRef) {
+    RaftClientRequest request = requestRef.get();
+    return messageStreamRequests.streamEndOfRequestAsync(requestRef)
+        .thenApply(bytesRef -> {
+          RaftClientRequest finalRequest = 
RaftClientRequest.toWriteRequest(request, Message.valueOf(bytesRef.get()));
+          return bytesRef.delegate(finalRequest);
+        });

Review Comment:
   The code here becomes trivial.
   ```java
       return messageStreamRequests.streamEndOfRequestAsync(requestRef);
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -39,25 +42,32 @@ private static class PendingStream {
     private final ClientInvocationId key;
     private long nextId = -1;
     private ByteString bytes = ByteString.EMPTY;
+    private List<ReferenceCountedObject<Message>> pendingRefs = new 
LinkedList<>();

Review Comment:
   Store `RaftClientRequest` instead.
   ```java
       private final List<ReferenceCountedObject<RaftClientRequest>> 
pendingRefs = new LinkedList<>();
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/impl/MessageStreamRequests.java:
##########
@@ -101,7 +114,7 @@ CompletableFuture<ByteString> 
streamEndOfRequestAsync(RaftClientRequest request)
     if (pending == null) {
       return JavaUtils.completeExceptionally(new StreamException(name + ": " + 
key + " not found"));
     }
-    return pending.getBytes(stream.getMessageId(), request.getMessage());
+    return pending.getBytes(stream.getMessageId(), 
requestRef.delegate(request.getMessage()));

Review Comment:
   Then, we don't have to delegate here.
   ```java
     CompletableFuture<ReferenceCountedObject<RaftClientRequest>> 
streamEndOfRequestAsync(
       ...
       return pending.getWriteRequest(stream.getMessageId(), requestRef);
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to