This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4ec6933 RATIS-1990. Refactor appendEntries processing to support
reference count (#1011)
5d4ec6933 is described below
commit 5d4ec6933f538d0bf2c40483ad3d37173c82a1ca
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Jan 18 00:23:22 2024 -0800
RATIS-1990. Refactor appendEntries processing to support reference count
(#1011)
---
.../grpc/server/GrpcServerProtocolService.java | 3 +-
.../protocol/RaftServerAsynchronousProtocol.java | 31 +++++++++++++++++--
.../ratis/server/raftlog/RaftLogSequentialOps.java | 21 +++++++++++--
.../apache/ratis/server/impl/RaftServerImpl.java | 21 ++++++++-----
.../apache/ratis/server/impl/RaftServerProxy.java | 14 ++++++---
.../apache/ratis/server/raftlog/RaftLogBase.java | 14 +++++++--
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 35 +++++++++++++++-------
.../server/raftlog/segmented/SegmentedRaftLog.java | 15 ++++++++--
.../server/raftlog/memory/MemoryRaftLogTest.java | 9 +++---
.../raftlog/segmented/TestCacheEviction.java | 5 ++--
.../raftlog/segmented/TestSegmentedRaftLog.java | 8 +++--
11 files changed, 135 insertions(+), 41 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 766e14321..dde01c39a 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -30,6 +30,7 @@ import
org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,7 +227,7 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
RaftServerProtocol.Op.APPEND_ENTRIES, responseObserver) {
@Override
CompletableFuture<AppendEntriesReplyProto>
process(AppendEntriesRequestProto request) throws IOException {
- return server.appendEntriesAsync(request);
+ return server.appendEntriesAsync(ReferenceCountedObject.wrap(request));
}
@Override
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
index 8a904069b..1244e7254 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -22,14 +22,41 @@ import
org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.util.ReferenceCountedObject;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public interface RaftServerAsynchronousProtocol {
- CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto request)
- throws IOException;
+ /**
+ * It is recommended to override {@link
#appendEntriesAsync(ReferenceCountedObject)} instead.
+ * Then, it does not have to override this method.
+ */
+ default CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto request)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * A referenced counted request is submitted from a client for processing.
+ * Implementations of this method should retain the request, process it and
then release it.
+ * The request may be retained even after the future returned by this method
has completed.
+ *
+ * @return a future of the reply
+ * @see ReferenceCountedObject
+ */
+ default CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+ ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws
IOException {
+ // Default implementation for backward compatibility.
+ try {
+ return appendEntriesAsync(requestRef.retain())
+ .whenComplete((r, e) -> requestRef.release());
+ } catch (Exception e) {
+ requestRef.release();
+ throw e;
+ }
+ }
CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestProto
request)
throws IOException;
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 7b9f42b6b..e4523cd93 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -22,8 +22,10 @@ import
org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.StringUtils;
import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import java.util.Arrays;
import java.util.List;
@@ -132,21 +134,36 @@ interface RaftLogSequentialOps {
/**
* The same as append(Arrays.asList(entries)).
*
- * @deprecated use {@link #append(List)}
+ * @deprecated use {@link #append(ReferenceCountedObject)}.
*/
@Deprecated
default List<CompletableFuture<Long>> append(LogEntryProto... entries) {
return append(Arrays.asList(entries));
}
+ /**
+ * @deprecated use {@link #append(ReferenceCountedObject)}.
+ */
+ @Deprecated
+ default List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Append asynchronously all the given log entries.
* Used by the followers.
*
* If an existing entry conflicts with a new one (same index but different
terms),
* delete the existing entry and all entries that follow it (ยง5.3).
+ *
+ * A reference counter is also submitted.
+ * For each entry, implementations of this method should retain the counter,
process it and then release.
*/
- List<CompletableFuture<Long>> append(List<LogEntryProto> entries);
+ default List<CompletableFuture<Long>>
append(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+ try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries =
entriesRef.retainAndReleaseOnClose()) {
+ return append(entries.get());
+ }
+ }
/**
* Truncate asynchronously the log entries till the given index
(inclusively).
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index de6491364..8ad835474 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1460,27 +1460,30 @@ class RaftServerImpl implements RaftServer.Division,
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
throws IOException {
try {
- return appendEntriesAsync(r).join();
+ return appendEntriesAsync(ReferenceCountedObject.wrap(r)).join();
} catch (CompletionException e) {
throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e));
}
}
@Override
- public CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto r)
- throws IOException {
+ public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+ ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws
IOException {
+ final AppendEntriesRequestProto r = requestRef.retain();
final RaftRpcRequestProto request = r.getServerRequest();
final List<LogEntryProto> entries = r.getEntriesList();
final TermIndex previous = r.hasPreviousLog()?
TermIndex.valueOf(r.getPreviousLog()) : null;
final RaftPeerId requestorId =
RaftPeerId.valueOf(request.getRequestorId());
- preAppendEntriesAsync(requestorId,
ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
- previous, r.getLeaderCommit(), r.getInitializing(), entries);
try {
+ preAppendEntriesAsync(requestorId,
ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
+ previous, r.getLeaderCommit(), r.getInitializing(), entries);
return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous,
r.getLeaderCommit(),
- request.getCallId(), r.getInitializing(), r.getCommitInfosList(),
entries);
+ request.getCallId(), r.getInitializing(), r.getCommitInfosList(),
entries, requestRef)
+ .whenComplete((reply, e) -> requestRef.release());
} catch(Exception t) {
LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
+ requestRef.release();
throw t;
}
}
@@ -1554,7 +1557,8 @@ class RaftServerImpl implements RaftServer.Division,
@SuppressWarnings("checkstyle:parameternumber")
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
RaftPeerId leaderId, long leaderTerm, TermIndex previous, long
leaderCommit, long callId, boolean initializing,
- List<CommitInfoProto> commitInfos, List<LogEntryProto> entries) throws
IOException {
+ List<CommitInfoProto> commitInfos, List<LogEntryProto> entries,
+ ReferenceCountedObject<?> requestRef) throws IOException {
final boolean isHeartbeat = entries.isEmpty();
logAppendEntries(isHeartbeat,
() -> getMemberId() + ": receive appendEntries(" + leaderId + ", " +
leaderTerm + ", "
@@ -1612,8 +1616,9 @@ class RaftServerImpl implements RaftServer.Division,
state.updateConfiguration(entries);
}
+
final List<CompletableFuture<Long>> futures = entries.isEmpty() ?
Collections.emptyList()
- : state.getLog().append(entries);
+ : state.getLog().append(requestRef.delegate(entries));
commitInfos.forEach(commitInfoCache::update);
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index cb7918e51..589c7eeb1 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -645,10 +645,16 @@ class RaftServerProxy implements RaftServer {
}
@Override
- public CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto request) {
- final RaftGroupId groupId =
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
- return getImplFuture(groupId)
- .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() ->
impl.appendEntriesAsync(request)));
+ public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+ ReferenceCountedObject<AppendEntriesRequestProto> requestRef) {
+ AppendEntriesRequestProto request = requestRef.retain();
+ try {
+ final RaftGroupId groupId =
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+ return getImplFuture(groupId)
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() ->
impl.appendEntriesAsync(requestRef)));
+ } finally {
+ requestRef.release();
+ }
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 708b499cd..0b6fa15b6 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -31,7 +31,9 @@ import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import java.io.IOException;
import java.util.List;
@@ -354,11 +356,19 @@ public abstract class RaftLogBase implements RaftLog {
protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto
entry, TransactionContext context);
@Override
- public final List<CompletableFuture<Long>> append(List<LogEntryProto>
entries) {
+ public final List<CompletableFuture<Long>>
append(ReferenceCountedObject<List<LogEntryProto>> entries) {
return runner.runSequentially(() -> appendImpl(entries));
}
- protected abstract List<CompletableFuture<Long>>
appendImpl(List<LogEntryProto> entries);
+ protected List<CompletableFuture<Long>> appendImpl(List<LogEntryProto>
entries) {
+ throw new UnsupportedOperationException();
+ }
+
+ protected List<CompletableFuture<Long>>
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+ try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries =
entriesRef.retainAndReleaseOnClose()) {
+ return appendImpl(entries.get());
+ }
+ }
@Override
public String toString() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index ebb1e27d7..7435bb178 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,6 +36,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.LongSupplier;
/**
@@ -42,10 +44,10 @@ import java.util.function.LongSupplier;
*/
public class MemoryRaftLog extends RaftLogBase {
static class EntryList {
- private final List<LogEntryProto> entries = new ArrayList<>();
+ private final List<ReferenceCountedObject<LogEntryProto>> entries = new
ArrayList<>();
LogEntryProto get(int i) {
- return i >= 0 && i < entries.size() ? entries.get(i) : null;
+ return i >= 0 && i < entries.size() ? entries.get(i).get() : null;
}
TermIndex getTermIndex(int i) {
@@ -62,18 +64,25 @@ public class MemoryRaftLog extends RaftLogBase {
void truncate(int index) {
if (entries.size() > index) {
- entries.subList(index, entries.size()).clear();
+ clear(index, entries.size());
}
}
void purge(int index) {
if (entries.size() > index) {
- entries.subList(0, index).clear();
+ clear(0, index);
}
}
- void add(LogEntryProto entry) {
- entries.add(entry);
+ void clear(int from, int to) {
+ List<ReferenceCountedObject<LogEntryProto>> subList =
entries.subList(from, to);
+ subList.forEach(ReferenceCountedObject::release);
+ subList.clear();
+ }
+
+ void add(ReferenceCountedObject<LogEntryProto> entryRef) {
+ entryRef.retain();
+ entries.add(entryRef);
}
}
@@ -170,7 +179,9 @@ public class MemoryRaftLog extends RaftLogBase {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
- entries.add(entry);
+ Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> wrap =
context != null ?
+ context::wrap : ReferenceCountedObject::wrap;
+ entries.add(wrap.apply(entry));
}
return CompletableFuture.completedFuture(entry.getIndex());
}
@@ -181,12 +192,14 @@ public class MemoryRaftLog extends RaftLogBase {
}
@Override
- public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto>
logEntryProtos) {
+ public List<CompletableFuture<Long>>
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
checkLogState();
+ final List<LogEntryProto> logEntryProtos = entriesRef.retain();
if (logEntryProtos == null || logEntryProtos.isEmpty()) {
+ entriesRef.release();
return Collections.emptyList();
}
- try(AutoCloseableLock writeLock = writeLock()) {
+ try (AutoCloseableLock writeLock = writeLock()) {
// Before truncating the entries, we first need to check if some
// entries are duplicated. If the leader sends entry 6, entry 7, then
// entry 6 again, without this check the follower may truncate entry 7
@@ -214,10 +227,12 @@ public class MemoryRaftLog extends RaftLogBase {
}
for (int i = index; i < logEntryProtos.size(); i++) {
LogEntryProto logEntryProto = logEntryProtos.get(i);
- this.entries.add(logEntryProto);
+ entries.add(entriesRef.delegate(logEntryProto));
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
}
return futures;
+ } finally {
+ entriesRef.release();
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1cfb5933f..b4e958965 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -36,6 +36,7 @@ import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateI
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.AwaitToRun;
@@ -454,12 +455,14 @@ public final class SegmentedRaftLog extends RaftLogBase {
}
@Override
- public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> entries)
{
+ protected List<CompletableFuture<Long>>
appendImpl(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
checkLogState();
+ final List<LogEntryProto> entries = entriesRef.retain();
if (entries == null || entries.isEmpty()) {
+ entriesRef.release();
return Collections.emptyList();
}
- try(AutoCloseableLock writeLock = writeLock()) {
+ try (AutoCloseableLock writeLock = writeLock()) {
final TruncateIndices ti =
cache.computeTruncateIndices(server::notifyTruncatedLogEntry, entries);
final long truncateIndex = ti.getTruncateIndex();
final int index = ti.getArrayIndex();
@@ -474,9 +477,15 @@ public final class SegmentedRaftLog extends RaftLogBase {
}
for (int i = index; i < entries.size(); i++) {
final LogEntryProto entry = entries.get(i);
- futures.add(appendEntry(entry, server.getTransactionContext(entry,
true)));
+ TransactionContextImpl transactionContext = (TransactionContextImpl)
server.getTransactionContext(entry, true);
+ if (transactionContext != null) {
+ transactionContext.setDelegatedRef(entriesRef);
+ }
+ futures.add(appendEntry(entry, transactionContext));
}
return futures;
+ } finally {
+ entriesRef.release();
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
index 5d8d090a3..503de3453 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
@@ -33,6 +33,7 @@ import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.Slf4jUtils;
import org.junit.Test;
import org.slf4j.event.Level;
@@ -56,11 +57,11 @@ public class MemoryRaftLogTest extends BaseTest {
List<LogEntryProto> entries1 = new ArrayList<>();
entries1.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
entries1.add(LogEntryProto.newBuilder().setIndex(1).setTerm(0).build());
- raftLog.append(entries1).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(entries1)).forEach(CompletableFuture::join);
List<LogEntryProto> entries2 = new ArrayList<>();
entries2.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
- raftLog.append(entries2).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(entries2)).forEach(CompletableFuture::join);
final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
assertEquals(2, termIndices.length);
@@ -84,11 +85,11 @@ public class MemoryRaftLogTest extends BaseTest {
List<LogEntryProto> entries1 = new ArrayList<>();
entries1.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
entries1.add(LogEntryProto.newBuilder().setIndex(1).setTerm(0).build());
- raftLog.append(entries1).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(entries1)).forEach(CompletableFuture::join);
List<LogEntryProto> entries2 = new ArrayList<>();
entries2.add(LogEntryProto.newBuilder().setIndex(0).setTerm(2).build());
- raftLog.append(entries2).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(entries2)).forEach(CompletableFuture::join);
final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
assertEquals(1, termIndices.length);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 996f7ef52..675df51d6 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -38,6 +38,7 @@ import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.junit.Assert;
import org.junit.Test;
@@ -174,7 +175,7 @@ public class TestCacheEviction extends BaseTest {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0,
maxCachedNum, 7, 0);
List<LogEntryProto> entries = generateEntries(slist);
- raftLog.append(entries).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(entries)).forEach(CompletableFuture::join);
// check the current cached segment number: the last segment is still open
Assert.assertEquals(maxCachedNum - 1,
@@ -184,7 +185,7 @@ public class TestCacheEviction extends BaseTest {
Mockito.when(info.getFollowerNextIndices()).thenReturn(new long[]{21, 40,
40});
slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, maxCachedNum + 2,
7, 7 * maxCachedNum);
entries = generateEntries(slist);
- raftLog.append(entries).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(entries)).forEach(CompletableFuture::join);
// check the cached segment number again. since the slowest follower is on
// index 21, the eviction should happen and evict 3 segments
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 38fa45e6f..3d5d5f87d 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -43,6 +43,7 @@ import
org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
@@ -551,7 +552,7 @@ public class TestSegmentedRaftLog extends BaseTest {
LOG.info("newEntries[0] = {}", newEntries.get(0));
final int last = newEntries.size() - 1;
LOG.info("newEntries[{}] = {}", last, newEntries.get(last));
- raftLog.append(newEntries).forEach(CompletableFuture::join);
+
raftLog.append(ReferenceCountedObject.wrap(newEntries)).forEach(CompletableFuture::join);
checkFailedEntries(entries, 650, retryCache);
checkEntries(raftLog, entries, 0, 650);
@@ -710,8 +711,9 @@ public class TestSegmentedRaftLog extends BaseTest {
long start = System.nanoTime();
for (int i = 0; i < entries.size(); i += 5) {
// call append API
- futures.add(raftLog.append(Arrays.asList(
- entries.get(i), entries.get(i + 1), entries.get(i + 2),
entries.get(i + 3), entries.get(i + 4))));
+ List<LogEntryProto> entries1 = Arrays.asList(
+ entries.get(i), entries.get(i + 1), entries.get(i + 2),
entries.get(i + 3), entries.get(i + 4));
+ futures.add(raftLog.append(ReferenceCountedObject.wrap(entries1)));
}
for (List<CompletableFuture<Long>> futureList: futures) {
futureList.forEach(CompletableFuture::join);