This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new df1d38ac4 RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy.
(#1164)
df1d38ac4 is described below
commit df1d38ac4ca06ad4891f9548a29ad437bef475fd
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 11 08:00:06 2024 -0700
RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy. (#1164)
---
.../org/apache/ratis/util/DataBlockingQueue.java | 30 ++++++
.../grpc/server/GrpcClientProtocolService.java | 26 ++---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 22 ++---
.../ratis/server/raftlog/segmented/LogSegment.java | 109 ++++++++++++++++-----
.../server/raftlog/segmented/SegmentedRaftLog.java | 14 ++-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 25 ++---
.../impl/SimpleStateMachine4Testing.java | 21 ++--
7 files changed, 175 insertions(+), 72 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index e905893e5..fb0f0715c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.ToLongFunction;
/**
@@ -46,6 +47,8 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
+ private boolean closed = false;
+
public DataBlockingQueue(Object name, SizeInBytes byteLimit, int
elementLimit, ToLongFunction<E> getNumBytes) {
super(name, byteLimit, elementLimit, getNumBytes);
}
@@ -72,10 +75,34 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
}
}
+ /** Apply the given handler to each element and then {@link #clear()}. */
+ public void clear(Consumer<E> handler) {
+ try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ for(E e : this) {
+ handler.accept(e);
+ }
+ super.clear();
+ }
+ }
+
+ /**
+ * Close this queue to stop accepting new elements, i.e. the offer(…)
methods always return false.
+ * Note that closing the queue will not clear the existing elements.
+ * The existing elements can be peeked, polled or cleared after close.
+ */
+ public void close() {
+ try(AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
+ closed = true;
+ }
+ }
+
@Override
public boolean offer(E element) {
Objects.requireNonNull(element, "element == null");
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ if (closed) {
+ return false;
+ }
if (super.offer(element)) {
notEmpty.signal();
return true;
@@ -95,6 +122,9 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
for(;;) {
+ if (closed) {
+ return false;
+ }
if (super.offer(element)) {
notEmpty.signal();
return true;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 80a9a439b..b7548780c 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -29,7 +29,6 @@ import
org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import
org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
@@ -323,18 +322,19 @@ class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase {
@Override
void processClientRequest(ReferenceCountedObject<RaftClientRequest>
requestRef) {
- final RaftClientRequest request = requestRef.retain();
- final long callId = request.getCallId();
- final SlidingWindowEntry slidingWindowEntry =
request.getSlidingWindowEntry();
- final CompletableFuture<Void> f = processClientRequest(requestRef, reply
-> {
- if (!reply.isSuccess()) {
- LOG.info("Failed request cid={}, {}, reply={}", callId,
slidingWindowEntry, reply);
- }
- final RaftClientReplyProto proto =
ClientProtoUtils.toRaftClientReplyProto(reply);
- responseNext(proto);
- });
-
- requestRef.release();
+ final long callId = requestRef.retain().getCallId();
+ final CompletableFuture<Void> f;
+ try {
+ f = processClientRequest(requestRef, reply -> {
+ if (!reply.isSuccess()) {
+ LOG.info("Failed request cid={}, reply={}", callId, reply);
+ }
+ final RaftClientReplyProto proto =
ClientProtoUtils.toRaftClientReplyProto(reply);
+ responseNext(proto);
+ });
+ } finally {
+ requestRef.release();
+ }
put(callId, f);
f.thenAccept(dummy -> remove(callId));
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7edb3ae0b..18d4c62c6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -379,7 +379,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
private void appendLog(boolean heartbeat) throws IOException {
- ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
+ final ReferenceCountedObject<AppendEntriesRequestProto> pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
@@ -388,18 +388,18 @@ public class GrpcLogAppender extends LogAppenderBase {
if (pending == null) {
return;
}
- request = new AppendEntriesRequest(pending.get(), getFollowerId(),
grpcServerMetrics);
- pendingRequests.put(request);
- increaseNextIndex(pending.get());
- if (appendLogRequestObserver == null) {
- appendLogRequestObserver = new StreamObservers(
- getClient(), new AppendLogResponseHandler(), useSeparateHBChannel,
getWaitTimeMin());
- }
- } catch(Exception e) {
- if (pending != null) {
+ try {
+ request = new AppendEntriesRequest(pending.get(), getFollowerId(),
grpcServerMetrics);
+ pendingRequests.put(request);
+ increaseNextIndex(pending.get());
+ if (appendLogRequestObserver == null) {
+ appendLogRequestObserver = new StreamObservers(
+ getClient(), new AppendLogResponseHandler(),
useSeparateHBChannel, getWaitTimeMin());
+ }
+ } catch (Exception e) {
pending.release();
+ throw e;
}
- throw e;
}
try {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c51464f9e..a88ade587 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -29,6 +29,7 @@ import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
@@ -38,10 +39,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -283,47 +284,103 @@ public final class LogSegment {
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
- entryRef.retain();
toReturn.set(entryRef);
+ } else {
+ entryRef.release();
}
- entryRef.release();
});
loadingTimes.incrementAndGet();
return Objects.requireNonNull(toReturn.get());
}
}
- static class EntryCache {
- private final Map<TermIndex, ReferenceCountedObject<LogEntryProto>> map =
new ConcurrentHashMap<>();
+ private static class Item {
+ private final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref;
+ private final long serializedSize;
+
+ Item(ReferenceCountedObject<LogEntryProto> obj, long serializedSize) {
+ this.ref = new AtomicReference<>(obj);
+ this.serializedSize = serializedSize;
+ }
+
+ ReferenceCountedObject<LogEntryProto> get() {
+ return ref.get();
+ }
+
+ long release() {
+ final ReferenceCountedObject<LogEntryProto> entry = ref.getAndSet(null);
+ if (entry == null) {
+ return 0;
+ }
+ entry.release();
+ return serializedSize;
+ }
+ }
+
+ class EntryCache {
+ private Map<TermIndex, Item> map = new HashMap<>();
private final AtomicLong size = new AtomicLong();
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this;
+ }
+
long size() {
return size.get();
}
- ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
- return map.get(ti);
+ synchronized ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
+ if (map == null) {
+ return null;
+ }
+ final Item ref = map.get(ti);
+ return ref == null? null: ref.get();
}
- void clear() {
- map.values().forEach(ReferenceCountedObject::release);
- map.clear();
- size.set(0);
+ /** After close(), the cache CANNOT be used again. */
+ synchronized void close() {
+ if (map == null) {
+ return;
+ }
+ evict();
+ map = null;
+ LOG.info("Successfully closed {}", this);
}
- void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op
op) {
+ /** After evict(), the cache can be used again. */
+ synchronized void evict() {
+ if (map == null) {
+ return;
+ }
+ for (Iterator<Map.Entry<TermIndex, Item>> i = map.entrySet().iterator();
i.hasNext(); i.remove()) {
+ release(i.next().getValue());
+ }
+ }
+
+ synchronized void put(TermIndex key, ReferenceCountedObject<LogEntryProto>
valueRef, Op op) {
+ if (map == null) {
+ return;
+ }
valueRef.retain();
- Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release);
- size.getAndAdd(getEntrySize(valueRef.get(), op));
+ final long serializedSize = getEntrySize(valueRef.get(), op);
+ release(map.put(key, new Item(valueRef, serializedSize)));
+ size.getAndAdd(serializedSize);
}
- private void release(ReferenceCountedObject<LogEntryProto> entry) {
- size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE));
- entry.release();
+ private void release(Item ref) {
+ if (ref == null) {
+ return;
+ }
+ final long serializedSize = ref.release();
+ size.getAndAdd(-serializedSize);
}
- void remove(TermIndex key) {
- Optional.ofNullable(map.remove(key)).ifPresent(this::release);
+ synchronized void remove(TermIndex key) {
+ if (map == null) {
+ return;
+ }
+ release(map.remove(key));
}
}
@@ -433,7 +490,13 @@ public final class LogSegment {
synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord
record) throws RaftLogIOException {
ReferenceCountedObject<LogEntryProto> entry =
entryCache.get(record.getTermIndex());
if (entry != null) {
- return entry;
+ try {
+ entry.retain();
+ return entry;
+ } catch (IllegalStateException ignored) {
+ // The entry could be removed from the cache and released.
+ // The exception can be safely ignored since it is the same as cache
miss.
+ }
}
try {
return cacheLoader.load(record);
@@ -505,7 +568,7 @@ public final class LogSegment {
synchronized void clear() {
records.clear();
- evictCache();
+ entryCache.close();
endIndex = startIndex - 1;
}
@@ -514,7 +577,7 @@ public final class LogSegment {
}
void evictCache() {
- entryCache.clear();
+ entryCache.evict();
}
void putEntryCache(TermIndex key, ReferenceCountedObject<LogEntryProto>
valueRef, Op op) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 44b9c7599..485eb53d1 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -306,9 +306,14 @@ public final class SegmentedRaftLog extends RaftLogBase {
}
final ReferenceCountedObject<LogEntryProto> entry =
segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
- getRaftLogMetrics().onRaftLogCacheHit();
- entry.retain();
- return entry;
+ try {
+ entry.retain();
+ getRaftLogMetrics().onRaftLogCacheHit();
+ return entry;
+ } catch (IllegalStateException ignored) {
+ // The entry could be removed from the cache and released.
+ // The exception can be safely ignored since it is the same as cache
miss.
+ }
}
// the entry is not in the segment's cache. Load the cache without holding
the lock.
@@ -346,6 +351,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " +
toLogEntryString(entry);
LOG.error(err, e);
+ entryRef.release();
throw new RaftLogIOException(err,
JavaUtils.unwrapCompletionException(e));
}
}
@@ -558,6 +564,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
@Override
public void close() throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
+ LOG.info("Start closing {}", this);
super.close();
cacheEviction.close();
cache.close();
@@ -565,6 +572,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
fileLogWorker.close();
storage.close();
getRaftLogMetrics().unregister();
+ LOG.info("Successfully closed {}", this);
}
SegmentedRaftLogCache getRaftLogCache() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 9ed3a1b76..a3d13de9f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -51,6 +51,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -242,10 +243,11 @@ class SegmentedRaftLogWorker {
}
void close() {
+ queue.close();
this.running = false;
+ ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_MINUTE,
workerThreadExecutor,
+ timeout -> LOG.warn("{}: shutdown timeout in {}", name, timeout));
Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
- ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3),
- workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " +
timeout, name));
IOUtils.cleanup(LOG, out);
PlatformDependent.freeDirectBuffer(writeBuffer);
LOG.info("{} close()", name);
@@ -341,7 +343,7 @@ class SegmentedRaftLogWorker {
LOG.info(Thread.currentThread().getName()
+ " was interrupted, exiting. There are " + queue.getNumElements()
+ " tasks remaining in the queue.");
- return;
+ break;
} catch (Exception e) {
if (!running) {
LOG.info("{} got closed and hit exception",
@@ -352,6 +354,8 @@ class SegmentedRaftLogWorker {
}
}
}
+
+ queue.clear(Task::discard);
}
private boolean shouldFlush() {
@@ -494,7 +498,7 @@ class SegmentedRaftLogWorker {
private final LogEntryProto entry;
private final CompletableFuture<?> stateMachineFuture;
private final CompletableFuture<Long> combined;
- private final ReferenceCountedObject<LogEntryProto> ref;
+ private final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref =
new AtomicReference<>();
WriteLog(ReferenceCountedObject<LogEntryProto> entryRef, LogEntryProto
removedStateMachineData,
TransactionContext context) {
@@ -512,7 +516,7 @@ class SegmentedRaftLogWorker {
this.stateMachineFuture = null;
}
entryRef.retain();
- this.ref = entryRef;
+ this.ref.set(entryRef);
} else {
try {
// this.entry != origEntry if it has state machine data
@@ -522,7 +526,6 @@ class SegmentedRaftLogWorker {
+ ", entry=" + LogProtoUtils.toLogEntryString(origEntry,
stateMachine::toStateMachineLogEntryString), e);
throw e;
}
- this.ref = null;
}
this.combined = stateMachineFuture == null? super.getFuture()
: super.getFuture().thenCombine(stateMachineFuture, (index,
stateMachineResult) -> index);
@@ -532,6 +535,7 @@ class SegmentedRaftLogWorker {
void failed(IOException e) {
stateMachine.event().notifyLogFailed(e, entry);
super.failed(e);
+ discard();
}
@Override
@@ -547,15 +551,14 @@ class SegmentedRaftLogWorker {
@Override
void done() {
writeTasks.offerOrCompleteFuture(this);
- if (ref != null) {
- ref.release();
- }
+ discard();
}
@Override
void discard() {
- if (ref != null) {
- ref.release();
+ final ReferenceCountedObject<LogEntryProto> entryRef =
ref.getAndSet(null);
+ if (entryRef != null) {
+ entryRef.release();
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 18e4f2eca..74bc0c535 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -83,7 +83,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
return (SimpleStateMachine4Testing)s.getStateMachine();
}
- private final SortedMap<Long, ReferenceCountedObject<LogEntryProto>>
indexMap = Collections.synchronizedSortedMap(new TreeMap<>());
+ private final SortedMap<Long, LogEntryProto> indexMap =
Collections.synchronizedSortedMap(new TreeMap<>());
private final SortedMap<String, LogEntryProto> dataMap =
Collections.synchronizedSortedMap(new TreeMap<>());
private final Daemon checkpointer;
private final SimpleStateMachineStorage storage = new
SimpleStateMachineStorage();
@@ -198,9 +198,8 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
return leaderElectionTimeoutInfo;
}
- private void put(ReferenceCountedObject<LogEntryProto> entryRef) {
- LogEntryProto entry = entryRef.retain();
- final ReferenceCountedObject<LogEntryProto> previous =
indexMap.put(entry.getIndex(), entryRef);
+ private void put(LogEntryProto entry) {
+ final LogEntryProto previous = indexMap.put(entry.getIndex(), entry);
Preconditions.assertNull(previous, "previous");
final String s =
entry.getStateMachineLogEntry().getLogData().toStringUtf8();
dataMap.put(s, entry);
@@ -250,7 +249,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
LogEntryProto entry = entryRef.get();
LOG.info("applyTransaction for log index {}", entry.getIndex());
- put(entryRef);
+ put(LogProtoUtils.copy(entry));
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
@@ -270,8 +269,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile);
try (SegmentedRaftLogOutputStream out = new
SegmentedRaftLogOutputStream(snapshotFile, false,
segmentMaxSize, preallocatedSize,
ByteBuffer.allocateDirect(bufferSize))) {
- for (final ReferenceCountedObject<LogEntryProto> entryRef :
indexMap.values()) {
- LogEntryProto entry = entryRef.get();
+ for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
break;
} else {
@@ -306,7 +304,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
- put(ReferenceCountedObject.wrap(entry));
+ put(entry);
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
}
}
@@ -335,7 +333,7 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
LOG.info("query {}, all available: {}", string, dataMap.keySet());
final LogEntryProto entry = dataMap.get(string);
if (entry != null) {
- return
CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+ return CompletableFuture.completedFuture(Message.valueOf(entry));
}
exception = new IndexOutOfBoundsException(getId() + ": LogEntry not
found for query " + string);
} catch (Exception e) {
@@ -381,11 +379,12 @@ public class SimpleStateMachine4Testing extends
BaseStateMachine {
running = false;
checkpointer.interrupt();
});
- indexMap.values().forEach(ReferenceCountedObject::release);
+ indexMap.clear();
+ dataMap.clear();
}
public LogEntryProto[] getContent() {
- return
indexMap.values().stream().map(ReferenceCountedObject::get).toArray(LogEntryProto[]::new);
+ return indexMap.values().toArray(new LogEntryProto[0]);
}
public void blockStartTransaction() {