This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d4ec6933 RATIS-1990. Refactor appendEntries processing to support 
reference count (#1011)
5d4ec6933 is described below

commit 5d4ec6933f538d0bf2c40483ad3d37173c82a1ca
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Jan 18 00:23:22 2024 -0800

    RATIS-1990. Refactor appendEntries processing to support reference count 
(#1011)
---
 .../grpc/server/GrpcServerProtocolService.java     |  3 +-
 .../protocol/RaftServerAsynchronousProtocol.java   | 31 +++++++++++++++++--
 .../ratis/server/raftlog/RaftLogSequentialOps.java | 21 +++++++++++--
 .../apache/ratis/server/impl/RaftServerImpl.java   | 21 ++++++++-----
 .../apache/ratis/server/impl/RaftServerProxy.java  | 14 ++++++---
 .../apache/ratis/server/raftlog/RaftLogBase.java   | 14 +++++++--
 .../ratis/server/raftlog/memory/MemoryRaftLog.java | 35 +++++++++++++++-------
 .../server/raftlog/segmented/SegmentedRaftLog.java | 15 ++++++++--
 .../server/raftlog/memory/MemoryRaftLogTest.java   |  9 +++---
 .../raftlog/segmented/TestCacheEviction.java       |  5 ++--
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  8 +++--
 11 files changed, 135 insertions(+), 41 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 766e14321..dde01c39a 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -30,6 +30,7 @@ import 
org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.*;
 import 
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,7 +227,7 @@ class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase {
         RaftServerProtocol.Op.APPEND_ENTRIES, responseObserver) {
       @Override
       CompletableFuture<AppendEntriesReplyProto> 
process(AppendEntriesRequestProto request) throws IOException {
-        return server.appendEntriesAsync(request);
+        return server.appendEntriesAsync(ReferenceCountedObject.wrap(request));
       }
 
       @Override
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
index 8a904069b..1244e7254 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -22,14 +22,41 @@ import 
org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
 import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 public interface RaftServerAsynchronousProtocol {
 
-  CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
-      throws IOException;
+  /**
+   * It is recommended to override {@link 
#appendEntriesAsync(ReferenceCountedObject)} instead.
+   * Then, it does not have to override this method.
+   */
+  default CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * A referenced counted request is submitted from a client for processing.
+   * Implementations of this method should retain the request, process it and 
then release it.
+   * The request may be retained even after the future returned by this method 
has completed.
+   *
+   * @return a future of the reply
+   * @see ReferenceCountedObject
+   */
+  default CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+      ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws 
IOException {
+    // Default implementation for backward compatibility.
+    try {
+      return appendEntriesAsync(requestRef.retain())
+          .whenComplete((r, e) -> requestRef.release());
+    } catch (Exception e) {
+      requestRef.release();
+      throw e;
+    }
+  }
 
   CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestProto 
request)
       throws IOException;
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 7b9f42b6b..e4523cd93 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -22,8 +22,10 @@ import 
org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.StringUtils;
 import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 
 import java.util.Arrays;
 import java.util.List;
@@ -132,21 +134,36 @@ interface RaftLogSequentialOps {
   /**
    * The same as append(Arrays.asList(entries)).
    *
-   * @deprecated use {@link #append(List)}
+   * @deprecated use {@link #append(ReferenceCountedObject)}.
    */
   @Deprecated
   default List<CompletableFuture<Long>> append(LogEntryProto... entries) {
     return append(Arrays.asList(entries));
   }
 
+  /**
+   * @deprecated use {@link #append(ReferenceCountedObject)}.
+   */
+  @Deprecated
+  default List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Append asynchronously all the given log entries.
    * Used by the followers.
    *
    * If an existing entry conflicts with a new one (same index but different 
terms),
    * delete the existing entry and all entries that follow it (§5.3).
+   *
+   * A reference counter is also submitted.
+   * For each entry, implementations of this method should retain the counter, 
process it and then release.
    */
-  List<CompletableFuture<Long>> append(List<LogEntryProto> entries);
+  default List<CompletableFuture<Long>> 
append(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+    try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries = 
entriesRef.retainAndReleaseOnClose()) {
+      return append(entries.get());
+    }
+  }
 
   /**
    * Truncate asynchronously the log entries till the given index 
(inclusively).
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index de6491364..8ad835474 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1460,27 +1460,30 @@ class RaftServerImpl implements RaftServer.Division,
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
       throws IOException {
     try {
-      return appendEntriesAsync(r).join();
+      return appendEntriesAsync(ReferenceCountedObject.wrap(r)).join();
     } catch (CompletionException e) {
       throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e));
     }
   }
 
   @Override
-  public CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto r)
-      throws IOException {
+  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+      ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws 
IOException {
+    final AppendEntriesRequestProto r = requestRef.retain();
     final RaftRpcRequestProto request = r.getServerRequest();
     final List<LogEntryProto> entries = r.getEntriesList();
     final TermIndex previous = r.hasPreviousLog()? 
TermIndex.valueOf(r.getPreviousLog()) : null;
     final RaftPeerId requestorId = 
RaftPeerId.valueOf(request.getRequestorId());
 
-    preAppendEntriesAsync(requestorId, 
ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
-        previous, r.getLeaderCommit(), r.getInitializing(), entries);
     try {
+      preAppendEntriesAsync(requestorId, 
ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
+          previous, r.getLeaderCommit(), r.getInitializing(), entries);
       return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, 
r.getLeaderCommit(),
-          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), 
entries);
+          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), 
entries, requestRef)
+          .whenComplete((reply, e) -> requestRef.release());
     } catch(Exception t) {
       LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
+      requestRef.release();
       throw t;
     }
   }
@@ -1554,7 +1557,8 @@ class RaftServerImpl implements RaftServer.Division,
   @SuppressWarnings("checkstyle:parameternumber")
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, long leaderTerm, TermIndex previous, long 
leaderCommit, long callId, boolean initializing,
-      List<CommitInfoProto> commitInfos, List<LogEntryProto> entries) throws 
IOException {
+      List<CommitInfoProto> commitInfos, List<LogEntryProto> entries,
+      ReferenceCountedObject<?> requestRef) throws IOException {
     final boolean isHeartbeat = entries.isEmpty();
     logAppendEntries(isHeartbeat,
         () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + 
leaderTerm + ", "
@@ -1612,8 +1616,9 @@ class RaftServerImpl implements RaftServer.Division,
       state.updateConfiguration(entries);
     }
 
+
     final List<CompletableFuture<Long>> futures = entries.isEmpty() ? 
Collections.emptyList()
-        : state.getLog().append(entries);
+        : state.getLog().append(requestRef.delegate(entries));
     commitInfos.forEach(commitInfoCache::update);
 
     CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index cb7918e51..589c7eeb1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -645,10 +645,16 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
-  public CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request) {
-    final RaftGroupId groupId = 
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
-    return getImplFuture(groupId)
-        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> 
impl.appendEntriesAsync(request)));
+  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+      ReferenceCountedObject<AppendEntriesRequestProto> requestRef) {
+    AppendEntriesRequestProto request = requestRef.retain();
+    try {
+      final RaftGroupId groupId = 
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+      return getImplFuture(groupId)
+          .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> 
impl.appendEntriesAsync(requestRef)));
+    } finally {
+      requestRef.release();
+    }
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 708b499cd..0b6fa15b6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -31,7 +31,9 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 
 import java.io.IOException;
 import java.util.List;
@@ -354,11 +356,19 @@ public abstract class RaftLogBase implements RaftLog {
   protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto 
entry, TransactionContext context);
 
   @Override
-  public final List<CompletableFuture<Long>> append(List<LogEntryProto> 
entries) {
+  public final List<CompletableFuture<Long>> 
append(ReferenceCountedObject<List<LogEntryProto>> entries) {
     return runner.runSequentially(() -> appendImpl(entries));
   }
 
-  protected abstract List<CompletableFuture<Long>> 
appendImpl(List<LogEntryProto> entries);
+  protected List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> 
entries) {
+    throw new UnsupportedOperationException();
+  }
+
+  protected List<CompletableFuture<Long>> 
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+    try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries = 
entriesRef.retainAndReleaseOnClose()) {
+      return appendImpl(entries.get());
+    }
+  }
 
   @Override
   public String toString() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index ebb1e27d7..7435bb178 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.LongSupplier;
 
 /**
@@ -42,10 +44,10 @@ import java.util.function.LongSupplier;
  */
 public class MemoryRaftLog extends RaftLogBase {
   static class EntryList {
-    private final List<LogEntryProto> entries = new ArrayList<>();
+    private final List<ReferenceCountedObject<LogEntryProto>> entries = new 
ArrayList<>();
 
     LogEntryProto get(int i) {
-      return i >= 0 && i < entries.size() ? entries.get(i) : null;
+      return i >= 0 && i < entries.size() ? entries.get(i).get() : null;
     }
 
     TermIndex getTermIndex(int i) {
@@ -62,18 +64,25 @@ public class MemoryRaftLog extends RaftLogBase {
 
     void truncate(int index) {
       if (entries.size() > index) {
-        entries.subList(index, entries.size()).clear();
+        clear(index, entries.size());
       }
     }
 
     void purge(int index) {
       if (entries.size() > index) {
-        entries.subList(0, index).clear();
+        clear(0, index);
       }
     }
 
-    void add(LogEntryProto entry) {
-      entries.add(entry);
+    void clear(int from, int to) {
+      List<ReferenceCountedObject<LogEntryProto>> subList = 
entries.subList(from, to);
+      subList.forEach(ReferenceCountedObject::release);
+      subList.clear();
+    }
+
+    void add(ReferenceCountedObject<LogEntryProto> entryRef) {
+      entryRef.retain();
+      entries.add(entryRef);
     }
   }
 
@@ -170,7 +179,9 @@ public class MemoryRaftLog extends RaftLogBase {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       validateLogEntry(entry);
-      entries.add(entry);
+      Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> wrap = 
context != null ?
+          context::wrap : ReferenceCountedObject::wrap;
+      entries.add(wrap.apply(entry));
     }
     return CompletableFuture.completedFuture(entry.getIndex());
   }
@@ -181,12 +192,14 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
-  public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> 
logEntryProtos) {
+  public List<CompletableFuture<Long>> 
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
     checkLogState();
+    final List<LogEntryProto> logEntryProtos = entriesRef.retain();
     if (logEntryProtos == null || logEntryProtos.isEmpty()) {
+      entriesRef.release();
       return Collections.emptyList();
     }
-    try(AutoCloseableLock writeLock = writeLock()) {
+    try (AutoCloseableLock writeLock = writeLock()) {
       // Before truncating the entries, we first need to check if some
       // entries are duplicated. If the leader sends entry 6, entry 7, then
       // entry 6 again, without this check the follower may truncate entry 7
@@ -214,10 +227,12 @@ public class MemoryRaftLog extends RaftLogBase {
       }
       for (int i = index; i < logEntryProtos.size(); i++) {
         LogEntryProto logEntryProto = logEntryProtos.get(i);
-        this.entries.add(logEntryProto);
+        entries.add(entriesRef.delegate(logEntryProto));
         
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
       }
       return futures;
+    } finally {
+      entriesRef.release();
     }
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1cfb5933f..b4e958965 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -36,6 +36,7 @@ import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateI
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AwaitToRun;
@@ -454,12 +455,14 @@ public final class SegmentedRaftLog extends RaftLogBase {
   }
 
   @Override
-  public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> entries) 
{
+  protected List<CompletableFuture<Long>> 
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
     checkLogState();
+    final List<LogEntryProto> entries = entriesRef.retain();
     if (entries == null || entries.isEmpty()) {
+      entriesRef.release();
       return Collections.emptyList();
     }
-    try(AutoCloseableLock writeLock = writeLock()) {
+    try (AutoCloseableLock writeLock = writeLock()) {
       final TruncateIndices ti = 
cache.computeTruncateIndices(server::notifyTruncatedLogEntry, entries);
       final long truncateIndex = ti.getTruncateIndex();
       final int index = ti.getArrayIndex();
@@ -474,9 +477,15 @@ public final class SegmentedRaftLog extends RaftLogBase {
       }
       for (int i = index; i < entries.size(); i++) {
         final LogEntryProto entry = entries.get(i);
-        futures.add(appendEntry(entry, server.getTransactionContext(entry, 
true)));
+        TransactionContextImpl transactionContext = (TransactionContextImpl) 
server.getTransactionContext(entry, true);
+        if (transactionContext != null) {
+          transactionContext.setDelegatedRef(entriesRef);
+        }
+        futures.add(appendEntry(entry, transactionContext));
       }
       return futures;
+    } finally {
+      entriesRef.release();
     }
   }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
index 5d8d090a3..503de3453 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
@@ -33,6 +33,7 @@ import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.Slf4jUtils;
 import org.junit.Test;
 import org.slf4j.event.Level;
@@ -56,11 +57,11 @@ public class MemoryRaftLogTest extends BaseTest {
     List<LogEntryProto> entries1 = new ArrayList<>();
     entries1.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
     entries1.add(LogEntryProto.newBuilder().setIndex(1).setTerm(0).build());
-    raftLog.append(entries1).forEach(CompletableFuture::join);
+    
raftLog.append(ReferenceCountedObject.wrap(entries1)).forEach(CompletableFuture::join);
 
     List<LogEntryProto> entries2 = new ArrayList<>();
     entries2.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
-    raftLog.append(entries2).forEach(CompletableFuture::join);
+    
raftLog.append(ReferenceCountedObject.wrap(entries2)).forEach(CompletableFuture::join);
 
     final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
     assertEquals(2, termIndices.length);
@@ -84,11 +85,11 @@ public class MemoryRaftLogTest extends BaseTest {
     List<LogEntryProto> entries1 = new ArrayList<>();
     entries1.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
     entries1.add(LogEntryProto.newBuilder().setIndex(1).setTerm(0).build());
-    raftLog.append(entries1).forEach(CompletableFuture::join);
+    
raftLog.append(ReferenceCountedObject.wrap(entries1)).forEach(CompletableFuture::join);
 
     List<LogEntryProto> entries2 = new ArrayList<>();
     entries2.add(LogEntryProto.newBuilder().setIndex(0).setTerm(2).build());
-    raftLog.append(entries2).forEach(CompletableFuture::join);
+    
raftLog.append(ReferenceCountedObject.wrap(entries2)).forEach(CompletableFuture::join);
 
     final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
     assertEquals(1, termIndices.length);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 996f7ef52..675df51d6 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -38,6 +38,7 @@ import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.junit.Assert;
 import org.junit.Test;
@@ -174,7 +175,7 @@ public class TestCacheEviction extends BaseTest {
     raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
     List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, 
maxCachedNum, 7, 0);
     List<LogEntryProto> entries = generateEntries(slist);
-    raftLog.append(entries).forEach(CompletableFuture::join);
+    
raftLog.append(ReferenceCountedObject.wrap(entries)).forEach(CompletableFuture::join);
 
     // check the current cached segment number: the last segment is still open
     Assert.assertEquals(maxCachedNum - 1,
@@ -184,7 +185,7 @@ public class TestCacheEviction extends BaseTest {
     Mockito.when(info.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 
40});
     slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, maxCachedNum + 2, 
7, 7 * maxCachedNum);
     entries = generateEntries(slist);
-    raftLog.append(entries).forEach(CompletableFuture::join);
+    
raftLog.append(ReferenceCountedObject.wrap(entries)).forEach(CompletableFuture::join);
 
     // check the cached segment number again. since the slowest follower is on
     // index 21, the eviction should happen and evict 3 segments
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 38fa45e6f..3d5d5f87d 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -43,6 +43,7 @@ import 
org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -551,7 +552,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       LOG.info("newEntries[0] = {}", newEntries.get(0));
       final int last = newEntries.size() - 1;
       LOG.info("newEntries[{}] = {}", last, newEntries.get(last));
-      raftLog.append(newEntries).forEach(CompletableFuture::join);
+      
raftLog.append(ReferenceCountedObject.wrap(newEntries)).forEach(CompletableFuture::join);
 
       checkFailedEntries(entries, 650, retryCache);
       checkEntries(raftLog, entries, 0, 650);
@@ -710,8 +711,9 @@ public class TestSegmentedRaftLog extends BaseTest {
       long start = System.nanoTime();
       for (int i = 0; i < entries.size(); i += 5) {
         // call append API
-        futures.add(raftLog.append(Arrays.asList(
-            entries.get(i), entries.get(i + 1), entries.get(i + 2), 
entries.get(i + 3), entries.get(i + 4))));
+        List<LogEntryProto> entries1 = Arrays.asList(
+            entries.get(i), entries.get(i + 1), entries.get(i + 2), 
entries.get(i + 3), entries.get(i + 4));
+        futures.add(raftLog.append(ReferenceCountedObject.wrap(entries1)));
       }
       for (List<CompletableFuture<Long>> futureList: futures) {
         futureList.forEach(CompletableFuture::join);

Reply via email to