This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch release-3.1.3_review
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 17ca6f41d0a577de2ecb452368c1a38b0c63d8b7
Author: William Song <48054931+szywill...@users.noreply.github.com>
AuthorDate: Tue Jan 7 02:06:08 2025 +0800

    RATIS-2235. Allow only one thread to perform appendLog  (#1206)
---
 .../java/org/apache/ratis/server/impl/RaftServerImpl.java  | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3ffcee079..14b09a023 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -132,6 +132,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -249,6 +250,8 @@ class RaftServerImpl implements RaftServer.Division,
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
   private final ThreadGroup threadGroup;
 
+  private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
+
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
       throws IOException {
     final RaftPeerId id = proxy.getId();
@@ -282,6 +285,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.transferLeadership = new TransferLeadership(this, properties);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
     this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
+    this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
     this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1585,9 +1589,9 @@ class RaftServerImpl implements RaftServer.Division,
       state.updateConfiguration(entries);
     }
     future.join();
+    final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
+        : appendLog(entries);
 
-    final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
-        : state.getLog().append(entries);
     proto.getCommitInfosList().forEach(commitInfoCache::update);
 
     CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1601,7 +1605,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
     final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
-    return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
+    return appendLog.whenCompleteAsync((r, t) -> {
       followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
       timer.stop();
     }, getServerExecutor()).thenApply(v -> {
@@ -1618,6 +1622,10 @@ class RaftServerImpl implements RaftServer.Division,
       return reply;
     });
   }
+  private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
+    return appendLogFuture.updateAndGet(f -> f.thenCompose(
+            ignored -> JavaUtils.allOf(state.getLog().append(entries))));
+  }
 
   private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
     // Check if a snapshot installation through state machine is in progress.

Reply via email to