szetszwo commented on code in PR #1011:
URL: https://github.com/apache/ratis/pull/1011#discussion_r1454625906


##########
ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java:
##########
@@ -22,14 +22,41 @@
 import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 public interface RaftServerAsynchronousProtocol {
 
-  CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
-      throws IOException;
+  /**
+   * It is recommended to override {@link 
#appendEntriesAsync(ReferenceCountedObject)} instead.
+   * Then, it does not have to override this method.
+   */
+  default CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
+      throws IOException {
+    return appendEntriesAsync(ReferenceCountedObject.wrap(request));

Review Comment:
   This should throw `UnsupportedOperationException`.  The Ratis code (except 
for the default 
`appendEntriesAsync(ReferenceCountedObject<AppendEntriesRequestProto>)` 
implementation below) should not call this method anymore.   Then, an 
implementation can override either of the methods.
   
   If an implementation does not override both of the methods, it will get 
`UnsupportedOperationException`.



##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java:
##########
@@ -132,21 +133,34 @@ default CompletableFuture<Long> appendEntry(LogEntryProto 
entry, TransactionCont
   /**
    * The same as append(Arrays.asList(entries)).
    *
-   * @deprecated use {@link #append(List)}
+   * @deprecated use {@link #append(List, ReferenceCountedObject)}
    */
   @Deprecated
   default List<CompletableFuture<Long>> append(LogEntryProto... entries) {
     return append(Arrays.asList(entries));
   }
 
+  /**
+   * @deprecated use {@link #append(List, ReferenceCountedObject)}.
+   */
+  @Deprecated
+  default List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
+    return append(entries, null);
+  }

Review Comment:
   This should throw `UnsupportedOperationException`.
   



##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java:
##########
@@ -181,7 +192,8 @@ public long getStartIndex() {
   }
 
   @Override
-  public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> 
logEntryProtos) {
+  public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> 
logEntryProtos,
+      ReferenceCountedObject<?> entriesRef) {

Review Comment:
   It should be
   ```java
     public List<CompletableFuture<Long>> 
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
   ```



##########
ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java:
##########
@@ -132,21 +133,34 @@ default CompletableFuture<Long> appendEntry(LogEntryProto 
entry, TransactionCont
   /**
    * The same as append(Arrays.asList(entries)).
    *
-   * @deprecated use {@link #append(List)}
+   * @deprecated use {@link #append(List, ReferenceCountedObject)}
    */
   @Deprecated
   default List<CompletableFuture<Long>> append(LogEntryProto... entries) {
     return append(Arrays.asList(entries));
   }
 
+  /**
+   * @deprecated use {@link #append(List, ReferenceCountedObject)}.
+   */
+  @Deprecated
+  default List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
+    return append(entries, null);
+  }
+
   /**
    * Append asynchronously all the given log entries.
    * Used by the followers.
    *
    * If an existing entry conflicts with a new one (same index but different 
terms),
    * delete the existing entry and all entries that follow it (ยง5.3).
+   *
+   * A reference counter is also submitted.
+   * For each entry, implementations of this method should retain the counter, 
process it and then release.
    */
-  List<CompletableFuture<Long>> append(List<LogEntryProto> entries);
+  default List<CompletableFuture<Long>> append(List<LogEntryProto> entries, 
ReferenceCountedObject<?> entriesRef) {

Review Comment:
   This should be
   ```java
     default List<CompletableFuture<Long>> 
append(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
       try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries = 
entriesRef.retainAndReleaseOnClose()) {
         return append(entries.get());
       }
     }
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java:
##########
@@ -212,9 +224,13 @@ public List<CompletableFuture<Long>> 
appendImpl(List<LogEntryProto> logEntryProt
       } else {
         futures = new ArrayList<>(logEntryProtos.size() - index);
       }
+
+      Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> wrap = 
entriesRef != null ?
+          entriesRef::delegate : ReferenceCountedObject::wrap;
       for (int i = index; i < logEntryProtos.size(); i++) {
         LogEntryProto logEntryProto = logEntryProtos.get(i);
-        this.entries.add(logEntryProto);
+        ReferenceCountedObject<LogEntryProto> entryRef = 
wrap.apply(logEntryProto);
+        this.entries.add(entryRef);
         
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
       }

Review Comment:
   Then, we could use `entriesRef` to delegate:
   ```java
         for (int i = index; i < logEntryProtos.size(); i++) {
           LogEntryProto logEntryProto = logEntryProtos.get(i);
           entries.add(entriesRef.delegate(logEntryProto));
           
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to