This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e199daaf2 RATIS-2028. Refactor RaftLog to supply log as 
ReferenceCountedObject (#1045)
e199daaf2 is described below

commit e199daaf2a288508bd3e119e9c252c70ee90937d
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Mar 11 17:58:00 2024 -0700

    RATIS-2028. Refactor RaftLog to supply log as ReferenceCountedObject (#1045)
---
 .../org/apache/ratis/server/raftlog/RaftLog.java   | 17 +++++++++-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 12 +++----
 .../ratis/server/impl/StateMachineUpdater.java     | 19 +++++++----
 .../apache/ratis/server/raftlog/RaftLogBase.java   |  8 ++++-
 .../ratis/server/raftlog/memory/MemoryRaftLog.java | 35 +++++++++++++++++---
 .../ratis/server/raftlog/segmented/LogSegment.java | 21 ++++++------
 .../server/raftlog/segmented/SegmentedRaftLog.java | 37 +++++++++++++++++++---
 .../ratis/server/storage/RaftStorageTestUtils.java | 16 +++++++++-
 .../ratis/statemachine/RaftSnapshotBaseTest.java   |  4 ++-
 .../ratis/datastream/DataStreamTestUtils.java      |  4 ++-
 .../apache/ratis/server/ServerRestartTests.java    | 10 +++---
 .../server/raftlog/segmented/TestLogSegment.java   |  4 +--
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  7 ++--
 .../segmented/TestSegmentedRaftLogCache.java       |  4 +--
 14 files changed, 147 insertions(+), 51 deletions(-)

diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index e504462b8..e4fbd664e 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -21,6 +21,7 @@ import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,10 +58,24 @@ public interface RaftLog extends RaftLogSequentialOps, 
Closeable {
 
   /**
    * @return null if the log entry is not found in this log;
-   *         otherwise, return the log entry corresponding to the given index.
+   *         otherwise, return a copy of the log entry corresponding to the 
given index.
+   * @deprecated use {@link RaftLog#retainLog(long)} instead in order to avoid 
copying.
    */
+  @Deprecated
   LogEntryProto get(long index) throws RaftLogIOException;
 
+  /**
+   * @return a retained {@link ReferenceCountedObject} to the log entry 
corresponding to the given index if it exists;
+   *         otherwise, return null.
+   *         Since the returned reference is retained, the caller must call 
{@link ReferenceCountedObject#release()}}
+   *         after use.
+   */
+  default ReferenceCountedObject<LogEntryProto> retainLog(long index) throws 
RaftLogIOException {
+    ReferenceCountedObject<LogEntryProto> wrap = 
ReferenceCountedObject.wrap(get(index));
+    wrap.retain();
+    return wrap;
+  }
+
   /**
    * @return null if the log entry is not found in this log;
    *         otherwise, return the {@link EntryWithData} corresponding to the 
given index.
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 17a741e70..e8aeb6678 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1800,7 +1800,9 @@ class RaftServerImpl implements RaftServer.Division,
         MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry, 
getInfo().getCurrentRole())));
   }
 
-  CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws 
RaftLogIOException {
+  CompletableFuture<Message> 
applyLogToStateMachine(ReferenceCountedObject<LogEntryProto> nextRef)
+      throws RaftLogIOException {
+    LogEntryProto next = nextRef.get();
     if (!next.hasStateMachineLogEntry()) {
       stateMachine.event().notifyTermIndexUpdated(next.getTerm(), 
next.getIndex());
     }
@@ -1815,11 +1817,7 @@ class RaftServerImpl implements RaftServer.Division,
       TransactionContext trx = getTransactionContext(next, true);
       final ClientInvocationId invocationId = 
ClientInvocationId.valueOf(next.getStateMachineLogEntry());
       writeIndexCache.add(invocationId.getClientId(), 
((TransactionContextImpl) trx).getLogIndexFuture());
-
-      // TODO: RaftLog to provide the log entry as a ReferenceCountedObject as 
per RATIS-2028.
-      ReferenceCountedObject<?> ref = ReferenceCountedObject.wrap(next);
-      ((TransactionContextImpl) trx).setDelegatedRef(ref);
-      ref.retain();
+      ((TransactionContextImpl) trx).setDelegatedRef(nextRef);
       try {
         // Let the StateMachine inject logic for committed transactions in 
sequential order.
         trx = stateMachine.applyTransactionSerial(trx);
@@ -1828,8 +1826,6 @@ class RaftServerImpl implements RaftServer.Division,
         return replyPendingRequest(invocationId, TermIndex.valueOf(next), 
stateMachineFuture);
       } catch (Exception e) {
         throw new RaftLogIOException(e);
-      } finally {
-        ref.release();
       }
     }
     return null;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 5f6e972e2..b01270dc0 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -235,10 +235,17 @@ class StateMachineUpdater implements Runnable {
     final long committed = raftLog.getLastCommittedIndex();
     for(long applied; (applied = getLastAppliedIndex()) < committed && state 
== State.RUNNING && !shouldStop(); ) {
       final long nextIndex = applied + 1;
-      final LogEntryProto next = raftLog.get(nextIndex);
-      if (next != null) {
+      final ReferenceCountedObject<LogEntryProto> next = 
raftLog.retainLog(nextIndex);
+      if (next == null) {
+        LOG.debug("{}: logEntry {} is null. There may be snapshot to load. 
state:{}",
+            this, nextIndex, state);
+        break;
+      }
+
+      try {
+        final LogEntryProto entry = next.get();
         if (LOG.isTraceEnabled()) {
-          LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, 
LogProtoUtils.toLogEntryString(next));
+          LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex, 
LogProtoUtils.toLogEntryString(entry));
         } else {
           LOG.debug("{}: applying nextIndex={}", this, nextIndex);
         }
@@ -252,10 +259,8 @@ class StateMachineUpdater implements Runnable {
         } else {
           notifyAppliedIndex(incremented);
         }
-      } else {
-        LOG.debug("{}: logEntry {} is null. There may be snapshot to load. 
state:{}",
-            this, nextIndex, state);
-        break;
+      } finally {
+        next.release();
       }
     }
     return futures;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 49e66e253..0a9a1c93c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -240,13 +240,19 @@ public abstract class RaftLogBase implements RaftLog {
       //log neither lastMetadataEntry, nor entries with a smaller commit index.
       return false;
     }
+    ReferenceCountedObject<LogEntryProto> ref = null;
     try {
-      if (get(newCommitIndex).hasMetadataEntry()) {
+      ref = retainLog(newCommitIndex);
+      if (ref.get().hasMetadataEntry()) {
         // do not log the metadata entry
         return false;
       }
     } catch(RaftLogIOException e) {
       LOG.error("Failed to get log entry for index " + newCommitIndex, e);
+    } finally {
+      if (ref != null) {
+        ref.release();
+      }
     }
     return true;
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index fc7973aab..feedaeee4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -22,8 +22,10 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.server.metrics.RaftLogMetricsBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.LogEntryHeader;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -45,8 +47,13 @@ public class MemoryRaftLog extends RaftLogBase {
   static class EntryList {
     private final List<ReferenceCountedObject<LogEntryProto>> entries = new 
ArrayList<>();
 
+    ReferenceCountedObject<LogEntryProto> getRef(int i) {
+      return i >= 0 && i < entries.size() ? entries.get(i) : null;
+    }
+
     LogEntryProto get(int i) {
-      return i >= 0 && i < entries.size() ? entries.get(i).get() : null;
+      final ReferenceCountedObject<LogEntryProto> ref = getRef(i);
+      return ref != null ? ref.get() : null;
     }
 
     TermIndex getTermIndex(int i) {
@@ -108,16 +115,34 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
-  public LogEntryProto get(long index) {
+  public LogEntryProto get(long index) throws RaftLogIOException {
+    final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
+    try {
+      return LogProtoUtils.copy(ref.get());
+    } finally {
+      ref.release();
+    }
+  }
+
+  @Override
+  public ReferenceCountedObject<LogEntryProto> retainLog(long index) {
     checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      return entries.get(Math.toIntExact(index));
+    try (AutoCloseableLock readLock = readLock()) {
+      ReferenceCountedObject<LogEntryProto> ref = 
entries.getRef(Math.toIntExact(index));
+      ref.retain();
+      return ref;
     }
   }
 
   @Override
   public EntryWithData getEntryWithData(long index) {
-    return newEntryWithData(get(index), null);
+    // TODO. The reference counted object should be passed to LogAppender 
RATIS-2026.
+    ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
+    try {
+      return newEntryWithData(ref.get(), null);
+    } finally {
+      ref.release();
+    }
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 68da35014..2fcd7914e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -224,7 +224,7 @@ public final class LogSegment {
    *
    * In the future we can make the cache loader configurable if necessary.
    */
-  class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> {
+  class LogEntryLoader extends CacheLoader<LogRecord, 
ReferenceCountedObject<LogEntryProto>> {
     private final SegmentedRaftLogMetrics raftLogMetrics;
 
     LogEntryLoader(SegmentedRaftLogMetrics raftLogMetrics) {
@@ -232,18 +232,19 @@ public final class LogSegment {
     }
 
     @Override
-    public LogEntryProto load(LogRecord key) throws IOException {
+    public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws 
IOException {
       final File file = getFile();
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
-      final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
+      final AtomicReference<ReferenceCountedObject<LogEntryProto>> toReturn = 
new AtomicReference<>();
       final LogSegmentStartEnd startEnd = 
LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
       readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), 
raftLogMetrics, entryRef -> {
         final LogEntryProto entry = entryRef.retain();
         final TermIndex ti = TermIndex.valueOf(entry);
         putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
         if (ti.equals(key.getTermIndex())) {
-          toReturn.set(entry);
+          entryRef.retain();
+          toReturn.set(entryRef);
         }
         entryRef.release();
       });
@@ -260,10 +261,8 @@ public final class LogSegment {
       return size.get();
     }
 
-    LogEntryProto get(TermIndex ti) {
-      return Optional.ofNullable(map.get(ti))
-          .map(ReferenceCountedObject::get)
-          .orElse(null);
+    ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
+      return map.get(ti);
     }
 
     void clear() {
@@ -386,15 +385,15 @@ public final class LogSegment {
     return record;
   }
 
-  LogEntryProto getEntryFromCache(TermIndex ti) {
+  ReferenceCountedObject<LogEntryProto> getEntryFromCache(TermIndex ti) {
     return entryCache.get(ti);
   }
 
   /**
    * Acquire LogSegment's monitor so that there is no concurrent loading.
    */
-  synchronized LogEntryProto loadCache(LogRecord record) throws 
RaftLogIOException {
-    LogEntryProto entry = entryCache.get(record.getTermIndex());
+  synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord 
record) throws RaftLogIOException {
+    ReferenceCountedObject<LogEntryProto> entry = 
entryCache.get(record.getTermIndex());
     if (entry != null) {
       return entry;
     }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index baac0c6c7..bb0793abe 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -180,11 +180,17 @@ public final class SegmentedRaftLog extends RaftLogBase {
 
       @Override
       public void notifyTruncatedLogEntry(TermIndex ti) {
+        ReferenceCountedObject<LogEntryProto> ref = null;
         try {
-          final LogEntryProto entry = get(ti.getIndex());
+          ref = retainLog(ti.getIndex());
+          final LogEntryProto entry = ref != null ? ref.get() : null;
           notifyTruncatedLogEntry.accept(entry);
         } catch (RaftLogIOException e) {
           LOG.error("{}: Failed to read log {}", getName(), ti, e);
+        } finally {
+          if (ref != null) {
+            ref.release();
+          }
         }
       }
 
@@ -272,6 +278,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
 
   @Override
   public LogEntryProto get(long index) throws RaftLogIOException {
+    final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
+    if (ref == null) {
+      return null;
+    }
+    try {
+      return LogProtoUtils.copy(ref.get());
+    } finally {
+      ref.release();
+    }
+  }
+
+  @Override
+  public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws 
RaftLogIOException {
     checkLogState();
     final LogSegment segment;
     final LogRecord record;
@@ -284,9 +303,10 @@ public final class SegmentedRaftLog extends RaftLogBase {
       if (record == null) {
         return null;
       }
-      final LogEntryProto entry = 
segment.getEntryFromCache(record.getTermIndex());
+      final ReferenceCountedObject<LogEntryProto> entry = 
segment.getEntryFromCache(record.getTermIndex());
       if (entry != null) {
         getRaftLogMetrics().onRaftLogCacheHit();
+        entry.retain();
         return entry;
       }
     }
@@ -299,10 +319,19 @@ public final class SegmentedRaftLog extends RaftLogBase {
 
   @Override
   public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
-    final LogEntryProto entry = get(index);
-    if (entry == null) {
+    final ReferenceCountedObject<LogEntryProto> entryRef = retainLog(index);
+    if (entryRef == null) {
       throw new RaftLogIOException("Log entry not found: index = " + index);
     }
+    try {
+      // TODO. The reference counted object should be passed to LogAppender 
RATIS-2026.
+      return getEntryWithData(entryRef.get());
+    } finally {
+      entryRef.release();
+    }
+  }
+
+  private EntryWithData getEntryWithData(LogEntryProto entry) throws 
RaftLogIOException {
     if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
       return newEntryWithData(entry, null);
     }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index bb4f6a076..ee30bd2c7 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -21,12 +21,15 @@ import static 
org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RAFT_LOG_F
 import static 
org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.RATIS_LOG_WORKER_METRICS;
 
 import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogBase;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.ReferenceCountedObject;
 
 import java.io.File;
 import java.io.IOException;
@@ -72,11 +75,22 @@ public interface RaftStorageTestUtils {
       b.append(i == committed? 'c': ' ');
       b.append(String.format("%3d: ", i));
       try {
-        b.append(LogProtoUtils.toLogEntryString(log.get(i)));
+        b.append(LogProtoUtils.toLogEntryString(getLogUnsafe(log, i)));
       } catch (RaftLogIOException e) {
         b.append(e);
       }
       println.accept(b.toString());
     }
   }
+
+  static LogEntryProto getLogUnsafe(RaftLog log, long index) throws 
RaftLogIOException {
+    ReferenceCountedObject<LogEntryProto> ref = log.retainLog(index);
+    try {
+      return ref != null ? ref.get() : null;
+    } finally {
+      if (ref != null) {
+        ref.release();
+      }
+    }
+  }
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index fe1a97ddc..9a716cad3 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHIN
 import static 
org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS_DESC;
 import static 
org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_TAKE_SNAPSHOT_TIMER;
 import static 
org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
+import static 
org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.metrics.LongCounter;
@@ -43,6 +44,7 @@ import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.util.FileUtils;
@@ -95,7 +97,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
   public static void assertLogContent(RaftServer.Division server, boolean 
isLeader) throws Exception {
     final RaftLog log = server.getRaftLog();
     final long lastIndex = log.getLastEntryTermIndex().getIndex();
-    final LogEntryProto e = log.get(lastIndex);
+    final LogEntryProto e = getLogUnsafe(log, lastIndex);
     Assert.assertTrue(e.hasMetadataEntry());
 
     JavaUtils.attemptRepeatedly(() -> {
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 2970bbef8..47138919d 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -69,6 +69,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static 
org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;
+
 public interface DataStreamTestUtils {
   Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
 
@@ -383,7 +385,7 @@ public interface DataStreamTestUtils {
 
   static LogEntryProto searchLogEntry(ClientInvocationId invocationId, RaftLog 
log) throws Exception {
     for (LogEntryHeader termIndex : log.getEntries(0, Long.MAX_VALUE)) {
-      final LogEntryProto entry = log.get(termIndex.getIndex());
+      final LogEntryProto entry = getLogUnsafe(log, termIndex.getIndex());
       if (entry.hasStateMachineLogEntry()) {
         if (invocationId.match(entry.getStateMachineLogEntry())) {
           return entry;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index db4e92b7c..11311f360 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -65,6 +65,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static 
org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;
+
 /**
  * Test restarting raft peers.
  */
@@ -268,10 +270,10 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
 
     final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
     LOG.info("{}: leader lastIndex={}", leaderId, lastIndex);
-    final LogEntryProto lastEntry = leaderLog.get(lastIndex);
+    final LogEntryProto lastEntry = getLogUnsafe(leaderLog, lastIndex);
     LOG.info("{}: leader lastEntry entry[{}] = {}", leaderId, lastIndex, 
LogProtoUtils.toLogEntryString(lastEntry));
     final long loggedCommitIndex = 
lastEntry.getMetadataEntry().getCommitIndex();
-    final LogEntryProto lastCommittedEntry = leaderLog.get(loggedCommitIndex);
+    final LogEntryProto lastCommittedEntry = getLogUnsafe(leaderLog, 
loggedCommitIndex);
     LOG.info("{}: leader lastCommittedEntry = entry[{}] = {}",
         leaderId, loggedCommitIndex, 
LogProtoUtils.toLogEntryString(lastCommittedEntry));
 
@@ -317,11 +319,11 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
   static void assertLastLogEntry(RaftServer.Division server) throws 
RaftLogIOException {
     final RaftLog raftLog = server.getRaftLog();
     final long lastIndex = raftLog.getLastEntryTermIndex().getIndex();
-    final LogEntryProto lastEntry = raftLog.get(lastIndex);
+    final LogEntryProto lastEntry = getLogUnsafe(raftLog, lastIndex);
     Assertions.assertTrue(lastEntry.hasMetadataEntry());
 
     final long loggedCommitIndex = 
lastEntry.getMetadataEntry().getCommitIndex();
-    final LogEntryProto lastCommittedEntry = raftLog.get(loggedCommitIndex);
+    final LogEntryProto lastCommittedEntry = getLogUnsafe(raftLog, 
loggedCommitIndex);
     Assertions.assertTrue(lastCommittedEntry.hasStateMachineLogEntry());
 
     final SimpleStateMachine4Testing leaderStateMachine = 
SimpleStateMachine4Testing.get(server);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 8355c6733..7692ad06b 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -141,11 +141,11 @@ public class TestLogSegment extends BaseTest {
       Assertions.assertEquals(term, ti.getTerm());
       Assertions.assertEquals(offset, record.getOffset());
 
-      LogEntryProto entry = segment.getEntryFromCache(ti);
+      ReferenceCountedObject<LogEntryProto> entry = 
segment.getEntryFromCache(ti);
       if (entry == null) {
         entry = segment.loadCache(record);
       }
-      offset += getEntrySize(entry, 
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      offset += getEntrySize(entry.get(), 
Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
     }
   }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 55fd6fbdf..7b20babf4 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -74,6 +74,7 @@ import org.slf4j.event.Level;
 
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
+import static 
org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -204,7 +205,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
   private LogEntryProto getLastEntry(SegmentedRaftLog raftLog)
       throws IOException {
-    return raftLog.get(raftLog.getLastEntryTermIndex().getIndex());
+    return getLogUnsafe(raftLog, raftLog.getLastEntryTermIndex().getIndex());
   }
 
   @ParameterizedTest
@@ -229,7 +230,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
           .map(ti -> {
             try {
-              return raftLog.get(ti.getIndex());
+              return getLogUnsafe(raftLog, ti.getIndex());
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
@@ -451,7 +452,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
           .map(ti -> {
             try {
-              return raftLog.get(ti.getIndex());
+              return getLogUnsafe(raftLog, ti.getIndex());
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index fa892b200..87172323d 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -81,8 +81,8 @@ public class TestSegmentedRaftLogCache {
     for (long index = start; index <= end; index++) {
       final LogSegment segment = cache.getSegment(index);
       final LogRecord record = segment.getLogRecord(index);
-      final LogEntryProto entry = 
segment.getEntryFromCache(record.getTermIndex());
-      Assertions.assertEquals(index, entry.getIndex());
+      final ReferenceCountedObject<LogEntryProto> entry = 
segment.getEntryFromCache(record.getTermIndex());
+      Assertions.assertEquals(index, entry.get().getIndex());
     }
 
     long[] offsets = new long[]{start, start + 1, start + (end - start) / 2,

Reply via email to