This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 82c31eaf3 RATIS-2009. ReferenceCount should work for all LogEntry 
types. (#1021)
82c31eaf3 is described below

commit 82c31eaf39b24ffb31452ec58b3d96dcee9bab7e
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Jan 18 22:40:40 2024 -0800

    RATIS-2009. ReferenceCount should work for all LogEntry types. (#1021)
---
 .../ratis/grpc/server/GrpcClientProtocolService.java  |  3 ++-
 .../ratis/server/raftlog/RaftLogSequentialOps.java    | 16 +++++++++++++---
 .../org/apache/ratis/server/raftlog/RaftLogBase.java  | 11 +++++++----
 .../ratis/server/raftlog/memory/MemoryRaftLog.java    | 13 +++++++------
 .../server/raftlog/segmented/SegmentedRaftLog.java    | 19 +++++++++----------
 .../src/test/java/org/apache/ratis/RaftTestUtil.java  |  3 ++-
 6 files changed, 40 insertions(+), 25 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index e8de4def0..67e75d606 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -64,7 +64,7 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
 
     PendingOrderedRequest(ReferenceCountedObject<RaftClientRequest> 
requestRef) {
       this.requestRef = requestRef;
-      this.request = requestRef != null ? requestRef.get() : null;
+      this.request = requestRef != null ? requestRef.retain() : null;
     }
 
     @Override
@@ -363,6 +363,7 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequestRef(),
           reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
+      pending.getRequestRef().release();
     }
 
     @Override
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index e4523cd93..5e274a695 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -124,11 +124,21 @@ interface RaftLogSequentialOps {
   CompletableFuture<Long> appendEntry(LogEntryProto entry);
 
   /**
-   * Append asynchronously an entry.
-   * Used by the leader.
+   * @deprecated use {@link #appendEntry(ReferenceCountedObject, 
TransactionContext)}.
    */
+  @Deprecated
   default CompletableFuture<Long> appendEntry(LogEntryProto entry, 
TransactionContext context) {
-    return appendEntry(entry);
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Append asynchronously an entry.
+   * Used in scenarios where there is a ReferenceCountedObject context for 
resource cleanup when the given entry
+   * is no longer used/referenced by this log.
+   */
+  default CompletableFuture<Long> 
appendEntry(ReferenceCountedObject<LogEntryProto> entryRef,
+      TransactionContext context) {
+    return appendEntry(entryRef.get(), context);
   }
 
   /**
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 0b6fa15b6..b56b343bd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -187,7 +187,8 @@ public abstract class RaftLogBase implements RaftLog {
         throw new StateMachineException(memberId, new RaftLogIOException(
             "Log entry size " + entrySize + " exceeds the max buffer limit of 
" + maxBufferSize));
       }
-      appendEntry(e, operation).whenComplete((returned, t) -> {
+
+      appendEntry(operation.wrap(e), operation).whenComplete((returned, t) -> {
         if (t != null) {
           LOG.error(name + ": Failed to write log entry " + 
LogProtoUtils.toLogEntryString(e), t);
         } else if (returned != nextIndex) {
@@ -345,15 +346,17 @@ public abstract class RaftLogBase implements RaftLog {
 
   @Override
   public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
-    return appendEntry(entry, null);
+    return appendEntry(ReferenceCountedObject.wrap(entry), null);
   }
 
   @Override
-  public final CompletableFuture<Long> appendEntry(LogEntryProto entry, 
TransactionContext context) {
+  public final CompletableFuture<Long> 
appendEntry(ReferenceCountedObject<LogEntryProto> entry,
+      TransactionContext context) {
     return runner.runSequentially(() -> appendEntryImpl(entry, context));
   }
 
-  protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto 
entry, TransactionContext context);
+  protected abstract CompletableFuture<Long> 
appendEntryImpl(ReferenceCountedObject<LogEntryProto> entry,
+      TransactionContext context);
 
   @Override
   public final List<CompletableFuture<Long>> 
append(ReferenceCountedObject<List<LogEntryProto>> entries) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 7435bb178..fc7973aab 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -36,7 +36,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.function.LongSupplier;
 
 /**
@@ -175,13 +174,15 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
-  protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, 
TransactionContext context) {
+  protected CompletableFuture<Long> 
appendEntryImpl(ReferenceCountedObject<LogEntryProto> entryRef,
+      TransactionContext context) {
     checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
+    LogEntryProto entry = entryRef.retain();
+    try (AutoCloseableLock writeLock = writeLock()) {
       validateLogEntry(entry);
-      Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> wrap = 
context != null ?
-          context::wrap : ReferenceCountedObject::wrap;
-      entries.add(wrap.apply(entry));
+      entries.add(entryRef);
+    } finally {
+      entryRef.release();
     }
     return CompletableFuture.completedFuture(entry.getIndex());
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index b4e958965..92b2d4233 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -55,7 +55,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.LongSupplier;
 
 import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -389,8 +388,10 @@ public final class SegmentedRaftLog extends RaftLogBase {
   }
 
   @Override
-  protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, 
TransactionContext context) {
+  protected CompletableFuture<Long> 
appendEntryImpl(ReferenceCountedObject<LogEntryProto> entryRef,
+      TransactionContext context) {
     checkLogState();
+    LogEntryProto entry = entryRef.retain();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", getName(), 
LogProtoUtils.toLogEntryString(entry));
     }
@@ -426,19 +427,20 @@ public final class SegmentedRaftLog extends RaftLogBase {
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
       final Task write = fileLogWorker.writeLogEntry(entry, 
removedStateMachineData, context);
-      final Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> 
wrap = context != null ?
-          context::wrap : ReferenceCountedObject::wrap;
       if (stateMachineCachingEnabled) {
         // The stateMachineData will be cached inside the StateMachine itself.
-        cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE, 
wrap.apply(removedStateMachineData));
+        cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
+            entryRef.delegate(removedStateMachineData));
       } else {
-        
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
wrap.apply(entry)
+        
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
entryRef
         );
       }
       return write.getFuture().whenComplete((clientReply, exception) -> 
appendEntryTimerContext.stop());
     } catch (Exception e) {
       LOG.error("{}: Failed to append {}", getName(), 
LogProtoUtils.toLogEntryString(entry), e);
       throw e;
+    } finally {
+      entryRef.release();
     }
   }
 
@@ -478,10 +480,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
       for (int i = index; i < entries.size(); i++) {
         final LogEntryProto entry = entries.get(i);
         TransactionContextImpl transactionContext = (TransactionContextImpl) 
server.getTransactionContext(entry, true);
-        if (transactionContext != null) {
-          transactionContext.setDelegatedRef(entriesRef);
-        }
-        futures.add(appendEntry(entry, transactionContext));
+        futures.add(appendEntry(entriesRef.delegate(entry), 
transactionContext));
       }
       return futures;
     } finally {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 1d45c6821..fa4188716 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -491,7 +492,7 @@ public interface RaftTestUtil {
     final long lastIndex = expected.getNextIndex() - 1;
     Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(), 
lastIndex);
     for(long i = 0; i < lastIndex; i++) {
-      Assert.assertEquals(expected.get(i), computed.get(i));
+      Assert.assertEquals("Checking " + TermIndex.valueOf(expected.get(i)), 
expected.get(i), computed.get(i));
     }
   }
 

Reply via email to