szetszwo commented on code in PR #1021:
URL: https://github.com/apache/ratis/pull/1021#discussion_r1458122562


##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java:
##########
@@ -64,7 +64,11 @@ private static class PendingOrderedRequest implements SlidingWindow.ServerSideRe
 
     PendingOrderedRequest(ReferenceCountedObject<RaftClientRequest> requestRef) {
       this.requestRef = requestRef;
-      this.request = requestRef != null ? requestRef.get() : null;
+      this.request = requestRef != null ? requestRef.retain() : null;
+    }
+
+    void release() {
+      requestRef.release();
     }

Review Comment:
   The `release()` method could be removed; callers can release the reference obtained from `getRequestRef()` instead.



##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java:
##########
@@ -128,9 +128,11 @@ <OUTPUT, THROWABLE extends Throwable> OUTPUT runSequentially(
    * Used by the leader.
    */
   default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
-    return appendEntry(entry);
+    return appendEntry(ReferenceCountedObject.wrap(entry), context);
   }
 
+  CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext context);
+

Review Comment:
   Since this interface is in `ratis-server-api`, the new method needs a default implementation so that existing implementations remain compatible.
   ```java
     /**
      * Append an entry asynchronously.
      * Used by the leader.
      */
     default CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto> entryRef,
         TransactionContext context) {
       return appendEntry(entryRef.get(), context);
     }
   
     /**
      * @deprecated use {@link #appendEntry(ReferenceCountedObject, TransactionContext)}.
      */
     @Deprecated
     default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
       throw new UnsupportedOperationException();
     }
   ```
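
   To illustrate the compatibility argument, here is a minimal, self-contained sketch of the same pattern. It does not use the Ratis classes: `Ref`, `LogOps`, `LegacyLog` and `NewLog` are hypothetical stand-ins invented for this example. The point is that an implementation overriding only the old overload and one overriding only the new overload both keep working when called through the new overload.

   ```java
   import java.util.concurrent.CompletableFuture;

   public class DefaultMethodCompatSketch {
     /** Hypothetical stand-in for a reference-counted wrapper; not the Ratis class. */
     interface Ref<T> {
       T get();

       static <T> Ref<T> wrap(T value) {
         return () -> value;
       }
     }

     /** Hypothetical API interface that gained a new overload after release. */
     interface LogOps {
       /** New overload: the default body delegates to the old overload. */
       default CompletableFuture<Long> appendEntry(Ref<String> entryRef, Object context) {
         return appendEntry(entryRef.get(), context);
       }

       /** Old overload: deprecated, but existing overrides of it still work. */
       @Deprecated
       default CompletableFuture<Long> appendEntry(String entry, Object context) {
         throw new UnsupportedOperationException();
       }
     }

     /** Written against the old API: overrides only the old overload. */
     static class LegacyLog implements LogOps {
       private long nextIndex = 0;

       @Override
       public CompletableFuture<Long> appendEntry(String entry, Object context) {
         return CompletableFuture.completedFuture(nextIndex++);
       }
     }

     /** Written against the new API: overrides only the new overload. */
     static class NewLog implements LogOps {
       private long nextIndex = 100;

       @Override
       public CompletableFuture<Long> appendEntry(Ref<String> entryRef, Object context) {
         return CompletableFuture.completedFuture(nextIndex++);
       }
     }

     /** Callers use only the new overload, whichever implementation they hold. */
     static long appendViaNewApi(LogOps log, String entry) {
       return log.appendEntry(Ref.wrap(entry), null).join();
     }

     public static void main(String[] args) {
       System.out.println(appendViaNewApi(new LegacyLog(), "a"));
       System.out.println(appendViaNewApi(new NewLog(), "b"));
     }
   }
   ```

   An implementation that overrides neither overload would hit the `UnsupportedOperationException` at runtime, which is the trade-off of making both methods default.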



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java:
##########
@@ -363,6 +367,7 @@ void processClientRequest(PendingOrderedRequest pending) {
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequestRef(),
           reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
+      pending.release();

Review Comment:
   We should use `whenComplete(..)` so that the reference is released only after the request has been processed, not as soon as the asynchronous call returns.
   ```java
         final ReferenceCountedObject<RaftClientRequest> ref = pending.getRequestRef();
         processClientRequest(ref, reply -> slidingWindow.receiveReply(seq, reply, this::sendReply))
             .whenComplete((v, e) -> ref.release());
   ```
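
   The timing difference can be shown with a small standalone sketch. It assumes nothing about the Ratis classes: `CountedRef` and `process` are hypothetical names made up for the example, and only `java.util.concurrent.CompletableFuture` is real API. The reference stays retained while the work is pending and is released on both normal and exceptional completion.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;

   public class ReleaseOnCompleteSketch {
     /** Hypothetical minimal reference counter; not the Ratis ReferenceCountedObject. */
     static class CountedRef {
       private final AtomicInteger count = new AtomicInteger();

       void retain() {
         count.incrementAndGet();
       }

       void release() {
         if (count.decrementAndGet() < 0) {
           throw new IllegalStateException("released more often than retained");
         }
       }

       int count() {
         return count.get();
       }
     }

     /** Retains the ref now and releases it only when the asynchronous work finishes. */
     static CompletableFuture<String> process(CountedRef ref, CompletableFuture<String> work) {
       ref.retain();
       // whenComplete runs for both normal and exceptional completion.
       return work.whenComplete((v, e) -> ref.release());
     }

     public static void main(String[] args) {
       CountedRef ref = new CountedRef();
       CompletableFuture<String> work = new CompletableFuture<>();
       process(ref, work);
       System.out.println("while pending: " + ref.count());
       work.complete("reply");
       System.out.println("after completion: " + ref.count());
     }
   }
   ```

   Calling `release()` directly after `process(..)` instead would drop the count while `work` is still pending, which is the situation the comment above is meant to avoid.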



##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java:
##########
@@ -128,9 +128,11 @@ <OUTPUT, THROWABLE extends Throwable> OUTPUT runSequentially(
    * Used by the leader.
    */
   default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
-    return appendEntry(entry);
+    return appendEntry(ReferenceCountedObject.wrap(entry), context);
   }
 
+  CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext context);
+

Review Comment:
   `RaftLogBase` should also be updated:
   ```diff
   @@ -345,15 +346,17 @@ public abstract class RaftLogBase implements RaftLog {
    
      @Override
      public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
   -    return appendEntry(entry, null);
   +    return appendEntry(ReferenceCountedObject.wrap(entry), null);
      }
    
   ```


