This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 53d35c01e RATIS-2020. Refactor TransactionContext to supply 
LogEntryProto via a ReferenceCountedObject (#1042)
53d35c01e is described below

commit 53d35c01e7bffeb596585cb981a2a89f685e9bef
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Feb 21 10:27:25 2024 -0800

    RATIS-2020. Refactor TransactionContext to supply LogEntryProto via a 
ReferenceCountedObject (#1042)
---
 .../arithmetic/ArithmeticStateMachine.java         |  2 +-
 .../counter/server/CounterStateMachine.java        |  2 +-
 .../examples/filestore/FileStoreStateMachine.java  |  2 +-
 .../ratis/statemachine/TransactionContext.java     | 34 ++++++++++++++++++++--
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../apache/ratis/server/impl/PendingRequest.java   |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  6 ++++
 .../org/apache/ratis/server/impl/ServerState.java  |  2 +-
 .../apache/ratis/server/raftlog/LogProtoUtils.java | 18 ++++++++++++
 .../ratis/statemachine/impl/BaseStateMachine.java  |  6 ++--
 .../statemachine/impl/TransactionContextImpl.java  | 32 +++++++++++++++++---
 .../org/apache/ratis/ReadOnlyRequestTests.java     |  8 +++--
 .../server/impl/StateMachineShutdownTests.java     |  2 +-
 .../impl/SimpleStateMachine4Testing.java           | 31 ++++++++------------
 .../ratis/datastream/DataStreamTestUtils.java      |  2 +-
 .../ratis/statemachine/TestStateMachine.java       |  2 +-
 16 files changed, 113 insertions(+), 40 deletions(-)

diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index 28e3fb1c7..e8b142f5d 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -164,7 +164,7 @@ public class ArithmeticStateMachine extends 
BaseStateMachine {
 
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final LogEntryProto entry = trx.getLogEntry();
+    final LogEntryProto entry = trx.getLogEntryUnsafe();
     final AssignmentMessage assignment = new 
AssignmentMessage(entry.getStateMachineLogEntry().getLogData());
 
     final long index = entry.getIndex();
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index b88a763e0..47880af55 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -247,7 +247,7 @@ public class CounterStateMachine extends BaseStateMachine {
    */
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final LogEntryProto entry = trx.getLogEntry();
+    final LogEntryProto entry = trx.getLogEntryUnsafe();
     //increment the counter and update term-index
     final TermIndex termIndex = TermIndex.valueOf(entry);
     final int incremented = incrementCounter(termIndex);
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 0ee7a60ac..f870cbacf 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -227,7 +227,7 @@ public class FileStoreStateMachine extends BaseStateMachine 
{
 
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final LogEntryProto entry = trx.getLogEntry();
+    final LogEntryProto entry = trx.getLogEntryUnsafe();
 
     final long index = entry.getIndex();
     updateLastAppliedTermIndex(entry.getTerm(), index);
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index e0190747f..2ec87e37a 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -28,6 +28,7 @@ import org.apache.ratis.util.ReflectionUtils;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * Context for a transaction.
@@ -94,11 +95,40 @@ public interface TransactionContext {
   LogEntryProto initLogEntry(long term, long index);
 
   /**
-   * Returns the committed log entry
-   * @return the committed log entry
+   * @return a copy of the committed log entry if it exists; otherwise, 
returns null
+   *
+   * @deprecated Use {@link #getLogEntryRef()} or {@link #getLogEntryUnsafe()} 
to avoid copying.
    */
+  @Deprecated
   LogEntryProto getLogEntry();
 
+  /**
+   * @return the committed log entry if it exists; otherwise, returns null.
+   *         The returned value is safe to use only before {@link 
StateMachine#applyTransaction} returns.
+   *         Once {@link StateMachine#applyTransaction} has returned, it is 
unsafe to use the log entry
+   *         since the underlying buffers can possiby be released.
+   */
+  default LogEntryProto getLogEntryUnsafe() {
+    return getLogEntryRef().get();
+  }
+
+  /**
+   * Get a {@link ReferenceCountedObject} to the committed log entry.
+   *
+   * It is safe to access the log entry by calling {@link 
ReferenceCountedObject#get()}
+   * (without {@link ReferenceCountedObject#retain()})
+   * inside the scope of {@link StateMachine#applyTransaction}.
+   *
+   * If the log entry is needed after {@link StateMachine#applyTransaction} 
returns,
+   * e.g. for asynchronous computation or caching,
+   * the caller must invoke {@link ReferenceCountedObject#retain()} and {@link 
ReferenceCountedObject#release()}.
+   *
+   * @return a reference to the committed log entry if it exists; otherwise, 
returns null.
+   */
+  default ReferenceCountedObject<LogEntryProto> getLogEntryRef() {
+    return 
Optional.ofNullable(getLogEntryUnsafe()).map(this::wrap).orElse(null);
+  }
+
   /** Wrap the given log entry as a {@link ReferenceCountedObject} for 
retaining it for later use. */
   default ReferenceCountedObject<LogEntryProto> wrap(LogEntryProto entry) {
     Preconditions.assertSame(getLogEntry().getTerm(), entry.getTerm(), 
"entry.term");
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4f313a437..e8a4adc9c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -527,7 +527,7 @@ class LeaderStateImpl implements LeaderState {
   PendingRequest addPendingRequest(PendingRequests.Permit permit, 
RaftClientRequest request, TransactionContext entry) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
-          LogProtoUtils.toLogEntryString(entry.getLogEntry()));
+          LogProtoUtils.toLogEntryString(entry.getLogEntryUnsafe()));
     }
     return pendingRequests.add(permit, request, entry);
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 06a3a7b3c..4271d7616 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -38,7 +38,7 @@ class PendingRequest {
   private final CompletableFuture<RaftClientReply> futureToReturn;
 
   PendingRequest(RaftClientRequest request, TransactionContext entry) {
-    this.termIndex = entry == null? null: 
TermIndex.valueOf(entry.getLogEntry());
+    this.termIndex = entry == null? null: 
TermIndex.valueOf(entry.getLogEntryUnsafe());
     this.request = request;
     this.entry = entry;
     if (request.is(TypeCase.FORWARD)) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 133cfebdc..0885fb807 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1802,6 +1802,10 @@ class RaftServerImpl implements RaftServer.Division,
       final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(next.getStateMachineLogEntry());
       writeIndexCache.add(invocationId.getClientId(), 
((TransactionContextImpl) trx).getLogIndexFuture());
 
+      // TODO: RaftLog to provide the log entry as a ReferenceCountedObject as 
per RATIS-2028.
+      ReferenceCountedObject<?> ref = ReferenceCountedObject.wrap(next);
+      ((TransactionContextImpl) trx).setDelegatedRef(ref);
+      ref.retain();
       try {
         // Let the StateMachine inject logic for committed transactions in 
sequential order.
         trx = stateMachine.applyTransactionSerial(trx);
@@ -1810,6 +1814,8 @@ class RaftServerImpl implements RaftServer.Division,
         return replyPendingRequest(invocationId, TermIndex.valueOf(next), 
stateMachineFuture);
       } catch (Exception e) {
         throw new RaftLogIOException(e);
+      } finally {
+        ref.release();
       }
     }
     return null;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index d02994ec5..27eaf317e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -318,7 +318,7 @@ class ServerState {
 
   void appendLog(TransactionContext operation) throws StateMachineException {
     getLog().append(currentTerm.get(), operation);
-    Objects.requireNonNull(operation.getLogEntry());
+    Objects.requireNonNull(operation.getLogEntryUnsafe(), 
"transaction-logEntry");
   }
 
   /** @return true iff the given peer id is recognized as the leader. */
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index de06faf63..b177f0e14 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 
@@ -221,4 +222,21 @@ public final class LogProtoUtils {
     final List<RaftPeer> oldListener = 
ProtoUtils.toRaftPeers(proto.getOldListenersList());
     return ServerImplUtils.newRaftConfiguration(conf, listener, 
entry.getIndex(), oldConf, oldListener);
   }
+
+  public static LogEntryProto copy(LogEntryProto proto) {
+    if (proto == null) {
+      return null;
+    }
+
+    if (!proto.hasStateMachineLogEntry() && !proto.hasMetadataEntry() && 
!proto.hasConfigurationEntry()) {
+      // empty entry, just return as is.
+      return proto;
+    }
+
+    try {
+      return LogEntryProto.parseFrom(proto.toByteString());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException("Failed to copy log entry " + 
TermIndex.valueOf(proto), e);
+    }
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index c987c53dd..98f270d2d 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -18,7 +18,7 @@
 
 package org.apache.ratis.statemachine.impl;
 
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -110,10 +110,10 @@ public class BaseStateMachine implements StateMachine, 
StateMachine.DataApi,
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     // return the same message contained in the entry
-    RaftProtos.LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
+    final LogEntryProto entry = 
Objects.requireNonNull(trx.getLogEntryUnsafe());
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
     return CompletableFuture.completedFuture(
-        
Message.valueOf(trx.getLogEntry().getStateMachineLogEntry().getLogData()));
+        Message.valueOf(entry.getStateMachineLogEntry().getLogData()));
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
index 7c4f1782d..44bd32c67 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
@@ -25,12 +25,14 @@ import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /**
  * Implementation of {@link TransactionContext}
@@ -69,6 +71,9 @@ public class TransactionContextImpl implements 
TransactionContext {
 
   /** Committed LogEntry. */
   private volatile LogEntryProto logEntry;
+  /** Committed LogEntry copy. */
+  private volatile Supplier<LogEntryProto> logEntryCopy;
+
   /** For wrapping {@link #logEntry} in order to release the underlying 
buffer. */
   private volatile ReferenceCountedObject<?> delegatedRef;
 
@@ -112,7 +117,7 @@ public class TransactionContextImpl implements 
TransactionContext {
    */
   TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine, 
LogEntryProto logEntry) {
     this(serverRole, null, stateMachine, logEntry.getStateMachineLogEntry());
-    this.logEntry = logEntry;
+    setLogEntry(logEntry);
     this.logIndexFuture.complete(logEntry.getIndex());
   }
 
@@ -135,8 +140,10 @@ public class TransactionContextImpl implements 
TransactionContext {
     if (delegatedRef == null) {
       return TransactionContext.super.wrap(entry);
     }
-    Preconditions.assertSame(getLogEntry().getTerm(), entry.getTerm(), 
"entry.term");
-    Preconditions.assertSame(getLogEntry().getIndex(), entry.getIndex(), 
"entry.index");
+    final LogEntryProto expected = getLogEntryUnsafe();
+    Objects.requireNonNull(expected, "logEntry == null");
+    Preconditions.assertSame(expected.getTerm(), entry.getTerm(), 
"entry.term");
+    Preconditions.assertSame(expected.getIndex(), entry.getIndex(), 
"entry.index");
     return delegatedRef.delegate(entry);
   }
 
@@ -168,18 +175,31 @@ public class TransactionContextImpl implements 
TransactionContext {
     Objects.requireNonNull(stateMachineLogEntry, "stateMachineLogEntry == 
null");
 
     logIndexFuture.complete(index);
-    return logEntry = LogProtoUtils.toLogEntryProto(stateMachineLogEntry, 
term, index);
+    return setLogEntry(LogProtoUtils.toLogEntryProto(stateMachineLogEntry, 
term, index));
   }
 
   public CompletableFuture<Long> getLogIndexFuture() {
     return logIndexFuture;
   }
 
+  private LogEntryProto setLogEntry(LogEntryProto entry) {
+    this.logEntry = entry;
+    this.logEntryCopy = MemoizedSupplier.valueOf(() -> 
LogProtoUtils.copy(entry));
+    return entry;
+  }
+
+
   @Override
   public LogEntryProto getLogEntry() {
+    return logEntryCopy == null ? null : logEntryCopy.get();
+  }
+
+  @Override
+  public LogEntryProto getLogEntryUnsafe() {
     return logEntry;
   }
 
+
   @Override
   public TransactionContext setException(Exception ioe) {
     this.exception = ioe;
@@ -209,4 +229,8 @@ public class TransactionContextImpl implements 
TransactionContext {
     // call this to let the SM know that Transaction cannot be synced
     return stateMachine.cancelTransaction(this);
   }
+
+  public static LogEntryProto getLogEntry(TransactionContext context) {
+    return ((TransactionContextImpl) context).logEntry;
+  }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index eea75592e..ead2a8be8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -19,6 +19,7 @@ package org.apache.ratis;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -366,10 +367,11 @@ public abstract class ReadOnlyRequestTests<CLUSTER 
extends MiniRaftCluster>
 
     @Override
     public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
-      LOG.debug("apply trx with index=" + trx.getLogEntry().getIndex());
-      updateLastAppliedTermIndex(trx.getLogEntry().getTerm(), 
trx.getLogEntry().getIndex());
+      final RaftProtos.LogEntryProto logEntry = trx.getLogEntryUnsafe();
+      LOG.debug("apply trx with index=" + logEntry.getIndex());
+      updateLastAppliedTermIndex(logEntry.getTerm(), logEntry.getIndex());
 
-      String command = trx.getLogEntry().getStateMachineLogEntry()
+      String command = logEntry.getStateMachineLogEntry()
           .getLogData().toString(StandardCharsets.UTF_8);
 
       LOG.info("receive command: {}", command);
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 28f8e6ace..246abb99f 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -56,7 +56,7 @@ public abstract class StateMachineShutdownTests<CLUSTER 
extends MiniRaftCluster>
           }
         }
       }
-      RaftProtos.LogEntryProto entry = trx.getLogEntry();
+      final RaftProtos.LogEntryProto entry = trx.getLogEntryUnsafe();
       updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
       return CompletableFuture.completedFuture(new 
RaftTestUtil.SimpleMessage("done"));
     }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 17d5a607a..7dd1db3c7 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -84,7 +84,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return (SimpleStateMachine4Testing)s.getStateMachine();
   }
 
-  private final SortedMap<Long, LogEntryProto> indexMap = 
Collections.synchronizedSortedMap(new TreeMap<>());
+  private final SortedMap<Long, ReferenceCountedObject<LogEntryProto>> 
indexMap = Collections.synchronizedSortedMap(new TreeMap<>());
   private final SortedMap<String, LogEntryProto> dataMap = 
Collections.synchronizedSortedMap(new TreeMap<>());
   private final Daemon checkpointer;
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
@@ -199,8 +199,9 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return leaderElectionTimeoutInfo;
   }
 
-  private void put(LogEntryProto entry) {
-    final LogEntryProto previous = indexMap.put(entry.getIndex(), entry);
+  private void put(ReferenceCountedObject<LogEntryProto> entryRef) {
+    LogEntryProto entry = entryRef.retain();
+    final ReferenceCountedObject<LogEntryProto> previous = 
indexMap.put(entry.getIndex(), entryRef);
     Preconditions.assertNull(previous, "previous");
     final String s = 
entry.getStateMachineLogEntry().getLogData().toStringUtf8();
     dataMap.put(s, entry);
@@ -246,27 +247,17 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     blocking.await(Blocking.Type.APPLY_TRANSACTION);
-    LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
+    ReferenceCountedObject<LogEntryProto> entryRef = 
Objects.requireNonNull(trx.getLogEntryRef());
+    LogEntryProto entry = entryRef.get();
     LOG.info("applyTransaction for log index {}", entry.getIndex());
 
-    // TODO: Logs kept in StateMachine's cache may be corrupted. Copy for now 
to have the test pass.
-    // Use ReferenceCount per RATIS-1997.
-    LogEntryProto copied = copy(entry);
-    put(copied);
+    put(entryRef);
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
 
     final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
     return collecting.collect(Collecting.Type.APPLY_TRANSACTION, m);
   }
 
-  private LogEntryProto copy(LogEntryProto log) {
-    try {
-      return LogEntryProto.parseFrom(log.toByteString());
-    } catch (InvalidProtocolBufferException e) {
-      throw new IllegalStateException("Error copying log entry", e);
-    }
-  }
-
   @Override
   public long takeSnapshot() {
     final TermIndex termIndex = getLastAppliedTermIndex();
@@ -280,7 +271,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile);
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(snapshotFile, false,
         segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
-      for (final LogEntryProto entry : indexMap.values()) {
+      for (final ReferenceCountedObject<LogEntryProto> entryRef : 
indexMap.values()) {
+        LogEntryProto entry = entryRef.get();
         if (entry.getIndex() > endIndex) {
           break;
         } else {
@@ -315,7 +307,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
-          put(entry);
+          put(ReferenceCountedObject.wrap(entry));
           updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
         }
       }
@@ -390,10 +382,11 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
       running = false;
       checkpointer.interrupt();
     });
+    indexMap.values().forEach(ReferenceCountedObject::release);
   }
 
   public LogEntryProto[] getContent() {
-    return indexMap.values().toArray(new LogEntryProto[0]);
+    return 
indexMap.values().stream().map(ReferenceCountedObject::get).toArray(LogEntryProto[]::new);
   }
 
   public void blockStartTransaction() {
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index e4a930f1d..2970bbef8 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -165,7 +165,7 @@ public interface DataStreamTestUtils {
 
     @Override
     public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
-      final LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
+      final LogEntryProto entry = 
Objects.requireNonNull(trx.getLogEntryUnsafe());
       updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
       final SingleDataStream s = 
getSingleDataStream(ClientInvocationId.valueOf(entry.getStateMachineLogEntry()));
       final ByteString bytesWritten = 
bytesWritten2ByteString(s.getDataChannel().getBytesWritten());
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java 
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 094189827..07ea4edbc 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -91,7 +91,7 @@ public class TestStateMachine extends BaseTest implements 
MiniRaftClusterWithSim
     @Override
     public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
       try {
-        assertNotNull(trx.getLogEntry());
+        assertNotNull(trx.getLogEntryUnsafe());
         assertNotNull(trx.getStateMachineLogEntry());
         Object context = trx.getStateMachineContext();
         if (isLeader.get()) {

Reply via email to