This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c73a3eb8c RATIS-1979. Allow StateMachine.read to return a 
ReferenceCountedObject (#1062)
c73a3eb8c is described below

commit c73a3eb8c026133e0e9b8ada1a2aeb467812cb37
Author: Duong Nguyen <[email protected]>
AuthorDate: Tue Apr 2 07:09:24 2024 -0700

    RATIS-1979. Allow StateMachine.read to return a ReferenceCountedObject 
(#1062)
---
 .../apache/ratis/examples/filestore/FileInfo.java  |  3 +-
 .../apache/ratis/statemachine/StateMachine.java    | 22 ++++++++
 .../apache/ratis/server/raftlog/LogProtoUtils.java | 10 +++-
 .../apache/ratis/server/raftlog/RaftLogBase.java   | 62 +++++++++++++++++++---
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |  2 +-
 .../ratis/server/raftlog/segmented/LogSegment.java |  2 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java | 10 ++--
 7 files changed, 95 insertions(+), 16 deletions(-)

diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index c7d8cb7cd..bba001002 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -19,6 +19,7 @@ package org.apache.ratis.examples.filestore;
 
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
@@ -84,7 +85,7 @@ abstract class FileInfo {
       final ByteBuffer buffer = 
ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length));
       in.position(offset).read(buffer);
       buffer.flip();
-      return ByteString.copyFrom(buffer);
+      return UnsafeByteOperations.unsafeWrap(buffer);
     }
   }
 
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 915b70bb8..b68c724ab 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -88,6 +88,28 @@ public interface StateMachine extends Closeable {
       return read(entry);
     }
 
+    /**
+     * Read asynchronously the state machine data from this state machine.
+     * StateMachines implement this method when the read result contains 
retained resources that should be released
+     * after use.
+     *
+     * @return a future for the read task. The result of the future is a 
{@link ReferenceCountedObject} wrapping the
+     * read result. Client code of this method must call  {@link 
ReferenceCountedObject#release()} after
+     * use.
+     */
+    default CompletableFuture<ReferenceCountedObject<ByteString>> 
retainRead(LogEntryProto entry,
+        TransactionContext context) {
+      return read(entry, context).thenApply(r -> {
+        if (r == null) {
+          return null;
+        }
+        ReferenceCountedObject<ByteString> ref = 
ReferenceCountedObject.wrap(r);
+        ref.retain();
+        return ref;
+
+      });
+    }
+
     /**
      * Write asynchronously the state machine data in the given log entry to 
this state machine.
      *
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index b177f0e14..59da4c3fd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -137,8 +137,9 @@ public final class LogProtoUtils {
   }
 
   private static LogEntryProto 
replaceStateMachineDataWithSerializedSize(LogEntryProto entry) {
-    return replaceStateMachineEntry(entry,
+    LogEntryProto replaced = replaceStateMachineEntry(entry,
         
StateMachineEntryProto.newBuilder().setLogEntryProtoSerializedSize(entry.getSerializedSize()));
+    return copy(replaced);
   }
 
   private static LogEntryProto replaceStateMachineEntry(LogEntryProto proto, 
StateMachineEntryProto.Builder newEntry) {
@@ -160,6 +161,13 @@ public final class LogProtoUtils {
     return replaceStateMachineEntry(entry, 
StateMachineEntryProto.newBuilder().setStateMachineData(stateMachineData));
   }
 
+  public static boolean hasStateMachineData(LogEntryProto entry) {
+    return getStateMachineEntry(entry)
+        .map(StateMachineEntryProto::getStateMachineData)
+        .map(data -> !data.isEmpty())
+        .orElse(false);
+  }
+
   public static boolean isStateMachineDataEmpty(LogEntryProto entry) {
     return getStateMachineEntry(entry)
         .map(StateMachineEntryProto::getStateMachineData)
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 284776d10..9e079564f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.raftlog;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -410,8 +411,43 @@ public abstract class RaftLogBase implements RaftLog {
     return name;
   }
 
-  protected EntryWithData newEntryWithData(LogEntryProto logEntry, 
CompletableFuture<ByteString> future) {
-    return new EntryWithDataImpl(logEntry, future);
+  protected ReferenceCountedObject<EntryWithData> 
newEntryWithData(ReferenceCountedObject<LogEntryProto> retained) {
+    return retained.delegate(new EntryWithDataImpl(retained.get(), null));
+  }
+
+  protected ReferenceCountedObject<EntryWithData> 
newEntryWithData(ReferenceCountedObject<LogEntryProto> retained,
+      CompletableFuture<ReferenceCountedObject<ByteString>> 
stateMachineDataFuture) {
+    final EntryWithDataImpl impl = new EntryWithDataImpl(retained.get(), 
stateMachineDataFuture);
+    return new ReferenceCountedObject<EntryWithData>() {
+      private CompletableFuture<ReferenceCountedObject<ByteString>> future
+          = Objects.requireNonNull(stateMachineDataFuture, 
"stateMachineDataFuture == null");
+
+      @Override
+      public EntryWithData get() {
+        return impl;
+      }
+
+      synchronized void updateFuture(Consumer<ReferenceCountedObject<?>> 
action) {
+        future = future.whenComplete((ref, e) -> {
+          if (ref != null) {
+            action.accept(ref);
+          }
+        });
+      }
+
+      @Override
+      public EntryWithData retain() {
+        retained.retain();
+        updateFuture(ReferenceCountedObject::retain);
+        return impl;
+      }
+
+      @Override
+      public boolean release() {
+        updateFuture(ReferenceCountedObject::release);
+        return retained.release();
+      }
+    };
   }
 
   /**
@@ -419,14 +455,14 @@ public abstract class RaftLogBase implements RaftLog {
    */
   class EntryWithDataImpl implements EntryWithData {
     private final LogEntryProto logEntry;
-    private final CompletableFuture<ByteString> future;
+    private final CompletableFuture<ReferenceCountedObject<ByteString>> future;
 
-    EntryWithDataImpl(LogEntryProto logEntry, CompletableFuture<ByteString> 
future) {
+    EntryWithDataImpl(LogEntryProto logEntry, 
CompletableFuture<ReferenceCountedObject<ByteString>> future) {
       this.logEntry = logEntry;
       this.future = future == null? null: 
future.thenApply(this::checkStateMachineData);
     }
 
-    private ByteString checkStateMachineData(ByteString data) {
+    private ReferenceCountedObject<ByteString> 
checkStateMachineData(ReferenceCountedObject<ByteString> data) {
       if (data == null) {
         throw new IllegalStateException("State machine data is null for log 
entry " + this);
       }
@@ -450,18 +486,21 @@ public abstract class RaftLogBase implements RaftLog {
       }
 
       final LogEntryProto entryProto;
+      ReferenceCountedObject<ByteString> data;
       try {
-        entryProto = future.thenApply(data -> 
LogProtoUtils.addStateMachineData(data, logEntry))
-            .get(timeout.getDuration(), timeout.getUnit());
+        data = future.get(timeout.getDuration(), timeout.getUnit());
+        entryProto = LogProtoUtils.addStateMachineData(data.get(), logEntry);
       } catch (TimeoutException t) {
         if (timeout.compareTo(stateMachineDataReadTimeout) > 0) {
           getRaftLogMetrics().onStateMachineDataReadTimeout();
         }
+        discardData();
         throw t;
       } catch (Exception e) {
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
+        discardData();
         final String err = getName() + ": Failed readStateMachineData for " + 
this;
         LOG.error(err, e);
         throw new RaftLogIOException(err, 
JavaUtils.unwrapCompletionException(e));
@@ -471,11 +510,20 @@ public abstract class RaftLogBase implements RaftLog {
       if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
         final String err = getName() + ": State machine data not set for " + 
this;
         LOG.error(err);
+        data.release();
         throw new RaftLogIOException(err);
       }
       return entryProto;
     }
 
+    private void discardData() {
+      future.whenComplete((r, ex) -> {
+        if (r != null) {
+          r.release();
+        }
+      });
+    }
+
     @Override
     public String toString() {
       return toLogEntryString(logEntry);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 55036fac5..2aac6c1b1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -142,7 +142,7 @@ public class MemoryRaftLog extends RaftLogBase {
   @Override
   public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) 
{
     final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
-    return ref.delegate(newEntryWithData(ref.get(), null));
+    return newEntryWithData(ref);
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 2fcd7914e..2542e9998 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -73,7 +73,7 @@ public final class LogSegment {
       case CHECK_SEGMENT_FILE_FULL:
       case LOAD_SEGMENT_FILE:
       case WRITE_CACHE_WITH_STATE_MACHINE_CACHE:
-        Preconditions.assertTrue(entry == 
LogProtoUtils.removeStateMachineData(entry),
+        Preconditions.assertTrue(!LogProtoUtils.hasStateMachineData(entry),
             () -> "Unexpected LogEntryProto with StateMachine data: op=" + op 
+ ", entry=" + entry);
         break;
       case WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE:
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index b7dd32689..a0f7c1e72 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -331,18 +331,18 @@ public final class SegmentedRaftLog extends RaftLogBase {
 
     final LogEntryProto entry = entryRef.get();
     if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
-      return entryRef.delegate(newEntryWithData(entry, null));
+      return newEntryWithData(entryRef);
     }
 
     try {
-      CompletableFuture<ByteString> future = null;
+      CompletableFuture<ReferenceCountedObject<ByteString>> future = null;
       if (stateMachine != null) {
-        future = stateMachine.data().read(entry, 
server.getTransactionContext(entry, false)).exceptionally(ex -> {
+        future = stateMachine.data().retainRead(entry, 
server.getTransactionContext(entry, false)).exceptionally(ex -> {
           stateMachine.event().notifyLogFailed(ex, entry);
           throw new CompletionException("Failed to read state machine data for 
log entry " + entry, ex);
         });
       }
-      return entryRef.delegate(newEntryWithData(entry, future));
+      return future != null? newEntryWithData(entryRef, future): 
newEntryWithData(entryRef);
     } catch (Exception e) {
       final String err = getName() + ": Failed readStateMachineData for " +
           LogProtoUtils.toLogEntryString(entry);
@@ -459,7 +459,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
       if (stateMachineCachingEnabled) {
         // The stateMachineData will be cached inside the StateMachine itself.
         cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE,
-            entryRef.delegate(removedStateMachineData));
+            ReferenceCountedObject.wrap(removedStateMachineData));
       } else {
         
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
entryRef
         );

Reply via email to