This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 66401f2fd RATIS-2387. Performance degradation after RATIS-2235 (#1337)
66401f2fd is described below

commit 66401f2fd475b4f6c409b8f29abe818aa7524a06
Author: Sergey Soldatov <[email protected]>
AuthorDate: Wed Jan 28 23:49:11 2026 -0800

    RATIS-2387. Performance degradation after RATIS-2235 (#1337)
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 10 ++++++++
 .../apache/ratis/server/impl/RaftServerImpl.java   | 29 ++++++++--------------
 .../apache/ratis/server/impl/ServerImplUtils.java  | 19 +++++++++++---
 3 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 2538a472a..efb3c6796 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -440,6 +440,16 @@ public interface RaftServerConfigKeys {
      setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled);
     }
 
+    String APPEND_ENTRIES_COMPOSE_ENABLED_KEY = PREFIX + ".append-entries.compose.enabled";
+    boolean APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT = true;
+    static boolean appendEntriesComposeEnabled(RaftProperties properties) {
+      return getBoolean(properties::getBoolean,
+          APPEND_ENTRIES_COMPOSE_ENABLED_KEY, APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT, getDefaultLog());
+    }
+    static void setAppendEntriesComposeEnabled(RaftProperties properties, boolean enabled) {
+      setBoolean(properties::setBoolean, APPEND_ENTRIES_COMPOSE_ENABLED_KEY, enabled);
+    }
+
     /**
      * Besides the open segment, the max number of segments caching log entries.
      */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 60f72e001..846b87702 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -81,7 +81,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.LeaderElection.Phase;
 import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
-import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
 import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
 import org.apache.ratis.server.leader.LeaderState.StepDownReason;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
@@ -133,7 +132,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -260,8 +258,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
   private final ThreadGroup threadGroup;
 
-  private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
-  private final NavigableIndices appendLogTermIndices = new NavigableIndices();
+  private final NavigableIndices appendLogTermIndices;
 
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
       throws IOException {
@@ -296,7 +293,8 @@ class RaftServerImpl implements RaftServer.Division,
     this.transferLeadership = new TransferLeadership(this, properties);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
     this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
-    this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    this.appendLogTermIndices = RaftServerConfigKeys.Log.appendEntriesComposeEnabled(properties) ?
+        new NavigableIndices() : null;
 
     this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1620,7 +1618,8 @@ class RaftServerImpl implements RaftServer.Division,
       state.updateConfiguration(entries);
     }
     future.join();
-    final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
+    final CompletableFuture<Void> appendFuture = entries.isEmpty()? CompletableFuture.completedFuture(null)
+        : appendLogTermIndices != null ? appendLogTermIndices.append(entries, this::appendLog)
         : appendLog(entries);
 
     proto.getCommitInfosList().forEach(commitInfoCache::update);
@@ -1636,7 +1635,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
     final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
-    return appendLog.whenCompleteAsync((r, t) -> {
+    return appendFuture.whenCompleteAsync((r, t) -> {
       followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
       timer.stop();
     }, getServerExecutor()).thenApply(v -> {
@@ -1654,16 +1653,8 @@ class RaftServerImpl implements RaftServer.Division,
     });
   }
   private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
-    final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
-    if (!appendLogTermIndices.append(entriesTermIndices)) {
-      // index already exists, return the last future
-      return appendLogFuture.get();
-    }
-
-
-    return appendLogFuture.updateAndGet(f -> f.thenComposeAsync(
-            ignored -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor))
-        .whenComplete((v, e) -> appendLogTermIndices.removeExisting(entriesTermIndices));
+    return CompletableFuture.completedFuture(null)
+        .thenComposeAsync(dummy -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor);
   }
 
   private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
@@ -1690,7 +1681,9 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     // Check if "previous" is contained in current state.
-    if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) {
+    if (previous != null
+        && !(appendLogTermIndices != null && appendLogTermIndices.contains(previous))
+        && !state.containsTermIndex(previous)) {
       final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
       LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
       return replyNextIndex;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 1a5fcfc85..434f98d68 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -47,7 +47,10 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 
 /** Server utilities for internal use. */
 public final class ServerImplUtils {
@@ -119,6 +122,8 @@ public final class ServerImplUtils {
   /** A data structure to support the {@link #contains(TermIndex)} method. */
   static class NavigableIndices {
     private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
+    private final AtomicReference<CompletableFuture<Void>> future
+        = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
     boolean contains(TermIndex ti) {
       final Long term = getTerm(ti.getIndex());
@@ -137,7 +142,15 @@ public final class ServerImplUtils {
       return floorEntry.getValue().getTerm(index);
     }
 
-    synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
+    CompletableFuture<Void> append(List<LogEntryProto> entries,
+        Function<List<LogEntryProto>, CompletableFuture<Void>> appendLog) {
+      final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
+      return alreadyExists(entriesTermIndices) ? future.get()
+          : future.updateAndGet(f -> f.thenComposeAsync(ignored -> appendLog.apply(entries)))
+              .whenComplete((v, e) -> removeExisting(entriesTermIndices));
+    }
+
+    private synchronized boolean alreadyExists(List<ConsecutiveIndices> entriesTermIndices) {
       for(int i = 0; i < entriesTermIndices.size(); i++) {
         final ConsecutiveIndices indices = entriesTermIndices.get(i);
         final ConsecutiveIndices previous = map.put(indices.startIndex, indices);
@@ -147,10 +160,10 @@ public final class ServerImplUtils {
           for(int j = 0; j < i; j++) {
             map.remove(entriesTermIndices.get(j).startIndex);
           }
-          return false;
+          return true;
         }
       }
-      return true;
+      return false;
     }
 
     synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {

Reply via email to