This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-3.1.1_review in repository https://gitbox.apache.org/repos/asf/ratis.git
commit cdfd4f10d7febbd320c1104f2b173c5ac9b4264b Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Aug 21 08:57:20 2024 -0700 Revert "RATIS-1983. Refactor client request processing to support reference count. (#998)" This reverts commit 43a02109d1f0b34dd80b1e36f1d023f86f24e3ab. --- .../protocol/RaftClientAsynchronousProtocol.java | 34 +----- .../apache/ratis/util/ReferenceCountedObject.java | 24 ---- .../examples/filestore/FileStoreStateMachine.java | 18 +-- .../ratis/statemachine/TransactionContext.java | 8 -- .../apache/ratis/server/impl/RaftServerImpl.java | 86 ++++---------- .../apache/ratis/server/impl/RaftServerProxy.java | 13 +- .../ratis/server/raftlog/segmented/LogSegment.java | 132 +++++++-------------- .../server/raftlog/segmented/SegmentedRaftLog.java | 19 ++- .../raftlog/segmented/SegmentedRaftLogCache.java | 5 +- .../raftlog/segmented/SegmentedRaftLogWorker.java | 8 +- .../statemachine/impl/TransactionContextImpl.java | 18 --- .../server/raftlog/segmented/TestLogSegment.java | 24 ++-- .../segmented/TestSegmentedRaftLogCache.java | 12 +- .../java/org/apache/ratis/tools/ParseRatisLog.java | 7 +- 14 files changed, 111 insertions(+), 297 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java index 1985bbe66..1a9f83c82 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,40 +17,12 @@ */ package org.apache.ratis.protocol; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.ReferenceCountedObject; - import java.io.IOException; import java.util.concurrent.CompletableFuture; /** Asynchronous version of {@link RaftClientProtocol}. */ public interface RaftClientAsynchronousProtocol { - /** - * It is recommended to override {@link #submitClientRequestAsync(ReferenceCountedObject)} instead. - * Then, it does not have to override this method. - */ - default CompletableFuture<RaftClientReply> submitClientRequestAsync( - RaftClientRequest request) throws IOException { - return submitClientRequestAsync(ReferenceCountedObject.wrap(request)); - } + CompletableFuture<RaftClientReply> submitClientRequestAsync( + RaftClientRequest request) throws IOException; - /** - * A referenced counted request is submitted from a client for processing. - * Implementations of this method should retain the request, process it and then release it. - * The request may be retained even after the future returned by this method has completed. - * - * @return a future of the reply - * @see ReferenceCountedObject - */ - default CompletableFuture<RaftClientReply> submitClientRequestAsync( - ReferenceCountedObject<RaftClientRequest> requestRef) { - try { - // for backward compatibility - return submitClientRequestAsync(requestRef.retain()) - .whenComplete((r, e) -> requestRef.release()); - } catch (Exception e) { - requestRef.release(); - return JavaUtils.completeExceptionally(e); - } - } } \ No newline at end of file diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index 3f72f5ffe..0dd378dc0 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -101,30 +101,6 @@ public interface ReferenceCountedObject<T> { return wrap(value, () -> {}, ignored -> {}); } - /** - * @return a {@link ReferenceCountedObject} of the given value by delegating to this object. - */ - default <V> ReferenceCountedObject<V> delegate(V value) { - final ReferenceCountedObject<T> delegated = this; - return new ReferenceCountedObject<V>() { - @Override - public V get() { - return value; - } - - @Override - public V retain() { - delegated.retain(); - return value; - } - - @Override - public boolean release() { - return delegated.release(); - } - }; - } - /** * Wrap the given value as a {@link ReferenceCountedObject}. * diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java index 858e300ec..5f258ee3b 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -32,7 +32,6 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; @@ -41,7 +40,6 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.JavaUtils; import java.io.IOException; import java.nio.file.Path; @@ -170,11 +168,9 @@ public class FileStoreStateMachine extends BaseStateMachine { } static class LocalStream implements DataStream { - private final String name; private final DataChannel dataChannel; - LocalStream(String name, DataChannel dataChannel) { - this.name = JavaUtils.getClassSimpleName(getClass()) + "[" + name + "]"; + LocalStream(DataChannel dataChannel) { this.dataChannel = dataChannel; } @@ -194,11 +190,6 @@ public class FileStoreStateMachine extends BaseStateMachine { } }); } - - @Override - public String toString() { - return name; - } } @Override @@ -211,14 +202,13 @@ public class FileStoreStateMachine extends BaseStateMachine { return FileStoreCommon.completeExceptionally( "Failed to parse stream header", e); } - final String file = proto.getStream().getPath().toStringUtf8(); - return files.createDataChannel(file) - .thenApply(channel -> new LocalStream(file, channel)); + return files.createDataChannel(proto.getStream().getPath().toStringUtf8()) + .thenApply(LocalStream::new); } @Override public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) { - LOG.info("linking {} to {}", stream, LogProtoUtils.toLogEntryString(entry)); + LOG.info("linking {}", stream); return files.streamLink(stream); } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java index e0190747f..3821b058c 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java @@ -23,7 +23,6 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.ReflectionUtils; import java.io.IOException; @@ -99,13 +98,6 @@ public interface TransactionContext { */ LogEntryProto getLogEntry(); - /** Wrap the given log entry as a {@link ReferenceCountedObject} for retaining it for later use. */ - default ReferenceCountedObject<LogEntryProto> wrap(LogEntryProto entry) { - Preconditions.assertSame(getLogEntry().getTerm(), entry.getTerm(), "entry.term"); - Preconditions.assertSame(getLogEntry().getIndex(), entry.getIndex(), "entry.index"); - return ReferenceCountedObject.wrap(entry); - } - /** * Sets whether to commit the transaction to the RAFT log or not * @param shouldCommit true if the transaction is supposed to be committed to the RAFT log diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c1a716bd0..8d7246fcf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -109,7 +109,6 @@ import org.apache.ratis.util.LifeCycle.State; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedSupplier; @@ -772,21 +771,15 @@ class RaftServerImpl implements RaftServer.Division, } /** - * Append a transaction to the log for processing a client request. - * Note that the given request could be different from {@link TransactionContext#getClientRequest()} - * since the request could be converted; see {@link #convertRaftClientRequest(RaftClientRequest)}. - * - * @param request The client request. - * @param context The context of the transaction. - * @param cacheEntry the entry in the retry cache. - * @return a future of the reply. + * Handle a normal update request from client. */ private CompletableFuture<RaftClientReply> appendTransaction( - RaftClientRequest request, TransactionContextImpl context, CacheEntry cacheEntry) { - Objects.requireNonNull(request, "request == null"); + RaftClientRequest request, TransactionContextImpl context, CacheEntry cacheEntry) throws IOException { CodeInjectionForTesting.execute(APPEND_TRANSACTION, getId(), request.getClientId(), request, context, cacheEntry); + assertLifeCycleState(LifeCycle.States.RUNNING); + final PendingRequest pending; synchronized (this) { final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry); @@ -805,7 +798,6 @@ class RaftServerImpl implements RaftServer.Division, return cacheEntry.getReplyFuture(); } try { - assertLifeCycleState(LifeCycle.States.RUNNING); state.appendLog(context); } catch (StateMachineException e) { // the StateMachineException is thrown by the SM in the preAppend stage. @@ -817,9 +809,6 @@ class RaftServerImpl implements RaftServer.Division, leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION); } return CompletableFuture.completedFuture(exceptionReply); - } catch (ServerNotReadyException e) { - final RaftClientReply exceptionReply = newExceptionReply(request, e); - return CompletableFuture.completedFuture(exceptionReply); } // put the request into the pending queue @@ -872,13 +861,11 @@ class RaftServerImpl implements RaftServer.Division, role.getLeaderState().ifPresent(leader -> leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE)); } - /** If the given request is {@link TypeCase#FORWARD}, convert it. */ - static RaftClientRequest convertRaftClientRequest(RaftClientRequest request) throws InvalidProtocolBufferException { - if (!request.is(TypeCase.FORWARD)) { - return request; - } - return ClientProtoUtils.toRaftClientRequest(RaftClientRequestProto.parseFrom( - request.getMessage().getContent().asReadOnlyByteBuffer())); + private RaftClientRequest filterDataStreamRaftClientRequest(RaftClientRequest request) + throws InvalidProtocolBufferException { + return !request.is(TypeCase.FORWARD) ? request : ClientProtoUtils.toRaftClientRequest( + RaftClientRequestProto.parseFrom( + request.getMessage().getContent().asReadOnlyByteBuffer())); } <REPLY> CompletableFuture<REPLY> executeSubmitServerRequestAsync( @@ -888,29 +875,20 @@ class RaftServerImpl implements RaftServer.Division, serverExecutor).join(); } - CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync( - ReferenceCountedObject<RaftClientRequest> request) { - return CompletableFuture.supplyAsync(() -> submitClientRequestAsync(request), clientExecutor).join(); + CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientRequest request) { + return CompletableFuture.supplyAsync( + () -> JavaUtils.callAsUnchecked(() -> submitClientRequestAsync(request), CompletionException::new), + clientExecutor).join(); } @Override public CompletableFuture<RaftClientReply> submitClientRequestAsync( - ReferenceCountedObject<RaftClientRequest> requestRef) { - final RaftClientRequest request = requestRef.retain(); + RaftClientRequest request) throws IOException { + assertLifeCycleState(LifeCycle.States.RUNNING); LOG.debug("{}: receive client request({})", getMemberId(), request); - - try { - assertLifeCycleState(LifeCycle.States.RUNNING); - } catch (ServerNotReadyException e) { - final RaftClientReply reply = newExceptionReply(request, e); - requestRef.release(); - return CompletableFuture.completedFuture(reply); - } - final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType()); final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time); - return replyFuture(requestRef).whenComplete((clientReply, exception) -> { - requestRef.release(); + return replyFuture(request).whenComplete((clientReply, exception) -> { timerContext.ifPresent(Timekeeper.Context::stop); if (exception != null || clientReply.getException() != null) { raftServerMetrics.incFailedRequestCount(request.getType()); @@ -918,8 +896,7 @@ class RaftServerImpl implements RaftServer.Division, }); } - private CompletableFuture<RaftClientReply> replyFuture(ReferenceCountedObject<RaftClientRequest> requestRef) { - final RaftClientRequest request = requestRef.get(); + private CompletableFuture<RaftClientReply> replyFuture(RaftClientRequest request) throws IOException { retryCache.invalidateRepliedRequests(request); final TypeCase type = request.getType().getTypeCase(); @@ -931,18 +908,17 @@ class RaftServerImpl implements RaftServer.Division, case WATCH: return watchAsync(request); case MESSAGESTREAM: - return messageStreamAsync(requestRef); + return messageStreamAsync(request); case WRITE: case FORWARD: - return writeAsync(requestRef); + return writeAsync(request); default: throw new IllegalStateException("Unexpected request type: " + type + ", request=" + request); } } - private CompletableFuture<RaftClientReply> writeAsync(ReferenceCountedObject<RaftClientRequest> requestRef) { - final RaftClientRequest request = requestRef.get(); - final CompletableFuture<RaftClientReply> future = writeAsyncImpl(requestRef); + private CompletableFuture<RaftClientReply> writeAsync(RaftClientRequest request) throws IOException { + final CompletableFuture<RaftClientReply> future = writeAsyncImpl(request); if (request.is(TypeCase.WRITE)) { // check replication final ReplicationLevel replication = request.getType().getWrite().getReplication(); @@ -953,8 +929,7 @@ class RaftServerImpl implements RaftServer.Division, return future; } - private CompletableFuture<RaftClientReply> writeAsyncImpl(ReferenceCountedObject<RaftClientRequest> requestRef) { - final RaftClientRequest request = requestRef.get(); + private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest request) throws IOException { final CompletableFuture<RaftClientReply> reply = checkLeaderState(request); if (reply != null) { return reply; @@ -970,15 +945,8 @@ class RaftServerImpl implements RaftServer.Division, // TODO: this client request will not be added to pending requests until // later which means that any failure in between will leave partial state in // the state machine. We should call cancelTransaction() for failed requests - final TransactionContextImpl context; - try { - context = (TransactionContextImpl) stateMachine.startTransaction(convertRaftClientRequest(request)); - } catch (IOException e) { - final RaftClientReply exceptionReply = newExceptionReply(request, - new RaftException("Failed to startTransaction for " + request, e)); - cacheEntry.failWithReply(exceptionReply); - return CompletableFuture.completedFuture(exceptionReply); - } + final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( + filterDataStreamRaftClientRequest(request)); if (context.getException() != null) { final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); final RaftClientReply exceptionReply = newExceptionReply(request, e); @@ -986,7 +954,6 @@ class RaftServerImpl implements RaftServer.Division, return CompletableFuture.completedFuture(exceptionReply); } - context.setDelegatedRef(requestRef); return appendTransaction(request, context, cacheEntry); } @@ -1089,8 +1056,7 @@ class RaftServerImpl implements RaftServer.Division, } } - private CompletableFuture<RaftClientReply> messageStreamAsync(ReferenceCountedObject<RaftClientRequest> requestRef) { - final RaftClientRequest request = requestRef.get(); + private CompletableFuture<RaftClientReply> messageStreamAsync(RaftClientRequest request) throws IOException { final CompletableFuture<RaftClientReply> reply = checkLeaderState(request); if (reply != null) { return reply; @@ -1102,7 +1068,7 @@ class RaftServerImpl implements RaftServer.Division, return f.thenApply(r -> null); } // the message stream has ended and the request become a WRITE request - return replyFuture(requestRef.delegate(f.join())); + return replyFuture(f.join()); } return role.getLeaderState() diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 9834d62ab..84221cfcf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -52,7 +52,6 @@ import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.TimeDuration; import java.io.Closeable; @@ -446,15 +445,9 @@ class RaftServerProxy implements RaftServer { } @Override - public CompletableFuture<RaftClientReply> submitClientRequestAsync( - ReferenceCountedObject<RaftClientRequest> requestRef) { - final RaftClientRequest request = requestRef.retain(); - try { - return getImplFuture(request.getRaftGroupId()) - .thenCompose(impl -> impl.executeSubmitClientRequestAsync(requestRef)); - } finally { - requestRef.release(); - } + public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) { + return getImplFuture(request.getRaftGroupId()) + .thenCompose(impl -> impl.executeSubmitClientRequestAsync(request)); } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index 12e7c4f1d..89a6e2050 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -30,7 +30,6 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -69,20 +67,17 @@ public final class LogSegment { } static long getEntrySize(LogEntryProto entry, Op op) { - switch (op) { - case CHECK_SEGMENT_FILE_FULL: - case LOAD_SEGMENT_FILE: - case WRITE_CACHE_WITH_STATE_MACHINE_CACHE: - Preconditions.assertTrue(entry == LogProtoUtils.removeStateMachineData(entry), - () -> "Unexpected LogEntryProto with StateMachine data: op=" + op + ", entry=" + entry); - break; - case WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE: - case REMOVE_CACHE: - break; - default: - throw new IllegalStateException("Unexpected op " + op + ", entry=" + entry); + LogEntryProto e = entry; + if (op == Op.CHECK_SEGMENT_FILE_FULL) { + e = LogProtoUtils.removeStateMachineData(entry); + } else if (op == Op.LOAD_SEGMENT_FILE || op == Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE) { + Preconditions.assertTrue(entry == LogProtoUtils.removeStateMachineData(entry), + () -> "Unexpected LogEntryProto with StateMachine data: op=" + op + ", entry=" + entry); + } else { + Preconditions.assertTrue(op == Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE || op == Op.REMOVE_CACHE, + () -> "Unexpected op " + op + ", entry=" + entry); } - final int serialized = entry.getSerializedSize(); + final int serialized = e.getSerializedSize(); return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4L; } @@ -129,8 +124,7 @@ public final class LogSegment { } public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize, - CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, - Consumer<ReferenceCountedObject<LogEntryProto>> entryConsumer) + CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer) throws IOException { int count = 0; try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream( @@ -142,8 +136,7 @@ public final class LogSegment { } if (entryConsumer != null) { - // TODO: use reference count to support zero buffer copying for readSegmentFile - entryConsumer.accept(ReferenceCountedObject.wrap(next)); + entryConsumer.accept(next); } count++; } @@ -170,7 +163,10 @@ public final class LogSegment { final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy); final boolean isOpen = startEnd.isOpen(); final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> { - segment.append(Op.LOAD_SEGMENT_FILE, entry, keepEntryInCache || isOpen, logConsumer); + segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE); + if (logConsumer != null) { + logConsumer.accept(entry); + } }); LOG.info("Successfully read {} entries from segment file {}", entryCount, file); @@ -238,10 +234,10 @@ public final class LogSegment { // the on-disk log file should be truncated but has not been done yet. final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>(); final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen); - readSegmentFile(file, startEnd, maxOpSize, getLogCorruptionPolicy(), raftLogMetrics, entryRef -> { - final LogEntryProto entry = entryRef.retain(); + readSegmentFile(file, startEnd, maxOpSize, + getLogCorruptionPolicy(), raftLogMetrics, entry -> { final TermIndex ti = TermIndex.valueOf(entry); - putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE); + putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE); if (ti.equals(key.getTermIndex())) { toReturn.set(entry); } @@ -251,48 +247,13 @@ public final class LogSegment { } } - static class EntryCache { - private final Map<TermIndex, ReferenceCountedObject<LogEntryProto>> map = new ConcurrentHashMap<>(); - private final AtomicLong size = new AtomicLong(); - - long size() { - return size.get(); - } - - LogEntryProto get(TermIndex ti) { - return Optional.ofNullable(map.get(ti)) - .map(ReferenceCountedObject::get) - .orElse(null); - } - - void clear() { - map.values().forEach(ReferenceCountedObject::release); - map.clear(); - size.set(0); - } - - void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) { - valueRef.retain(); - Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release); - size.getAndAdd(getEntrySize(valueRef.get(), op)); - } - - private void release(ReferenceCountedObject<LogEntryProto> entry) { - size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE)); - entry.release(); - } - - void remove(TermIndex key) { - Optional.ofNullable(map.remove(key)).ifPresent(this::release); - } - } - File getFile() { return LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen).getFile(storage); } private volatile boolean isOpen; private long totalFileSize = SegmentedRaftLogFormat.getHeaderLength(); + private AtomicLong totalCacheSize = new AtomicLong(0); /** Segment start index, inclusive. */ private final long startIndex; /** Segment end index, inclusive. */ @@ -310,7 +271,7 @@ public final class LogSegment { /** * the entryCache caches the content of log entries. */ - private final EntryCache entryCache = new EntryCache(); + private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>(); private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) { @@ -342,29 +303,12 @@ public final class LogSegment { return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy); } - void appendToOpenSegment(Op op, ReferenceCountedObject<LogEntryProto> entryRef) { + void appendToOpenSegment(LogEntryProto entry, Op op) { Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this); - append(op, entryRef, true, null); - } - - private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef, - boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) { - final LogEntryProto entry = entryRef.retain(); - try { - final LogRecord record = appendLogRecord(op, entry); - if (keepEntryInCache) { - putEntryCache(record.getTermIndex(), entryRef, op); - } - if (logConsumer != null) { - logConsumer.accept(entry); - } - } finally { - entryRef.release(); - } + append(true, entry, op); } - - private LogRecord appendLogRecord(Op op, LogEntryProto entry) { + private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { Objects.requireNonNull(entry, "entry == null"); if (records.isEmpty()) { Preconditions.assertTrue(entry.getIndex() == startIndex, @@ -380,9 +324,11 @@ public final class LogSegment { final LogRecord record = new LogRecord(totalFileSize, entry); records.add(record); + if (keepEntryInCache) { + putEntryCache(record.getTermIndex(), entry, op); + } totalFileSize += getEntrySize(entry, op); endIndex = entry.getIndex(); - return record; } LogEntryProto getEntryFromCache(TermIndex ti) { @@ -425,7 +371,7 @@ public final class LogSegment { } long getTotalCacheSize() { - return entryCache.size(); + return totalCacheSize.get(); } /** @@ -435,7 +381,7 @@ public final class LogSegment { Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex); for (long index = endIndex; index >= fromIndex; index--) { LogRecord removed = records.remove(Math.toIntExact(index - startIndex)); - removeEntryCache(removed.getTermIndex()); + removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE); totalFileSize = removed.offset; } isOpen = false; @@ -480,18 +426,28 @@ public final class LogSegment { void evictCache() { entryCache.clear(); + totalCacheSize.set(0); } - void putEntryCache(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) { - entryCache.put(key, valueRef, op); + void putEntryCache(TermIndex key, LogEntryProto value, Op op) { + final LogEntryProto previous = entryCache.put(key, value); + long previousSize = 0; + if (previous != null) { + // Different threads maybe load LogSegment file into cache at the same time, so duplicate maybe happen + previousSize = getEntrySize(value, Op.REMOVE_CACHE); + } + totalCacheSize.getAndAdd(getEntrySize(value, op) - previousSize); } - void removeEntryCache(TermIndex key) { - entryCache.remove(key); + void removeEntryCache(TermIndex key, Op op) { + LogEntryProto value = entryCache.remove(key); + if (value != null) { + totalCacheSize.getAndAdd(-getEntrySize(value, op)); + } } boolean hasCache() { - return isOpen || entryCache.size() > 0; // open segment always has cache. + return isOpen || !entryCache.isEmpty(); // open segment always has cache. } boolean containsIndex(long index) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index def472a60..f49900f16 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -41,7 +41,6 @@ import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.AwaitToRun; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.StringUtils; import java.io.File; @@ -54,7 +53,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.LongSupplier; import org.apache.ratis.util.UncheckedAutoCloseable; @@ -395,7 +393,6 @@ public final class SegmentedRaftLog extends RaftLogBase { if (LOG.isTraceEnabled()) { LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry)); } - final LogEntryProto removedStateMachineData = LogProtoUtils.removeStateMachineData(entry); try(AutoCloseableLock writeLock = writeLock()) { final Timekeeper.Context appendEntryTimerContext = getRaftLogMetrics().startAppendEntryTimer(); validateLogEntry(entry); @@ -404,7 +401,7 @@ public final class SegmentedRaftLog extends RaftLogBase { if (currentOpenSegment == null) { cache.addOpenSegment(entry.getIndex()); fileLogWorker.startLogSegment(entry.getIndex()); - } else if (isSegmentFull(currentOpenSegment, removedStateMachineData)) { + } else if (isSegmentFull(currentOpenSegment, entry)) { rollOpenSegment = true; } else { final TermIndex last = currentOpenSegment.getLastTermIndex(); @@ -426,17 +423,17 @@ public final class SegmentedRaftLog extends RaftLogBase { // If the entry has state machine data, then the entry should be inserted // to statemachine first and then to the cache. Not following the order // will leave a spurious entry in the cache. - final Task write = fileLogWorker.writeLogEntry(entry, removedStateMachineData, context); - final Function<LogEntryProto, ReferenceCountedObject<LogEntryProto>> wrap = context != null ? - context::wrap : ReferenceCountedObject::wrap; + CompletableFuture<Long> writeFuture = + fileLogWorker.writeLogEntry(entry, context).getFuture(); if (stateMachineCachingEnabled) { // The stateMachineData will be cached inside the StateMachine itself. - cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE, wrap.apply(removedStateMachineData)); + cache.appendEntry(LogProtoUtils.removeStateMachineData(entry), + LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE); } else { - cache.appendEntry(LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, wrap.apply(entry) - ); + cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); } - return write.getFuture().whenComplete((clientReply, exception) -> appendEntryTimerContext.stop()); + writeFuture.whenComplete((clientReply, exception) -> appendEntryTimerContext.stop()); + return writeFuture; } catch (Exception e) { LOG.error("{}: Failed to append {}", getName(), toLogEntryString(entry), e); throw e; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java index 0b05b14e5..58c70c4af 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java @@ -32,7 +32,6 @@ import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.AutoCloseableReadWriteLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -598,11 +597,11 @@ public class SegmentedRaftLogCache { } } - void appendEntry(LogSegment.Op op, ReferenceCountedObject<LogEntryProto> entry) { + void appendEntry(LogEntryProto entry, LogSegment.Op op) { // SegmentedRaftLog does the segment creation/rolling work. Here we just // simply append the entry into the open segment. Preconditions.assertNotNull(openSegment, "openSegment"); - openSegment.appendToOpenSegment(op, entry); + openSegment.appendToOpenSegment(entry, op); } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 5b0470d4f..68266b417 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -445,8 +445,8 @@ class SegmentedRaftLogWorker { addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); } - Task writeLogEntry(LogEntryProto entry, LogEntryProto removedStateMachineData, TransactionContext context) { - return addIOTask(new WriteLog(entry, removedStateMachineData, context)); + Task writeLogEntry(LogEntryProto entry, TransactionContext context) { + return addIOTask(new WriteLog(entry, context)); } Task truncate(TruncationSegments ts, long index) { @@ -493,8 +493,8 @@ class SegmentedRaftLogWorker { private final CompletableFuture<?> stateMachineFuture; private final CompletableFuture<Long> combined; - WriteLog(LogEntryProto entry, LogEntryProto removedStateMachineData, TransactionContext context) { - this.entry = removedStateMachineData; + WriteLog(LogEntryProto entry, TransactionContext context) { + this.entry = LogProtoUtils.removeStateMachineData(entry); if (this.entry == entry) { final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null; if (stateMachine != null && proto != null && proto.getType() == StateMachineLogEntryProto.Type.DATASTREAM) { diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index 600625716..d92f3a1c8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -26,7 +26,6 @@ import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReferenceCountedObject; import java.io.IOException; import java.util.Objects; @@ -72,9 +71,6 @@ public class TransactionContextImpl implements TransactionContext { /** Committed LogEntry. */ @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type private volatile LogEntryProto logEntry; - /** For wrapping {@link #logEntry} in order to release the underlying buffer. */ - @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type - private volatile ReferenceCountedObject<?> delegatedRef; private final CompletableFuture<Long> logIndexFuture = new CompletableFuture<>(); @@ -130,20 +126,6 @@ public class TransactionContextImpl implements TransactionContext { return clientRequest; } - public void setDelegatedRef(ReferenceCountedObject<?> ref) { - this.delegatedRef = ref; - } - - @Override - public ReferenceCountedObject<LogEntryProto> wrap(LogEntryProto entry) { - if (delegatedRef == null) { - return TransactionContext.super.wrap(entry); - } - Preconditions.assertSame(getLogEntry().getTerm(), entry.getTerm(), "entry.term"); - Preconditions.assertSame(getLogEntry().getIndex(), entry.getIndex(), "entry.index"); - return delegatedRef.delegate(entry); - } - @Override public StateMachineLogEntryProto getStateMachineLogEntry() { return stateMachineLogEntry; diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java index 4e04e9e62..50f9d2382 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java @@ -21,20 +21,18 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil.SimpleOperation; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; -import org.apache.ratis.server.raftlog.segmented.LogSegment.Op; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.server.storage.RaftStorageTestUtils; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.junit.jupiter.api.AfterEach; @@ -146,7 +144,7 @@ public class TestLogSegment extends BaseTest { if (entry == null) { entry = segment.loadCache(record); } - offset += getEntrySize(entry, Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + offset += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); } } @@ -205,8 +203,8 @@ public class TestLogSegment extends BaseTest { while (size < max) { SimpleOperation op = new SimpleOperation("m" + i); LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start); - size += getEntrySize(entry, Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); - segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)); + size += getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); } Assertions.assertTrue(segment.getTotalFileSize() >= max); @@ -238,18 +236,18 @@ public class TestLogSegment extends BaseTest { final StateMachineLogEntryProto m = op.getLogEntryContent(); try { LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1001); - segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); Assertions.fail("should fail since the entry's index needs to be 1000"); } catch (IllegalStateException e) { // the exception is expected. } LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1000); - segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); try { entry = LogProtoUtils.toLogEntryProto(m, 0, 1002); - segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); Assertions.fail("should fail since the entry's index needs to be 1001"); } catch (IllegalStateException e) { // the exception is expected. @@ -264,7 +262,7 @@ public class TestLogSegment extends BaseTest { for (int i = 0; i < 100; i++) { LogEntryProto entry = LogProtoUtils.toLogEntryProto( new SimpleOperation("m" + i).getLogEntryContent(), term, i + start); - segment.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)); + segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); } // truncate an open segment (remove 1080~1099) @@ -319,7 +317,7 @@ public class TestLogSegment extends BaseTest { 1024, 1024, ByteBuffer.allocateDirect(bufferSize))) { SimpleOperation op = new SimpleOperation(new String(content)); LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); - size = LogSegment.getEntrySize(entry, Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + size = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); out.write(entry); } Assertions.assertEquals(file.length(), @@ -346,7 +344,7 @@ public class TestLogSegment extends BaseTest { Arrays.fill(content, (byte) 1); SimpleOperation op = new SimpleOperation(new String(content)); LogEntryProto entry = LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0); - final long entrySize = LogSegment.getEntrySize(entry, Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); + final long entrySize = LogSegment.getEntrySize(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); long totalSize = SegmentedRaftLogFormat.getHeaderLength(); long preallocated = 16 * 1024; diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java index efcb90580..8015f1827 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java @@ -34,9 +34,7 @@ import org.apache.ratis.server.raftlog.LogEntryHeader; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments; import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord; -import org.apache.ratis.server.raftlog.segmented.LogSegment.Op; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.util.ReferenceCountedObject; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -66,7 +64,7 @@ public class TestSegmentedRaftLogCache { for (long i = start; i <= end; i++) { SimpleOperation m = new SimpleOperation("m" + i); LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i); - s.appendToOpenSegment(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry)); + s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); } if (!isOpen) { s.close(); @@ -150,15 +148,14 @@ public class TestSegmentedRaftLogCache { } @Test - public void testAppendEntry() { + public void testAppendEntry() throws Exception { LogSegment closedSegment = prepareLogSegment(0, 99, false); cache.addSegment(closedSegment); final SimpleOperation m = new SimpleOperation("m"); try { LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0); - cache.appendEntry(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry) - ); + cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); Assertions.fail("the open segment is null"); } catch (IllegalStateException ignored) { } @@ -167,8 +164,7 @@ public class TestSegmentedRaftLogCache { cache.addSegment(openSegment); for (long index = 101; index < 200; index++) { LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index); - cache.appendEntry(Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, ReferenceCountedObject.wrap(entry) - ); + cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE); } Assertions.assertNotNull(cache.getOpenSegment()); diff --git a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java index ea512fa70..564ce0bf0 100644 --- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java +++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java @@ -24,7 +24,6 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.segmented.LogSegmentPath; import org.apache.ratis.server.raftlog.segmented.LogSegment; -import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; import java.io.File; @@ -70,8 +69,7 @@ public final class ParseRatisLog { } - private void processLogEntry(ReferenceCountedObject<LogEntryProto> ref) { - final LogEntryProto proto = ref.retain(); + private void processLogEntry(LogEntryProto proto) { if (proto.hasConfigurationEntry()) { numConfEntries++; } else if (proto.hasMetadataEntry()) { @@ -79,13 +77,12 @@ public final class ParseRatisLog { } else if (proto.hasStateMachineLogEntry()) { numStateMachineEntries++; } else { - System.out.println("Found an invalid entry: " + proto); + System.out.println("Found invalid entry" + proto.toString()); numInvalidEntries++; } String str = LogProtoUtils.toLogEntryString(proto, smLogToString); System.out.println(str); - ref.release(); } public static class Builder {
