This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new df1d38ac4 RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy. 
(#1164)
df1d38ac4 is described below

commit df1d38ac4ca06ad4891f9548a29ad437bef475fd
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 11 08:00:06 2024 -0700

    RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy. (#1164)
---
 .../org/apache/ratis/util/DataBlockingQueue.java   |  30 ++++++
 .../grpc/server/GrpcClientProtocolService.java     |  26 ++---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  22 ++---
 .../ratis/server/raftlog/segmented/LogSegment.java | 109 ++++++++++++++++-----
 .../server/raftlog/segmented/SegmentedRaftLog.java |  14 ++-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |  25 ++---
 .../impl/SimpleStateMachine4Testing.java           |  21 ++--
 7 files changed, 175 insertions(+), 72 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java 
b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index e905893e5..fb0f0715c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.function.ToLongFunction;
 
 /**
@@ -46,6 +47,8 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
   private final Condition notFull  = lock.newCondition();
   private final Condition notEmpty = lock.newCondition();
 
+  private boolean closed = false;
+
   public DataBlockingQueue(Object name, SizeInBytes byteLimit, int 
elementLimit, ToLongFunction<E> getNumBytes) {
     super(name, byteLimit, elementLimit, getNumBytes);
   }
@@ -72,10 +75,34 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
     }
   }
 
+  /** Apply the given handler to each element and then {@link #clear()}. */
+  public void clear(Consumer<E> handler) {
+    try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      for(E e : this) {
+        handler.accept(e);
+      }
+      super.clear();
+    }
+  }
+
+  /**
+   * Close this queue to stop accepting new elements, i.e. the offer(…) 
methods always return false.
+   * Note that closing the queue will not clear the existing elements.
+   * The existing elements can be peeked, polled or cleared after close.
+   */
+  public void close() {
+    try(AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
+      closed = true;
+    }
+  }
+
   @Override
   public boolean offer(E element) {
     Objects.requireNonNull(element, "element == null");
     try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+      if (closed) {
+        return false;
+      }
       if (super.offer(element)) {
         notEmpty.signal();
         return true;
@@ -95,6 +122,9 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
     long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
     try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
       for(;;) {
+        if (closed) {
+          return false;
+        }
         if (super.offer(element)) {
           notEmpty.signal();
           return true;
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 80a9a439b..b7548780c 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -29,7 +29,6 @@ import 
org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import 
org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -323,18 +322,19 @@ class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
 
     @Override
     void processClientRequest(ReferenceCountedObject<RaftClientRequest> 
requestRef) {
-      final RaftClientRequest request = requestRef.retain();
-      final long callId = request.getCallId();
-      final SlidingWindowEntry slidingWindowEntry = 
request.getSlidingWindowEntry();
-      final CompletableFuture<Void> f = processClientRequest(requestRef, reply 
-> {
-        if (!reply.isSuccess()) {
-          LOG.info("Failed request cid={}, {}, reply={}", callId, 
slidingWindowEntry, reply);
-        }
-        final RaftClientReplyProto proto = 
ClientProtoUtils.toRaftClientReplyProto(reply);
-        responseNext(proto);
-      });
-
-      requestRef.release();
+      final long callId = requestRef.retain().getCallId();
+      final CompletableFuture<Void> f;
+      try {
+        f = processClientRequest(requestRef, reply -> {
+          if (!reply.isSuccess()) {
+            LOG.info("Failed request cid={}, reply={}", callId, reply);
+          }
+          final RaftClientReplyProto proto = 
ClientProtoUtils.toRaftClientReplyProto(reply);
+          responseNext(proto);
+        });
+      } finally {
+        requestRef.release();
+      }
 
       put(callId, f);
       f.thenAccept(dummy -> remove(callId));
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7edb3ae0b..18d4c62c6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -379,7 +379,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   }
 
   private void appendLog(boolean heartbeat) throws IOException {
-    ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
+    final ReferenceCountedObject<AppendEntriesRequestProto> pending;
     final AppendEntriesRequest request;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       // Prepare and send the append request.
@@ -388,18 +388,18 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (pending == null) {
         return;
       }
-      request = new AppendEntriesRequest(pending.get(), getFollowerId(), 
grpcServerMetrics);
-      pendingRequests.put(request);
-      increaseNextIndex(pending.get());
-      if (appendLogRequestObserver == null) {
-        appendLogRequestObserver = new StreamObservers(
-            getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, 
getWaitTimeMin());
-      }
-    } catch(Exception e) {
-      if (pending != null) {
+      try {
+        request = new AppendEntriesRequest(pending.get(), getFollowerId(), 
grpcServerMetrics);
+        pendingRequests.put(request);
+        increaseNextIndex(pending.get());
+        if (appendLogRequestObserver == null) {
+          appendLogRequestObserver = new StreamObservers(
+              getClient(), new AppendLogResponseHandler(), 
useSeparateHBChannel, getWaitTimeMin());
+        }
+      } catch (Exception e) {
         pending.release();
+        throw e;
       }
-      throw e;
     }
 
     try {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c51464f9e..a88ade587 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -29,6 +29,7 @@ import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
@@ -38,10 +39,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -283,47 +284,103 @@ public final class LogSegment {
         final TermIndex ti = TermIndex.valueOf(entry);
         putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
         if (ti.equals(key.getTermIndex())) {
-          entryRef.retain();
           toReturn.set(entryRef);
+        } else {
+          entryRef.release();
         }
-        entryRef.release();
       });
       loadingTimes.incrementAndGet();
       return Objects.requireNonNull(toReturn.get());
     }
   }
 
-  static class EntryCache {
-    private final Map<TermIndex, ReferenceCountedObject<LogEntryProto>> map = 
new ConcurrentHashMap<>();
+  private static class Item {
+    private final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref;
+    private final long serializedSize;
+
+    Item(ReferenceCountedObject<LogEntryProto> obj, long serializedSize) {
+      this.ref = new AtomicReference<>(obj);
+      this.serializedSize = serializedSize;
+    }
+
+    ReferenceCountedObject<LogEntryProto> get() {
+      return ref.get();
+    }
+
+    long release() {
+      final ReferenceCountedObject<LogEntryProto> entry = ref.getAndSet(null);
+      if (entry == null) {
+        return 0;
+      }
+      entry.release();
+      return serializedSize;
+    }
+  }
+
+  class EntryCache {
+    private Map<TermIndex, Item> map = new HashMap<>();
     private final AtomicLong size = new AtomicLong();
 
+    @Override
+    public String toString() {
+      return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this;
+    }
+
     long size() {
       return size.get();
     }
 
-    ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
-      return map.get(ti);
+    synchronized ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
+      if (map == null) {
+        return null;
+      }
+      final Item ref = map.get(ti);
+      return ref == null? null: ref.get();
     }
 
-    void clear() {
-      map.values().forEach(ReferenceCountedObject::release);
-      map.clear();
-      size.set(0);
+    /** After close(), the cache CANNOT be used again. */
+    synchronized void close() {
+      if (map == null) {
+        return;
+      }
+      evict();
+      map = null;
+      LOG.info("Successfully closed {}", this);
     }
 
-    void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op 
op) {
+    /** After evict(), the cache can be used again. */
+    synchronized void evict() {
+      if (map == null) {
+        return;
+      }
+      for (Iterator<Map.Entry<TermIndex, Item>> i = map.entrySet().iterator(); 
i.hasNext(); i.remove()) {
+        release(i.next().getValue());
+      }
+    }
+
+    synchronized void put(TermIndex key, ReferenceCountedObject<LogEntryProto> 
valueRef, Op op) {
+      if (map == null) {
+        return;
+      }
       valueRef.retain();
-      Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release);
-      size.getAndAdd(getEntrySize(valueRef.get(), op));
+      final long serializedSize = getEntrySize(valueRef.get(), op);
+      release(map.put(key,  new Item(valueRef, serializedSize)));
+      size.getAndAdd(serializedSize);
     }
 
-    private void release(ReferenceCountedObject<LogEntryProto> entry) {
-      size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE));
-      entry.release();
+    private void release(Item ref) {
+      if (ref == null) {
+        return;
+      }
+      final long serializedSize = ref.release();
+      size.getAndAdd(-serializedSize);
     }
 
-    void remove(TermIndex key) {
-      Optional.ofNullable(map.remove(key)).ifPresent(this::release);
+    synchronized void remove(TermIndex key) {
+      if (map == null) {
+        return;
+      }
+      release(map.remove(key));
     }
   }
 
@@ -433,7 +490,13 @@ public final class LogSegment {
   synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord 
record) throws RaftLogIOException {
     ReferenceCountedObject<LogEntryProto> entry = 
entryCache.get(record.getTermIndex());
     if (entry != null) {
-      return entry;
+      try {
+        entry.retain();
+        return entry;
+      } catch (IllegalStateException ignored) {
+        // The entry could be removed from the cache and released.
+        // The exception can be safely ignored since it is the same as cache 
miss.
+      }
     }
     try {
       return cacheLoader.load(record);
@@ -505,7 +568,7 @@ public final class LogSegment {
 
   synchronized void clear() {
     records.clear();
-    evictCache();
+    entryCache.close();
     endIndex = startIndex - 1;
   }
 
@@ -514,7 +577,7 @@ public final class LogSegment {
   }
 
   void evictCache() {
-    entryCache.clear();
+    entryCache.evict();
   }
 
   void putEntryCache(TermIndex key, ReferenceCountedObject<LogEntryProto> 
valueRef, Op op) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 44b9c7599..485eb53d1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -306,9 +306,14 @@ public final class SegmentedRaftLog extends RaftLogBase {
     }
     final ReferenceCountedObject<LogEntryProto> entry = 
segment.getEntryFromCache(record.getTermIndex());
     if (entry != null) {
-      getRaftLogMetrics().onRaftLogCacheHit();
-      entry.retain();
-      return entry;
+      try {
+        entry.retain();
+        getRaftLogMetrics().onRaftLogCacheHit();
+        return entry;
+      } catch (IllegalStateException ignored) {
+        // The entry could be removed from the cache and released.
+        // The exception can be safely ignored since it is the same as cache 
miss.
+      }
     }
 
     // the entry is not in the segment's cache. Load the cache without holding 
the lock.
@@ -346,6 +351,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
     } catch (Exception e) {
       final String err = getName() + ": Failed readStateMachineData for " + 
toLogEntryString(entry);
       LOG.error(err, e);
+      entryRef.release();
       throw new RaftLogIOException(err, 
JavaUtils.unwrapCompletionException(e));
     }
   }
@@ -558,6 +564,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
   @Override
   public void close() throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
+      LOG.info("Start closing {}", this);
       super.close();
       cacheEviction.close();
       cache.close();
@@ -565,6 +572,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
     fileLogWorker.close();
     storage.close();
     getRaftLogMetrics().unregister();
+    LOG.info("Successfully closed {}", this);
   }
 
   SegmentedRaftLogCache getRaftLogCache() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9ed3a1b76..a3d13de9f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,6 +51,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -242,10 +243,11 @@ class SegmentedRaftLogWorker {
   }
 
   void close() {
+    queue.close();
     this.running = false;
+    ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_MINUTE, 
workerThreadExecutor,
+        timeout -> LOG.warn("{}: shutdown timeout in {}", name, timeout));
     Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
-    ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3),
-        workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + 
timeout, name));
     IOUtils.cleanup(LOG, out);
     PlatformDependent.freeDirectBuffer(writeBuffer);
     LOG.info("{} close()", name);
@@ -341,7 +343,7 @@ class SegmentedRaftLogWorker {
         LOG.info(Thread.currentThread().getName()
             + " was interrupted, exiting. There are " + queue.getNumElements()
             + " tasks remaining in the queue.");
-        return;
+        break;
       } catch (Exception e) {
         if (!running) {
           LOG.info("{} got closed and hit exception",
@@ -352,6 +354,8 @@ class SegmentedRaftLogWorker {
         }
       }
     }
+
+    queue.clear(Task::discard);
   }
 
   private boolean shouldFlush() {
@@ -494,7 +498,7 @@ class SegmentedRaftLogWorker {
     private final LogEntryProto entry;
     private final CompletableFuture<?> stateMachineFuture;
     private final CompletableFuture<Long> combined;
-    private final ReferenceCountedObject<LogEntryProto> ref;
+    private final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref = 
new AtomicReference<>();
 
     WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto 
removedStateMachineData,
         TransactionContext context) {
@@ -512,7 +516,7 @@ class SegmentedRaftLogWorker {
           this.stateMachineFuture = null;
         }
         entryRef.retain();
-        this.ref = entryRef;
+        this.ref.set(entryRef);
       } else {
         try {
           // this.entry != origEntry if it has state machine data
@@ -522,7 +526,6 @@ class SegmentedRaftLogWorker {
               + ", entry=" + LogProtoUtils.toLogEntryString(origEntry, 
stateMachine::toStateMachineLogEntryString), e);
           throw e;
         }
-        this.ref = null;
       }
       this.combined = stateMachineFuture == null? super.getFuture()
           : super.getFuture().thenCombine(stateMachineFuture, (index, 
stateMachineResult) -> index);
@@ -532,6 +535,7 @@ class SegmentedRaftLogWorker {
     void failed(IOException e) {
       stateMachine.event().notifyLogFailed(e, entry);
       super.failed(e);
+      discard();
     }
 
     @Override
@@ -547,15 +551,14 @@ class SegmentedRaftLogWorker {
     @Override
     void done() {
       writeTasks.offerOrCompleteFuture(this);
-      if (ref != null) {
-        ref.release();
-      }
+      discard();
     }
 
     @Override
     void discard() {
-      if (ref != null) {
-        ref.release();
+      final ReferenceCountedObject<LogEntryProto> entryRef = 
ref.getAndSet(null);
+      if (entryRef != null) {
+        entryRef.release();
       }
     }
 
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 18e4f2eca..74bc0c535 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -83,7 +83,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return (SimpleStateMachine4Testing)s.getStateMachine();
   }
 
-  private final SortedMap<Long, ReferenceCountedObject<LogEntryProto>> 
indexMap = Collections.synchronizedSortedMap(new TreeMap<>());
+  private final SortedMap<Long, LogEntryProto> indexMap = 
Collections.synchronizedSortedMap(new TreeMap<>());
   private final SortedMap<String, LogEntryProto> dataMap = 
Collections.synchronizedSortedMap(new TreeMap<>());
   private final Daemon checkpointer;
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
@@ -198,9 +198,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return leaderElectionTimeoutInfo;
   }
 
-  private void put(ReferenceCountedObject<LogEntryProto> entryRef) {
-    LogEntryProto entry = entryRef.retain();
-    final ReferenceCountedObject<LogEntryProto> previous = 
indexMap.put(entry.getIndex(), entryRef);
+  private void put(LogEntryProto entry) {
+    final LogEntryProto previous = indexMap.put(entry.getIndex(), entry);
     Preconditions.assertNull(previous, "previous");
     final String s = 
entry.getStateMachineLogEntry().getLogData().toStringUtf8();
     dataMap.put(s, entry);
@@ -250,7 +249,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     LogEntryProto entry = entryRef.get();
     LOG.info("applyTransaction for log index {}", entry.getIndex());
 
-    put(entryRef);
+    put(LogProtoUtils.copy(entry));
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
 
     final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
@@ -270,8 +269,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile);
     try (SegmentedRaftLogOutputStream out = new 
SegmentedRaftLogOutputStream(snapshotFile, false,
         segmentMaxSize, preallocatedSize, 
ByteBuffer.allocateDirect(bufferSize))) {
-      for (final ReferenceCountedObject<LogEntryProto> entryRef : 
indexMap.values()) {
-        LogEntryProto entry = entryRef.get();
+      for (final LogEntryProto entry : indexMap.values()) {
         if (entry.getIndex() > endIndex) {
           break;
         } else {
@@ -306,7 +304,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
-          put(ReferenceCountedObject.wrap(entry));
+          put(entry);
           updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
         }
       }
@@ -335,7 +333,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
       LOG.info("query {}, all available: {}", string, dataMap.keySet());
       final LogEntryProto entry = dataMap.get(string);
       if (entry != null) {
-        return 
CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+        return CompletableFuture.completedFuture(Message.valueOf(entry));
       }
       exception = new IndexOutOfBoundsException(getId() + ": LogEntry not 
found for query " + string);
     } catch (Exception e) {
@@ -381,11 +379,12 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
       running = false;
       checkpointer.interrupt();
     });
-    indexMap.values().forEach(ReferenceCountedObject::release);
+    indexMap.clear();
+    dataMap.clear();
   }
 
   public LogEntryProto[] getContent() {
-    return 
indexMap.values().stream().map(ReferenceCountedObject::get).toArray(LogEntryProto[]::new);
+    return indexMap.values().toArray(new LogEntryProto[0]);
   }
 
   public void blockStartTransaction() {

Reply via email to