This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 82c31eaf3 RATIS-2009. ReferenceCount should work for all LogEntry types. (#1021)
82c31eaf3 is described below
commit 82c31eaf39b24ffb31452ec58b3d96dcee9bab7e
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Jan 18 22:40:40 2024 -0800
RATIS-2009. ReferenceCount should work for all LogEntry types. (#1021)
---
.../ratis/grpc/server/GrpcClientProtocolService.java | 3 ++-
.../ratis/server/raftlog/RaftLogSequentialOps.java | 16 +++++++++++++---
.../org/apache/ratis/server/raftlog/RaftLogBase.java | 11 +++++++----
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 13 +++++++------
.../server/raftlog/segmented/SegmentedRaftLog.java | 19 +++++++++----------
.../src/test/java/org/apache/ratis/RaftTestUtil.java | 3 ++-
6 files changed, 40 insertions(+), 25 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index e8de4def0..67e75d606 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -64,7 +64,7 @@ class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase {
PendingOrderedRequest(ReferenceCountedObject<RaftClientRequest>
requestRef) {
this.requestRef = requestRef;
- this.request = requestRef != null ? requestRef.get() : null;
+ this.request = requestRef != null ? requestRef.retain() : null;
}
@Override
@@ -363,6 +363,7 @@ class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase {
final long seq = pending.getSeqNum();
processClientRequest(pending.getRequestRef(),
reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
+ pending.getRequestRef().release();
}
@Override
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index e4523cd93..5e274a695 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -124,11 +124,21 @@ interface RaftLogSequentialOps {
CompletableFuture<Long> appendEntry(LogEntryProto entry);
/**
- * Append asynchronously an entry.
- * Used by the leader.
+ * @deprecated use {@link #appendEntry(ReferenceCountedObject,
TransactionContext)}}.
*/
+ @Deprecated
default CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
- return appendEntry(entry);
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Append asynchronously an entry.
+ * Used for scenarios that there is a ReferenceCountedObject context for
resource cleanup when the given entry
+ * is no longer used/referenced by this log.
+ */
+ default CompletableFuture<Long>
appendEntry(ReferenceCountedObject<LogEntryProto> entryRef,
+ TransactionContext context) {
+ return appendEntry(entryRef.get(), context);
}
/**
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 0b6fa15b6..b56b343bd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -187,7 +187,8 @@ public abstract class RaftLogBase implements RaftLog {
throw new StateMachineException(memberId, new RaftLogIOException(
"Log entry size " + entrySize + " exceeds the max buffer limit of
" + maxBufferSize));
}
- appendEntry(e, operation).whenComplete((returned, t) -> {
+
+ appendEntry(operation.wrap(e), operation).whenComplete((returned, t) -> {
if (t != null) {
LOG.error(name + ": Failed to write log entry " +
LogProtoUtils.toLogEntryString(e), t);
} else if (returned != nextIndex) {
@@ -345,15 +346,17 @@ public abstract class RaftLogBase implements RaftLog {
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
- return appendEntry(entry, null);
+ return appendEntry(ReferenceCountedObject.wrap(entry), null);
}
@Override
- public final CompletableFuture<Long> appendEntry(LogEntryProto entry,
TransactionContext context) {
+ public final CompletableFuture<Long>
appendEntry(ReferenceCountedObject<LogEntryProto> entry,
+ TransactionContext context) {
return runner.runSequentially(() -> appendEntryImpl(entry, context));
}
- protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto
entry, TransactionContext context);
+ protected abstract CompletableFuture<Long>
appendEntryImpl(ReferenceCountedObject<LogEntryProto> entry,
+ TransactionContext context);
@Override
public final List<CompletableFuture<Long>>
append(ReferenceCountedObject<List<LogEntryProto>> entries) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 7435bb178..fc7973aab 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -36,7 +36,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.function.LongSupplier;
/**
@@ -175,13 +174,15 @@ public class MemoryRaftLog extends RaftLogBase {
}
@Override
- protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry,
TransactionContext context) {
+ protected CompletableFuture<Long>
appendEntryImpl(ReferenceCountedObject<LogEntryProto> entryRef,
+ TransactionContext context) {
checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
+ LogEntryProto entry = entryRef.retain();
+ try (AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
- Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> wrap =
context != null ?
- context::wrap : ReferenceCountedObject::wrap;
- entries.add(wrap.apply(entry));
+ entries.add(entryRef);
+ } finally {
+ entryRef.release();
}
return CompletableFuture.completedFuture(entry.getIndex());
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index b4e958965..92b2d4233 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -55,7 +55,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.function.LongSupplier;
import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -389,8 +388,10 @@ public final class SegmentedRaftLog extends RaftLogBase {
}
@Override
- protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry,
TransactionContext context) {
+ protected CompletableFuture<Long>
appendEntryImpl(ReferenceCountedObject<LogEntryProto> entryRef,
+ TransactionContext context) {
checkLogState();
+ LogEntryProto entry = entryRef.retain();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getName(),
LogProtoUtils.toLogEntryString(entry));
}
@@ -426,19 +427,20 @@ public final class SegmentedRaftLog extends RaftLogBase {
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
final Task write = fileLogWorker.writeLogEntry(entry,
removedStateMachineData, context);
- final Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>>
wrap = context != null ?
- context::wrap : ReferenceCountedObject::wrap;
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
- cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
wrap.apply(removedStateMachineData));
+ cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
+ entryRef.delegate(removedStateMachineData));
} else {
-
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE,
wrap.apply(entry)
+
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE,
entryRef
);
}
return write.getFuture().whenComplete((clientReply, exception) ->
appendEntryTimerContext.stop());
} catch (Exception e) {
LOG.error("{}: Failed to append {}", getName(),
LogProtoUtils.toLogEntryString(entry), e);
throw e;
+ } finally {
+ entryRef.release();
}
}
@@ -478,10 +480,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
for (int i = index; i < entries.size(); i++) {
final LogEntryProto entry = entries.get(i);
TransactionContextImpl transactionContext = (TransactionContextImpl)
server.getTransactionContext(entry, true);
- if (transactionContext != null) {
- transactionContext.setDelegatedRef(entriesRef);
- }
- futures.add(appendEntry(entry, transactionContext));
+ futures.add(appendEntry(entriesRef.delegate(entry),
transactionContext));
}
return futures;
} finally {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 1d45c6821..fa4188716 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -491,7 +492,7 @@ public interface RaftTestUtil {
final long lastIndex = expected.getNextIndex() - 1;
Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(),
lastIndex);
for(long i = 0; i < lastIndex; i++) {
- Assert.assertEquals(expected.get(i), computed.get(i));
+ Assert.assertEquals("Checking " + TermIndex.valueOf(expected.get(i)),
expected.get(i), computed.get(i));
}
}