This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 775b286c4 RATIS-1997. Refactor StateMachine interface to use ReferenceCountedObject (#1036)
775b286c4 is described below

commit 775b286c4540057e5f81419806d9e7737e5f568e
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Feb 1 15:39:48 2024 -0800

    RATIS-1997. Refactor StateMachine interface to use ReferenceCountedObject (#1036)
---
 .../examples/filestore/FileStoreStateMachine.java  |  9 +++++---
 .../apache/ratis/statemachine/StateMachine.java    | 27 ++++++++++++++++++++++
 .../server/raftlog/segmented/SegmentedRaftLog.java |  2 +-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 24 +++++++++++--------
 .../impl/SimpleStateMachine4Testing.java           |  4 +++-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  3 ++-
 6 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 858e300ec..0ee7a60ac 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -42,6 +42,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -123,7 +124,8 @@ public class FileStoreStateMachine extends BaseStateMachine 
{
   }
 
   @Override
-  public CompletableFuture<Integer> write(LogEntryProto entry, 
TransactionContext context) {
+  public CompletableFuture<Integer> 
write(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext 
context) {
+    LogEntryProto entry = entryRef.retain();
     final FileStoreRequestProto proto = getProto(context, entry);
     if (proto.getRequestCase() != 
FileStoreRequestProto.RequestCase.WRITEHEADER) {
       return null;
@@ -132,9 +134,10 @@ public class FileStoreStateMachine extends 
BaseStateMachine {
     final WriteRequestHeaderProto h = proto.getWriteHeader();
     final CompletableFuture<Integer> f = files.write(entry.getIndex(),
         h.getPath().toStringUtf8(), h.getClose(),  h.getSync(), h.getOffset(),
-        
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());
+        
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData()
+    ).whenComplete((r, e) -> entryRef.release());
     // sync only if closing the file
-    return h.getClose()? f: null;
+    return h.getClose() ? f: null;
   }
 
   static FileStoreRequestProto getProto(TransactionContext context, 
LogEntryProto entry) {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index b1fc5adda..915b70bb8 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -92,7 +92,9 @@ public interface StateMachine extends Closeable {
      * Write asynchronously the state machine data in the given log entry to 
this state machine.
      *
      * @return a future for the write task
+     * @deprecated Applications should implement {@link #write(ReferenceCountedObject, TransactionContext)} instead.
      */
+    @Deprecated
     default CompletableFuture<?> write(LogEntryProto entry) {
       return CompletableFuture.completedFuture(null);
     }
@@ -101,11 +103,36 @@ public interface StateMachine extends Closeable {
      * Write asynchronously the state machine data in the given log entry to 
this state machine.
      *
      * @return a future for the write task
+     * @deprecated Applications should implement {@link #write(ReferenceCountedObject, TransactionContext)} instead.
      */
+    @Deprecated
     default CompletableFuture<?> write(LogEntryProto entry, TransactionContext 
context) {
       return write(entry);
     }
 
+    /**
+     * Write asynchronously the state machine data in the given log entry to this state machine.
+     *
+     * @param entryRef Reference to a log entry.
+     *                 Implementations of this method may call {@link ReferenceCountedObject#get()}
+     *                 to access the log entry before this method returns.
+     *                 If the log entry is needed after this method returns,
+     *                 e.g. for asynchronous computation or caching,
+     *                 the implementation must invoke {@link ReferenceCountedObject#retain()}
+     *                 and {@link ReferenceCountedObject#release()}.
+     * @return a future for the write task
+     */
+    default CompletableFuture<?> write(ReferenceCountedObject<LogEntryProto> 
entryRef, TransactionContext context) {
+      final LogEntryProto entry = entryRef.get();
+      try {
+        final LogEntryProto copy = 
LogEntryProto.parseFrom(entry.toByteString());
+        return write(copy, context);
+      } catch (InvalidProtocolBufferException e) {
+        return JavaUtils.completeExceptionally(new IllegalStateException(
+            "Failed to copy log entry " + TermIndex.valueOf(entry), e));
+      }
+    }
+
     /**
      * Create asynchronously a {@link DataStream} to stream state machine data.
      * The state machine may use the first message (i.e. request.getMessage()) 
as the header to create the stream.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 4e057c07b..baac0c6c7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -428,7 +428,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
-      final Task write = fileLogWorker.writeLogEntry(entry, 
removedStateMachineData, context);
+      final Task write = fileLogWorker.writeLogEntry(entryRef, 
removedStateMachineData, context);
       if (stateMachineCachingEnabled) {
         // The stateMachineData will be cached inside the StateMachine itself.
         cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 0d1ea763b..02506079f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -438,7 +438,8 @@ class SegmentedRaftLogWorker {
     addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
   }
 
-  Task writeLogEntry(LogEntryProto entry, LogEntryProto 
removedStateMachineData, TransactionContext context) {
+  Task writeLogEntry(ReferenceCountedObject<LogEntryProto> entry,
+      LogEntryProto removedStateMachineData, TransactionContext context) {
     return addIOTask(new WriteLog(entry, removedStateMachineData, context));
   }
 
@@ -486,25 +487,28 @@ class SegmentedRaftLogWorker {
     private final CompletableFuture<?> stateMachineFuture;
     private final CompletableFuture<Long> combined;
 
-    WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, 
TransactionContext context) {
+    WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto 
removedStateMachineData,
+        TransactionContext context) {
+      LogEntryProto origEntry = entryRef.get();
       this.entry = removedStateMachineData;
-      if (this.entry == entry) {
-        final StateMachineLogEntryProto proto = 
entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
+      if (this.entry == origEntry) {
+        final StateMachineLogEntryProto proto = 
origEntry.hasStateMachineLogEntry() ?
+            origEntry.getStateMachineLogEntry(): null;
         if (stateMachine != null && proto != null && proto.getType() == 
StateMachineLogEntryProto.Type.DATASTREAM) {
           final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(proto);
           final CompletableFuture<DataStream> removed = 
server.getDataStreamMap().remove(invocationId);
-          this.stateMachineFuture = removed == null? 
stateMachine.data().link(null, entry)
-              : removed.thenApply(stream -> stateMachine.data().link(stream, 
entry));
+          this.stateMachineFuture = removed == null? 
stateMachine.data().link(null, origEntry)
+              : removed.thenApply(stream -> stateMachine.data().link(stream, 
origEntry));
         } else {
           this.stateMachineFuture = null;
         }
       } else {
         try {
-          // this.entry != entry iff the entry has state machine data
-          this.stateMachineFuture = stateMachine.data().write(entry, context);
+          // this.entry != origEntry if it has state machine data
+          this.stateMachineFuture = stateMachine.data().write(entryRef, 
context);
         } catch (Exception e) {
-          LOG.error(name + ": writeStateMachineData failed for index " + 
entry.getIndex()
-              + ", entry=" + LogProtoUtils.toLogEntryString(entry, 
stateMachine::toStateMachineLogEntryString), e);
+          LOG.error(name + ": writeStateMachineData failed for index " + 
origEntry.getIndex()
+              + ", entry=" + LogProtoUtils.toLogEntryString(origEntry, 
stateMachine::toStateMachineLogEntryString), e);
           throw e;
         }
       }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 7c40ec251..17d5a607a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -48,6 +48,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.MD5FileUtil;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -367,7 +368,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   }
 
   @Override
-  public CompletableFuture<Void> write(LogEntryProto entry) {
+  public CompletableFuture<Void> write(ReferenceCountedObject<LogEntryProto> 
entry, TransactionContext context) {
+    Preconditions.assertTrue(entry.get() != null);
     return blocking.getFuture(Blocking.Type.WRITE_STATE_MACHINE_DATA);
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 3d5d5f87d..38341e025 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -39,6 +39,7 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -634,7 +635,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
     final StateMachine sm = new BaseStateMachine() {
       @Override
-      public CompletableFuture<Void> write(LogEntryProto entry) {
+      public CompletableFuture<Void> 
write(ReferenceCountedObject<LogEntryProto> entry, TransactionContext context) {
         getLifeCycle().transition(LifeCycle.State.STARTING);
         getLifeCycle().transition(LifeCycle.State.RUNNING);
 

Reply via email to