This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9b7440123 RATIS-2235. Allow only one thread to perform appendLog
(#1206)
9b7440123 is described below
commit 9b7440123fe543bb0d0fba1a6dc97da40d0f681a
Author: William Song <[email protected]>
AuthorDate: Tue Jan 7 02:06:08 2025 +0800
RATIS-2235. Allow only one thread to perform appendLog (#1206)
---
.../org/apache/ratis/server/impl/RaftServerImpl.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0ec73d5f4..4c752d43b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -133,6 +133,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -250,6 +251,8 @@ class RaftServerImpl implements RaftServer.Division,
private final AtomicBoolean firstElectionSinceStartup = new
AtomicBoolean(true);
private final ThreadGroup threadGroup;
+ private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
+
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy
proxy, RaftStorage.StartupOption option)
throws IOException {
final RaftPeerId id = proxy.getId();
@@ -283,6 +286,7 @@ class RaftServerImpl implements RaftServer.Division,
this.transferLeadership = new TransferLeadership(this, properties);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this,
properties);
+ this.appendLogFuture = new
AtomicReference<>(CompletableFuture.completedFuture(null));
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1641,9 +1645,9 @@ class RaftServerImpl implements RaftServer.Division,
state.updateConfiguration(entries);
}
future.join();
+ final CompletableFuture<Void> appendLog = entries.isEmpty()?
CompletableFuture.completedFuture(null)
+ : appendLog(requestRef.delegate(entries));
- final List<CompletableFuture<Long>> futures = entries.isEmpty() ?
Collections.emptyList()
- : state.getLog().append(requestRef.delegate(entries));
proto.getCommitInfosList().forEach(commitInfoCache::update);
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1657,7 +1661,7 @@ class RaftServerImpl implements RaftServer.Division,
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(),
previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX:
entries.get(entries.size() - 1).getIndex();
- return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
+ return appendLog.whenCompleteAsync((r, t) -> {
followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
@@ -1674,6 +1678,12 @@ class RaftServerImpl implements RaftServer.Division,
return reply;
});
}
+ /**
+  * Append the given entries to the log after every previously submitted append has completed.
+  *
+  * Calls are chained through {@link #appendLogFuture}: each call atomically swaps in a new tail
+  * future and starts its own append only when the previous tail completes.
+  * The swap uses getAndSet rather than updateAndGet because the function passed to updateAndGet
+  * may be re-applied when the update fails due to contention, which would register
+  * (and possibly run) the same log append more than once.
+  *
+  * The entries reference is retained here and released once the append completes,
+  * normally or exceptionally.
+  * Since thenCompose propagates failures, a failed append also fails the appends chained after it.
+  *
+  * @param entriesRef the reference-counted entries to append.
+  * @return a future that completes after the append of these entries has completed
+  *         and the reference has been released.
+  */
+ private CompletableFuture<Void> appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+   entriesRef.retain();
+   // The new tail of the chain; later calls will wait on it.
+   final CompletableFuture<Void> tail = new CompletableFuture<>();
+   // A single atomic swap: no update function, so nothing can be executed twice.
+   final CompletableFuture<Void> previous = appendLogFuture.getAndSet(tail);
+   final CompletableFuture<Void> appended = previous.thenCompose(
+       ignored -> JavaUtils.allOf(state.getLog().append(entriesRef)));
+   // Forward the outcome to the tail so that the next chained append can proceed (or fail).
+   appended.whenComplete((v, e) -> {
+     if (e == null) {
+       tail.complete(null);
+     } else {
+       tail.completeExceptionally(e);
+     }
+   });
+   return appended.whenComplete((v, e) -> entriesRef.release());
+ }
private long checkInconsistentAppendEntries(TermIndex previous,
List<LogEntryProto> entries) {
// Check if a snapshot installation through state machine is in progress.