This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-3.1.1_review
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit cdfd4f10d7febbd320c1104f2b173c5ac9b4264b
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Aug 21 08:57:20 2024 -0700

    Revert "RATIS-1983. Refactor client request processing to support reference 
count. (#998)"
    
    This reverts commit 43a02109d1f0b34dd80b1e36f1d023f86f24e3ab.
---
 .../protocol/RaftClientAsynchronousProtocol.java   |  34 +-----
 .../apache/ratis/util/ReferenceCountedObject.java  |  24 ----
 .../examples/filestore/FileStoreStateMachine.java  |  18 +--
 .../ratis/statemachine/TransactionContext.java     |   8 --
 .../apache/ratis/server/impl/RaftServerImpl.java   |  86 ++++----------
 .../apache/ratis/server/impl/RaftServerProxy.java  |  13 +-
 .../ratis/server/raftlog/segmented/LogSegment.java | 132 +++++++--------------
 .../server/raftlog/segmented/SegmentedRaftLog.java |  19 ++-
 .../raftlog/segmented/SegmentedRaftLogCache.java   |   5 +-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |   8 +-
 .../statemachine/impl/TransactionContextImpl.java  |  18 ---
 .../server/raftlog/segmented/TestLogSegment.java   |  24 ++--
 .../segmented/TestSegmentedRaftLogCache.java       |  12 +-
 .../java/org/apache/ratis/tools/ParseRatisLog.java |   7 +-
 14 files changed, 111 insertions(+), 297 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 1985bbe66..1a9f83c82 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,40 +17,12 @@
  */
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.ReferenceCountedObject;
-
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 /** Asynchronous version of {@link RaftClientProtocol}. */
 public interface RaftClientAsynchronousProtocol {
-  /**
-   * It is recommended to override {@link 
#submitClientRequestAsync(ReferenceCountedObject)} instead.
-   * Then, it does not have to override this method.
-   */
-  default CompletableFuture<RaftClientReply> submitClientRequestAsync(
-      RaftClientRequest request) throws IOException {
-    return submitClientRequestAsync(ReferenceCountedObject.wrap(request));
-  }
+  CompletableFuture<RaftClientReply> submitClientRequestAsync(
+      RaftClientRequest request) throws IOException;
 
-  /**
-   * A referenced counted request is submitted from a client for processing.
-   * Implementations of this method should retain the request, process it and 
then release it.
-   * The request may be retained even after the future returned by this method 
has completed.
-   *
-   * @return a future of the reply
-   * @see ReferenceCountedObject
-   */
-  default CompletableFuture<RaftClientReply> submitClientRequestAsync(
-      ReferenceCountedObject<RaftClientRequest> requestRef) {
-    try {
-      // for backward compatibility
-      return submitClientRequestAsync(requestRef.retain())
-          .whenComplete((r, e) -> requestRef.release());
-    } catch (Exception e) {
-      requestRef.release();
-      return JavaUtils.completeExceptionally(e);
-    }
-  }
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 3f72f5ffe..0dd378dc0 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -101,30 +101,6 @@ public interface ReferenceCountedObject<T> {
     return wrap(value, () -> {}, ignored -> {});
   }
 
-  /**
-   * @return a {@link ReferenceCountedObject} of the given value by delegating 
to this object.
-   */
-  default <V> ReferenceCountedObject<V> delegate(V value) {
-    final ReferenceCountedObject<T> delegated = this;
-    return new ReferenceCountedObject<V>() {
-      @Override
-      public V get() {
-        return value;
-      }
-
-      @Override
-      public V retain() {
-        delegated.retain();
-        return value;
-      }
-
-      @Override
-      public boolean release() {
-        return delegated.release();
-      }
-    };
-  }
-
   /**
    * Wrap the given value as a {@link ReferenceCountedObject}.
    *
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 858e300ec..5f258ee3b 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -32,7 +32,6 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -41,7 +40,6 @@ import 
org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.JavaUtils;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -170,11 +168,9 @@ public class FileStoreStateMachine extends 
BaseStateMachine {
   }
 
   static class LocalStream implements DataStream {
-    private final String name;
     private final DataChannel dataChannel;
 
-    LocalStream(String name, DataChannel dataChannel) {
-      this.name = JavaUtils.getClassSimpleName(getClass()) + "[" + name + "]";
+    LocalStream(DataChannel dataChannel) {
       this.dataChannel = dataChannel;
     }
 
@@ -194,11 +190,6 @@ public class FileStoreStateMachine extends 
BaseStateMachine {
         }
       });
     }
-
-    @Override
-    public String toString() {
-      return name;
-    }
   }
 
   @Override
@@ -211,14 +202,13 @@ public class FileStoreStateMachine extends 
BaseStateMachine {
       return FileStoreCommon.completeExceptionally(
           "Failed to parse stream header", e);
     }
-    final String file = proto.getStream().getPath().toStringUtf8();
-    return files.createDataChannel(file)
-        .thenApply(channel -> new LocalStream(file, channel));
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
   }
 
   @Override
   public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
-    LOG.info("linking {} to {}", stream, 
LogProtoUtils.toLogEntryString(entry));
+    LOG.info("linking {}", stream);
     return files.streamLink(stream);
   }
 
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index e0190747f..3821b058c 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -23,7 +23,6 @@ import 
org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.ReflectionUtils;
 
 import java.io.IOException;
@@ -99,13 +98,6 @@ public interface TransactionContext {
    */
   LogEntryProto getLogEntry();
 
-  /** Wrap the given log entry as a {@link ReferenceCountedObject} for 
retaining it for later use. */
-  default ReferenceCountedObject<LogEntryProto> wrap(LogEntryProto entry) {
-    Preconditions.assertSame(getLogEntry().getTerm(), entry.getTerm(), 
"entry.term");
-    Preconditions.assertSame(getLogEntry().getIndex(), entry.getIndex(), 
"entry.index");
-    return ReferenceCountedObject.wrap(entry);
-  }
-
   /**
    * Sets whether to commit the transaction to the RAFT log or not
    * @param shouldCommit true if the transaction is supposed to be committed 
to the RAFT log
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c1a716bd0..8d7246fcf 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -109,7 +109,6 @@ import org.apache.ratis.util.LifeCycle.State;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedSupplier;
 
@@ -772,21 +771,15 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   /**
-   * Append a transaction to the log for processing a client request.
-   * Note that the given request could be different from {@link 
TransactionContext#getClientRequest()}
-   * since the request could be converted; see {@link 
#convertRaftClientRequest(RaftClientRequest)}.
-   *
-   * @param request The client request.
-   * @param context The context of the transaction.
-   * @param cacheEntry the entry in the retry cache.
-   * @return a future of the reply.
+   * Handle a normal update request from client.
    */
   private CompletableFuture<RaftClientReply> appendTransaction(
-      RaftClientRequest request, TransactionContextImpl context, CacheEntry 
cacheEntry) {
-    Objects.requireNonNull(request, "request == null");
+      RaftClientRequest request, TransactionContextImpl context, CacheEntry 
cacheEntry) throws IOException {
     CodeInjectionForTesting.execute(APPEND_TRANSACTION, getId(),
         request.getClientId(), request, context, cacheEntry);
 
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+
     final PendingRequest pending;
     synchronized (this) {
       final CompletableFuture<RaftClientReply> reply = 
checkLeaderState(request, cacheEntry);
@@ -805,7 +798,6 @@ class RaftServerImpl implements RaftServer.Division,
         return cacheEntry.getReplyFuture();
       }
       try {
-        assertLifeCycleState(LifeCycle.States.RUNNING);
         state.appendLog(context);
       } catch (StateMachineException e) {
         // the StateMachineException is thrown by the SM in the preAppend 
stage.
@@ -817,9 +809,6 @@ class RaftServerImpl implements RaftServer.Division,
           
leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
         }
         return CompletableFuture.completedFuture(exceptionReply);
-      } catch (ServerNotReadyException e) {
-        final RaftClientReply exceptionReply = newExceptionReply(request, e);
-        return CompletableFuture.completedFuture(exceptionReply);
       }
 
       // put the request into the pending queue
@@ -872,13 +861,11 @@ class RaftServerImpl implements RaftServer.Division,
     role.getLeaderState().ifPresent(leader -> 
leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
   }
 
-  /** If the given request is {@link TypeCase#FORWARD}, convert it. */
-  static RaftClientRequest convertRaftClientRequest(RaftClientRequest request) 
throws InvalidProtocolBufferException {
-    if (!request.is(TypeCase.FORWARD)) {
-      return request;
-    }
-    return 
ClientProtoUtils.toRaftClientRequest(RaftClientRequestProto.parseFrom(
-        request.getMessage().getContent().asReadOnlyByteBuffer()));
+  private RaftClientRequest 
filterDataStreamRaftClientRequest(RaftClientRequest request)
+      throws InvalidProtocolBufferException {
+    return !request.is(TypeCase.FORWARD) ? request : 
ClientProtoUtils.toRaftClientRequest(
+        RaftClientRequestProto.parseFrom(
+            request.getMessage().getContent().asReadOnlyByteBuffer()));
   }
 
   <REPLY> CompletableFuture<REPLY> executeSubmitServerRequestAsync(
@@ -888,29 +875,20 @@ class RaftServerImpl implements RaftServer.Division,
         serverExecutor).join();
   }
 
-  CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(
-      ReferenceCountedObject<RaftClientRequest> request) {
-    return CompletableFuture.supplyAsync(() -> 
submitClientRequestAsync(request), clientExecutor).join();
+  CompletableFuture<RaftClientReply> 
executeSubmitClientRequestAsync(RaftClientRequest request) {
+    return CompletableFuture.supplyAsync(
+        () -> JavaUtils.callAsUnchecked(() -> 
submitClientRequestAsync(request), CompletionException::new),
+        clientExecutor).join();
   }
 
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
-      ReferenceCountedObject<RaftClientRequest> requestRef) {
-    final RaftClientRequest request = requestRef.retain();
+      RaftClientRequest request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
     LOG.debug("{}: receive client request({})", getMemberId(), request);
-
-    try {
-      assertLifeCycleState(LifeCycle.States.RUNNING);
-    } catch (ServerNotReadyException e) {
-      final RaftClientReply reply = newExceptionReply(request, e);
-      requestRef.release();
-      return CompletableFuture.completedFuture(reply);
-    }
-
     final Timekeeper timer = 
raftServerMetrics.getClientRequestTimer(request.getType());
     final Optional<Timekeeper.Context> timerContext = 
Optional.ofNullable(timer).map(Timekeeper::time);
-    return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
-      requestRef.release();
+    return replyFuture(request).whenComplete((clientReply, exception) -> {
       timerContext.ifPresent(Timekeeper.Context::stop);
       if (exception != null || clientReply.getException() != null) {
         raftServerMetrics.incFailedRequestCount(request.getType());
@@ -918,8 +896,7 @@ class RaftServerImpl implements RaftServer.Division,
     });
   }
 
-  private CompletableFuture<RaftClientReply> 
replyFuture(ReferenceCountedObject<RaftClientRequest> requestRef) {
-    final RaftClientRequest request = requestRef.get();
+  private CompletableFuture<RaftClientReply> replyFuture(RaftClientRequest 
request) throws IOException {
     retryCache.invalidateRepliedRequests(request);
 
     final TypeCase type = request.getType().getTypeCase();
@@ -931,18 +908,17 @@ class RaftServerImpl implements RaftServer.Division,
       case WATCH:
         return watchAsync(request);
       case MESSAGESTREAM:
-        return messageStreamAsync(requestRef);
+        return messageStreamAsync(request);
       case WRITE:
       case FORWARD:
-        return writeAsync(requestRef);
+        return writeAsync(request);
       default:
         throw new IllegalStateException("Unexpected request type: " + type + 
", request=" + request);
     }
   }
 
-  private CompletableFuture<RaftClientReply> 
writeAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
-    final RaftClientRequest request = requestRef.get();
-    final CompletableFuture<RaftClientReply> future = 
writeAsyncImpl(requestRef);
+  private CompletableFuture<RaftClientReply> writeAsync(RaftClientRequest 
request) throws IOException {
+    final CompletableFuture<RaftClientReply> future = writeAsyncImpl(request);
     if (request.is(TypeCase.WRITE)) {
       // check replication
       final ReplicationLevel replication = 
request.getType().getWrite().getReplication();
@@ -953,8 +929,7 @@ class RaftServerImpl implements RaftServer.Division,
     return future;
   }
 
-  private CompletableFuture<RaftClientReply> 
writeAsyncImpl(ReferenceCountedObject<RaftClientRequest> requestRef) {
-    final RaftClientRequest request = requestRef.get();
+  private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest 
request) throws IOException {
     final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
     if (reply != null) {
       return reply;
@@ -970,15 +945,8 @@ class RaftServerImpl implements RaftServer.Division,
     // TODO: this client request will not be added to pending requests until
     // later which means that any failure in between will leave partial state 
in
     // the state machine. We should call cancelTransaction() for failed 
requests
-    final TransactionContextImpl context;
-    try {
-      context = (TransactionContextImpl) 
stateMachine.startTransaction(convertRaftClientRequest(request));
-    } catch (IOException e) {
-      final RaftClientReply exceptionReply = newExceptionReply(request,
-          new RaftException("Failed to startTransaction for " + request, e));
-      cacheEntry.failWithReply(exceptionReply);
-      return CompletableFuture.completedFuture(exceptionReply);
-    }
+    final TransactionContextImpl context = (TransactionContextImpl) 
stateMachine.startTransaction(
+        filterDataStreamRaftClientRequest(request));
     if (context.getException() != null) {
       final StateMachineException e = new StateMachineException(getMemberId(), 
context.getException());
       final RaftClientReply exceptionReply = newExceptionReply(request, e);
@@ -986,7 +954,6 @@ class RaftServerImpl implements RaftServer.Division,
       return CompletableFuture.completedFuture(exceptionReply);
     }
 
-    context.setDelegatedRef(requestRef);
     return appendTransaction(request, context, cacheEntry);
   }
 
@@ -1089,8 +1056,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  private CompletableFuture<RaftClientReply> 
messageStreamAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
-    final RaftClientRequest request = requestRef.get();
+  private CompletableFuture<RaftClientReply> 
messageStreamAsync(RaftClientRequest request) throws IOException {
     final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
     if (reply != null) {
       return reply;
@@ -1102,7 +1068,7 @@ class RaftServerImpl implements RaftServer.Division,
         return f.thenApply(r -> null);
       }
       // the message stream has ended and the request become a WRITE request
-      return replyFuture(requestRef.delegate(f.join()));
+      return replyFuture(f.join());
     }
 
     return role.getLeaderState()
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 9834d62ab..84221cfcf 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -52,7 +52,6 @@ import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.Closeable;
@@ -446,15 +445,9 @@ class RaftServerProxy implements RaftServer {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> submitClientRequestAsync(
-      ReferenceCountedObject<RaftClientRequest> requestRef) {
-    final RaftClientRequest request = requestRef.retain();
-    try {
-      return getImplFuture(request.getRaftGroupId())
-          .thenCompose(impl -> 
impl.executeSubmitClientRequestAsync(requestRef));
-    } finally {
-      requestRef.release();
-    }
+  public CompletableFuture<RaftClientReply> 
submitClientRequestAsync(RaftClientRequest request) {
+    return getImplFuture(request.getRaftGroupId())
+        .thenCompose(impl -> impl.executeSubmitClientRequestAsync(request));
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 12e7c4f1d..89a6e2050 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -30,7 +30,6 @@ import 
org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -69,20 +67,17 @@ public final class LogSegment {
   }
 
   static long getEntrySize(LogEntryProto entry, Op op) {
-    switch (op) {
-      case CHECK_SEGMENT_FILE_FULL:
-      case LOAD_SEGMENT_FILE:
-      case WRITE_CACHE_WITH_STATE_MACHINE_CACHE:
-        Preconditions.assertTrue(entry == 
LogProtoUtils.removeStateMachineData(entry),
-            () -> "Unexpected LogEntryProto with StateMachine data: op=" + op 
+ ", entry=" + entry);
-        break;
-      case WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE:
-      case REMOVE_CACHE:
-        break;
-      default:
-        throw new IllegalStateException("Unexpected op " + op + ", entry=" + 
entry);
+    LogEntryProto e = entry;
+    if (op == Op.CHECK_SEGMENT_FILE_FULL) {
+      e = LogProtoUtils.removeStateMachineData(entry);
+    } else if (op == Op.LOAD_SEGMENT_FILE || op == 
Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE) {
+      Preconditions.assertTrue(entry == 
LogProtoUtils.removeStateMachineData(entry),
+          () -> "Unexpected LogEntryProto with StateMachine data: op=" + op + 
", entry=" + entry);
+    } else {
+      Preconditions.assertTrue(op == 
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE || op == Op.REMOVE_CACHE,
+          () -> "Unexpected op " + op + ", entry=" + entry);
     }
-    final int serialized = entry.getSerializedSize();
+    final int serialized = e.getSerializedSize();
     return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 
4L;
   }
 
@@ -129,8 +124,7 @@ public final class LogSegment {
   }
 
   public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, 
SizeInBytes maxOpSize,
-      CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics 
raftLogMetrics,
-      Consumer<ReferenceCountedObject<LogEntryProto>> entryConsumer)
+      CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics 
raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
       throws IOException {
     int count = 0;
     try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
@@ -142,8 +136,7 @@ public final class LogSegment {
         }
 
         if (entryConsumer != null) {
-          // TODO: use reference count to support zero buffer copying for 
readSegmentFile
-          entryConsumer.accept(ReferenceCountedObject.wrap(next));
+          entryConsumer.accept(next);
         }
         count++;
       }
@@ -170,7 +163,10 @@ public final class LogSegment {
     final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, 
RaftStorage::getLogCorruptionPolicy);
     final boolean isOpen = startEnd.isOpen();
     final int entryCount = readSegmentFile(file, startEnd, maxOpSize, 
corruptionPolicy, raftLogMetrics, entry -> {
-      segment.append(Op.LOAD_SEGMENT_FILE, entry, keepEntryInCache || isOpen, 
logConsumer);
+      segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
+      if (logConsumer != null) {
+        logConsumer.accept(entry);
+      }
     });
     LOG.info("Successfully read {} entries from segment file {}", entryCount, 
file);
 
@@ -238,10 +234,10 @@ public final class LogSegment {
       // the on-disk log file should be truncated but has not been done yet.
       final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
       final LogSegmentStartEnd startEnd = 
LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
-      readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), 
raftLogMetrics, entryRef -> {
-        final LogEntryProto entry = entryRef.retain();
+      readSegmentFile(file, startEnd, maxOpSize,
+          getLogCorruptionPolicy(), raftLogMetrics, entry -> {
         final TermIndex ti = TermIndex.valueOf(entry);
-        putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
+        putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE);
         if (ti.equals(key.getTermIndex())) {
           toReturn.set(entry);
         }
@@ -251,48 +247,13 @@ public final class LogSegment {
     }
   }
 
-  static class EntryCache {
-    private final Map<TermIndex, ReferenceCountedObject<LogEntryProto>> map = 
new ConcurrentHashMap<>();
-    private final AtomicLong size = new AtomicLong();
-
-    long size() {
-      return size.get();
-    }
-
-    LogEntryProto get(TermIndex ti) {
-      return Optional.ofNullable(map.get(ti))
-          .map(ReferenceCountedObject::get)
-          .orElse(null);
-    }
-
-    void clear() {
-      map.values().forEach(ReferenceCountedObject::release);
-      map.clear();
-      size.set(0);
-    }
-
-    void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op 
op) {
-      valueRef.retain();
-      Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release);
-      size.getAndAdd(getEntrySize(valueRef.get(), op));
-    }
-
-    private void release(ReferenceCountedObject<LogEntryProto> entry) {
-      size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE));
-      entry.release();
-    }
-
-    void remove(TermIndex key) {
-      Optional.ofNullable(map.remove(key)).ifPresent(this::release);
-    }
-  }
-
   File getFile() {
     return LogSegmentStartEnd.valueOf(startIndex, endIndex, 
isOpen).getFile(storage);
   }
 
   private volatile boolean isOpen;
   private long totalFileSize = SegmentedRaftLogFormat.getHeaderLength();
+  private AtomicLong totalCacheSize = new AtomicLong(0);
   /** Segment start index, inclusive. */
   private final long startIndex;
   /** Segment end index, inclusive. */
@@ -310,7 +271,7 @@ public final class LogSegment {
   /**
    * the entryCache caches the content of log entries.
    */
-  private final EntryCache entryCache = new EntryCache();
+  private final Map<TermIndex, LogEntryProto> entryCache = new 
ConcurrentHashMap<>();
 
   private LogSegment(RaftStorage storage, boolean isOpen, long start, long 
end, SizeInBytes maxOpSize,
       SegmentedRaftLogMetrics raftLogMetrics) {
@@ -342,29 +303,12 @@ public final class LogSegment {
     return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
   }
 
-  void appendToOpenSegment(Op op, ReferenceCountedObject<LogEntryProto> 
entryRef) {
+  void appendToOpenSegment(LogEntryProto entry, Op op) {
     Preconditions.assertTrue(isOpen(), "The log segment %s is not open for 
append", this);
-    append(op, entryRef, true, null);
-  }
-
-  private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,
-      boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) {
-    final LogEntryProto entry = entryRef.retain();
-    try {
-      final LogRecord record = appendLogRecord(op, entry);
-      if (keepEntryInCache) {
-        putEntryCache(record.getTermIndex(), entryRef, op);
-      }
-      if (logConsumer != null) {
-        logConsumer.accept(entry);
-      }
-    } finally {
-      entryRef.release();
-    }
+    append(true, entry, op);
   }
 
-
-  private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
+  private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
     Objects.requireNonNull(entry, "entry == null");
     if (records.isEmpty()) {
       Preconditions.assertTrue(entry.getIndex() == startIndex,
@@ -380,9 +324,11 @@ public final class LogSegment {
 
     final LogRecord record = new LogRecord(totalFileSize, entry);
     records.add(record);
+    if (keepEntryInCache) {
+      putEntryCache(record.getTermIndex(), entry, op);
+    }
     totalFileSize += getEntrySize(entry, op);
     endIndex = entry.getIndex();
-    return record;
   }
 
   LogEntryProto getEntryFromCache(TermIndex ti) {
@@ -425,7 +371,7 @@ public final class LogSegment {
   }
 
   long getTotalCacheSize() {
-    return entryCache.size();
+    return totalCacheSize.get();
   }
 
   /**
@@ -435,7 +381,7 @@ public final class LogSegment {
     Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
     for (long index = endIndex; index >= fromIndex; index--) {
       LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
-      removeEntryCache(removed.getTermIndex());
+      removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
       totalFileSize = removed.offset;
     }
     isOpen = false;
@@ -480,18 +426,28 @@ public final class LogSegment {
 
   void evictCache() {
     entryCache.clear();
+    totalCacheSize.set(0);
   }
 
-  void putEntryCache(TermIndex key, ReferenceCountedObject<LogEntryProto> 
valueRef, Op op) {
-    entryCache.put(key, valueRef, op);
+  void putEntryCache(TermIndex key, LogEntryProto value, Op op) {
+    final LogEntryProto previous = entryCache.put(key, value);
+    long previousSize = 0;
+    if (previous != null) {
+      // Different threads maybe load LogSegment file into cache at the same 
time, so duplicate maybe happen
+      previousSize = getEntrySize(value, Op.REMOVE_CACHE);
+    }
+    totalCacheSize.getAndAdd(getEntrySize(value, op) - previousSize);
   }
 
-  void removeEntryCache(TermIndex key) {
-    entryCache.remove(key);
+  void removeEntryCache(TermIndex key, Op op) {
+    LogEntryProto value = entryCache.remove(key);
+    if (value != null) {
+      totalCacheSize.getAndAdd(-getEntrySize(value, op));
+    }
   }
 
   boolean hasCache() {
-    return isOpen || entryCache.size() > 0; // open segment always has cache.
+    return isOpen || !entryCache.isEmpty(); // open segment always has cache.
   }
 
   boolean containsIndex(long index) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index def472a60..f49900f16 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -41,7 +41,6 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AwaitToRun;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.StringUtils;
 
 import java.io.File;
@@ -54,7 +53,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.LongSupplier;
 
 import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -395,7 +393,6 @@ public final class SegmentedRaftLog extends RaftLogBase {
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", getName(), 
LogProtoUtils.toLogEntryString(entry));
     }
-    final LogEntryProto removedStateMachineData = 
LogProtoUtils.removeStateMachineData(entry);
     try(AutoCloseableLock writeLock = writeLock()) {
       final Timekeeper.Context appendEntryTimerContext = 
getRaftLogMetrics().startAppendEntryTimer();
       validateLogEntry(entry);
@@ -404,7 +401,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
       if (currentOpenSegment == null) {
         cache.addOpenSegment(entry.getIndex());
         fileLogWorker.startLogSegment(entry.getIndex());
-      } else if (isSegmentFull(currentOpenSegment, removedStateMachineData)) {
+      } else if (isSegmentFull(currentOpenSegment, entry)) {
         rollOpenSegment = true;
       } else {
         final TermIndex last = currentOpenSegment.getLastTermIndex();
@@ -426,17 +423,17 @@ public final class SegmentedRaftLog extends RaftLogBase {
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
-      final Task write = fileLogWorker.writeLogEntry(entry, 
removedStateMachineData, context);
-      final Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> 
wrap = context != null ?
-          context::wrap : ReferenceCountedObject::wrap;
+      CompletableFuture<Long> writeFuture =
+          fileLogWorker.writeLogEntry(entry, context).getFuture();
       if (stateMachineCachingEnabled) {
         // The stateMachineData will be cached inside the StateMachine itself.
-        cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE, 
wrap.apply(removedStateMachineData));
+        cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
+            LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
       } else {
-        
cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
wrap.apply(entry)
-        );
+        cache.appendEntry(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       }
-      return write.getFuture().whenComplete((clientReply, exception) -> 
appendEntryTimerContext.stop());
+      writeFuture.whenComplete((clientReply, exception) -> 
appendEntryTimerContext.stop());
+      return writeFuture;
     } catch (Exception e) {
       LOG.error("{}: Failed to append {}", getName(), toLogEntryString(entry), 
e);
       throw e;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 0b05b14e5..58c70c4af 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -32,7 +32,6 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AutoCloseableReadWriteLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -598,11 +597,11 @@ public class SegmentedRaftLogCache {
     }
   }
 
-  void appendEntry(LogSegment.Op op, ReferenceCountedObject<LogEntryProto> 
entry) {
+  void appendEntry(LogEntryProto entry, LogSegment.Op op) {
     // SegmentedRaftLog does the segment creation/rolling work. Here we just
     // simply append the entry into the open segment.
     Preconditions.assertNotNull(openSegment, "openSegment");
-    openSegment.appendToOpenSegment(op, entry);
+    openSegment.appendToOpenSegment(entry, op);
   }
 
   /**
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 5b0470d4f..68266b417 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -445,8 +445,8 @@ class SegmentedRaftLogWorker {
     addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
   }
 
-  Task writeLogEntry(LogEntryProto entry, LogEntryProto 
removedStateMachineData, TransactionContext context) {
-    return addIOTask(new WriteLog(entry, removedStateMachineData, context));
+  Task writeLogEntry(LogEntryProto entry, TransactionContext context) {
+    return addIOTask(new WriteLog(entry, context));
   }
 
   Task truncate(TruncationSegments ts, long index) {
@@ -493,8 +493,8 @@ class SegmentedRaftLogWorker {
     private final CompletableFuture<?> stateMachineFuture;
     private final CompletableFuture<Long> combined;
 
-    WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, 
TransactionContext context) {
-      this.entry = removedStateMachineData;
+    WriteLog(LogEntryProto entry, TransactionContext context) {
+      this.entry = LogProtoUtils.removeStateMachineData(entry);
       if (this.entry == entry) {
         final StateMachineLogEntryProto proto = 
entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null;
         if (stateMachine != null && proto != null && proto.getType() == 
StateMachineLogEntryProto.Type.DATASTREAM) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
index 600625716..d92f3a1c8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
@@ -26,7 +26,6 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -72,9 +71,6 @@ public class TransactionContextImpl implements 
TransactionContext {
   /** Committed LogEntry. */
   @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
   private volatile LogEntryProto logEntry;
-  /** For wrapping {@link #logEntry} in order to release the underlying 
buffer. */
-  @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
-  private volatile ReferenceCountedObject<?> delegatedRef;
 
   private final CompletableFuture<Long> logIndexFuture = new 
CompletableFuture<>();
 
@@ -130,20 +126,6 @@ public class TransactionContextImpl implements 
TransactionContext {
     return clientRequest;
   }
 
-  public void setDelegatedRef(ReferenceCountedObject<?> ref) {
-    this.delegatedRef = ref;
-  }
-
-  @Override
-  public ReferenceCountedObject<LogEntryProto> wrap(LogEntryProto entry) {
-    if (delegatedRef == null) {
-      return TransactionContext.super.wrap(entry);
-    }
-    Preconditions.assertSame(getLogEntry().getTerm(), entry.getTerm(), 
"entry.term");
-    Preconditions.assertSame(getLogEntry().getIndex(), entry.getIndex(), 
"entry.index");
-    return delegatedRef.delegate(entry);
-  }
-
   @Override
   public StateMachineLogEntryProto getStateMachineLogEntry() {
     return stateMachineLogEntry;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 4e04e9e62..50f9d2382 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -21,20 +21,18 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.server.raftlog.segmented.LogSegment.Op;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TraditionalBinaryPrefix;
 import org.junit.jupiter.api.AfterEach;
@@ -146,7 +144,7 @@ public class TestLogSegment extends BaseTest {
       if (entry == null) {
         entry = segment.loadCache(record);
       }
-      offset += getEntrySize(entry, 
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      offset += getEntrySize(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
   }
 
@@ -205,8 +203,8 @@ public class TestLogSegment extends BaseTest {
     while (size < max) {
       SimpleOperation op = new SimpleOperation("m" + i);
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
-      size += getEntrySize(entry, Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
-      segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry));
+      size += getEntrySize(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
 
     Assertions.assertTrue(segment.getTotalFileSize() >= max);
@@ -238,18 +236,18 @@ public class TestLogSegment extends BaseTest {
     final StateMachineLogEntryProto m = op.getLogEntryContent();
     try {
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1001);
-      segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry));
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       Assertions.fail("should fail since the entry's index needs to be 1000");
     } catch (IllegalStateException e) {
       // the exception is expected.
     }
 
     LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1000);
-    segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry));
+    segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
 
     try {
       entry = LogProtoUtils.toLogEntryProto(m, 0, 1002);
-      segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry));
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       Assertions.fail("should fail since the entry's index needs to be 1001");
     } catch (IllegalStateException e) {
       // the exception is expected.
@@ -264,7 +262,7 @@ public class TestLogSegment extends BaseTest {
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
-      segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry));
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
 
     // truncate an open segment (remove 1080~1099)
@@ -319,7 +317,7 @@ public class TestLogSegment extends BaseTest {
         1024, 1024, ByteBuffer.allocateDirect(bufferSize))) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
-      size = LogSegment.getEntrySize(entry, 
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      size = LogSegment.getEntrySize(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       out.write(entry);
     }
     Assertions.assertEquals(file.length(),
@@ -346,7 +344,7 @@ public class TestLogSegment extends BaseTest {
     Arrays.fill(content, (byte) 1);
     SimpleOperation op = new SimpleOperation(new String(content));
     LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
-    final long entrySize = LogSegment.getEntrySize(entry, 
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+    final long entrySize = LogSegment.getEntrySize(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
 
     long totalSize = SegmentedRaftLogFormat.getHeaderLength();
     long preallocated = 16 * 1024;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index efcb90580..8015f1827 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -34,9 +34,7 @@ import org.apache.ratis.server.raftlog.LogEntryHeader;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
 import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
-import org.apache.ratis.server.raftlog.segmented.LogSegment.Op;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -66,7 +64,7 @@ public class TestSegmentedRaftLogCache {
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
-      s.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry));
+      s.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
     if (!isOpen) {
       s.close();
@@ -150,15 +148,14 @@ public class TestSegmentedRaftLogCache {
   }
 
   @Test
-  public void testAppendEntry() {
+  public void testAppendEntry() throws Exception {
     LogSegment closedSegment = prepareLogSegment(0, 99, false);
     cache.addSegment(closedSegment);
 
     final SimpleOperation m = new SimpleOperation("m");
     try {
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0);
-      cache.appendEntry(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry)
-      );
+      cache.appendEntry(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       Assertions.fail("the open segment is null");
     } catch (IllegalStateException ignored) {
     }
@@ -167,8 +164,7 @@ public class TestSegmentedRaftLogCache {
     cache.addSegment(openSegment);
     for (long index = 101; index < 200; index++) {
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index);
-      cache.appendEntry(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, 
ReferenceCountedObject.wrap(entry)
-      );
+      cache.appendEntry(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
 
     Assertions.assertNotNull(cache.getOpenSegment());
diff --git 
a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java 
b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
index ea512fa70..564ce0bf0 100644
--- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
+++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
@@ -24,7 +24,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
 import org.apache.ratis.server.raftlog.segmented.LogSegment;
-import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
@@ -70,8 +69,7 @@ public final class ParseRatisLog {
   }
 
 
-  private void processLogEntry(ReferenceCountedObject<LogEntryProto> ref) {
-    final LogEntryProto proto = ref.retain();
+  private void processLogEntry(LogEntryProto proto) {
     if (proto.hasConfigurationEntry()) {
       numConfEntries++;
     } else if (proto.hasMetadataEntry()) {
@@ -79,13 +77,12 @@ public final class ParseRatisLog {
     } else if (proto.hasStateMachineLogEntry()) {
       numStateMachineEntries++;
     } else {
-      System.out.println("Found an invalid entry: " + proto);
+      System.out.println("Found invalid entry" + proto.toString());
       numInvalidEntries++;
     }
 
     String str = LogProtoUtils.toLogEntryString(proto, smLogToString);
     System.out.println(str);
-    ref.release();
   }
 
   public static class Builder {

Reply via email to