This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b7440123 RATIS-2235. Allow only one thread to perform appendLog  (#1206)
9b7440123 is described below

commit 9b7440123fe543bb0d0fba1a6dc97da40d0f681a
Author: William Song <[email protected]>
AuthorDate: Tue Jan 7 02:06:08 2025 +0800

    RATIS-2235. Allow only one thread to perform appendLog  (#1206)
---
 .../org/apache/ratis/server/impl/RaftServerImpl.java     | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0ec73d5f4..4c752d43b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -133,6 +133,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -250,6 +251,8 @@ class RaftServerImpl implements RaftServer.Division,
   private final AtomicBoolean firstElectionSinceStartup = new 
AtomicBoolean(true);
   private final ThreadGroup threadGroup;
 
+  private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
+
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy 
proxy, RaftStorage.StartupOption option)
       throws IOException {
     final RaftPeerId id = proxy.getId();
@@ -283,6 +286,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.transferLeadership = new TransferLeadership(this, properties);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
     this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, 
properties);
+    this.appendLogFuture = new 
AtomicReference<>(CompletableFuture.completedFuture(null));
 
     this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1641,9 +1645,9 @@ class RaftServerImpl implements RaftServer.Division,
       state.updateConfiguration(entries);
     }
     future.join();
+    final CompletableFuture<Void> appendLog = entries.isEmpty()? 
CompletableFuture.completedFuture(null)
+        : appendLog(requestRef.delegate(entries));
 
-    final List<CompletableFuture<Long>> futures = entries.isEmpty() ? 
Collections.emptyList()
-        : state.getLog().append(requestRef.delegate(entries));
     proto.getCommitInfosList().forEach(commitInfoCache::update);
 
     CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1657,7 +1661,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), 
previous, entries.size());
     final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: 
entries.get(entries.size() - 1).getIndex();
-    return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
+    return appendLog.whenCompleteAsync((r, t) -> {
       followerState.ifPresent(fs -> 
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
       timer.stop();
     }, getServerExecutor()).thenApply(v -> {
@@ -1674,6 +1678,12 @@ class RaftServerImpl implements RaftServer.Division,
       return reply;
     });
   }
+  private CompletableFuture<Void> 
appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
+    entriesRef.retain();
+    return appendLogFuture.updateAndGet(f -> f.thenCompose(
+            ignored -> JavaUtils.allOf(state.getLog().append(entriesRef))))
+        .whenComplete((v, e) -> entriesRef.release());
+  }
 
   private long checkInconsistentAppendEntries(TermIndex previous, 
List<LogEntryProto> entries) {
     // Check if a snapshot installation through state machine is in progress.

Reply via email to