This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 66401f2fd RATIS-2387. Performance degradation after RATIS-2235 (#1337)
66401f2fd is described below
commit 66401f2fd475b4f6c409b8f29abe818aa7524a06
Author: Sergey Soldatov <[email protected]>
AuthorDate: Wed Jan 28 23:49:11 2026 -0800
RATIS-2387. Performance degradation after RATIS-2235 (#1337)
---
.../apache/ratis/server/RaftServerConfigKeys.java | 10 ++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 29 ++++++++--------------
.../apache/ratis/server/impl/ServerImplUtils.java | 19 +++++++++++---
3 files changed, 37 insertions(+), 21 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 2538a472a..efb3c6796 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -440,6 +440,16 @@ public interface RaftServerConfigKeys {
setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled);
}
+ String APPEND_ENTRIES_COMPOSE_ENABLED_KEY = PREFIX + ".append-entries.compose.enabled";
+ boolean APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT = true;
+ static boolean appendEntriesComposeEnabled(RaftProperties properties) {
+ return getBoolean(properties::getBoolean,
+ APPEND_ENTRIES_COMPOSE_ENABLED_KEY, APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT, getDefaultLog());
+ }
+ static void setAppendEntriesComposeEnabled(RaftProperties properties, boolean enabled) {
+ setBoolean(properties::setBoolean, APPEND_ENTRIES_COMPOSE_ENABLED_KEY, enabled);
+ }
+
/**
* Besides the open segment, the max number of segments caching log entries.
*/
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 60f72e001..846b87702 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -81,7 +81,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
-import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices;
import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices;
import org.apache.ratis.server.leader.LeaderState.StepDownReason;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
@@ -133,7 +132,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -260,8 +258,7 @@ class RaftServerImpl implements RaftServer.Division,
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
private final ThreadGroup threadGroup;
- private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
- private final NavigableIndices appendLogTermIndices = new NavigableIndices();
+ private final NavigableIndices appendLogTermIndices;
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
throws IOException {
@@ -296,7 +293,8 @@ class RaftServerImpl implements RaftServer.Division,
this.transferLeadership = new TransferLeadership(this, properties);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
- this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
+ this.appendLogTermIndices = RaftServerConfigKeys.Log.appendEntriesComposeEnabled(properties) ?
+ new NavigableIndices() : null;
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1620,7 +1618,8 @@ class RaftServerImpl implements RaftServer.Division,
state.updateConfiguration(entries);
}
future.join();
- final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
+ final CompletableFuture<Void> appendFuture = entries.isEmpty()? CompletableFuture.completedFuture(null)
+ : appendLogTermIndices != null ? appendLogTermIndices.append(entries, this::appendLog)
: appendLog(entries);
proto.getCommitInfosList().forEach(commitInfoCache::update);
@@ -1636,7 +1635,7 @@ class RaftServerImpl implements RaftServer.Division,
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
- return appendLog.whenCompleteAsync((r, t) -> {
+ return appendFuture.whenCompleteAsync((r, t) -> {
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
timer.stop();
}, getServerExecutor()).thenApply(v -> {
@@ -1654,16 +1653,8 @@ class RaftServerImpl implements RaftServer.Division,
});
}
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
- final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
- if (!appendLogTermIndices.append(entriesTermIndices)) {
- // index already exists, return the last future
- return appendLogFuture.get();
- }
-
-
- return appendLogFuture.updateAndGet(f -> f.thenComposeAsync(
- ignored -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor))
- .whenComplete((v, e) -> appendLogTermIndices.removeExisting(entriesTermIndices));
+ return CompletableFuture.completedFuture(null)
+ .thenComposeAsync(dummy -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor);
}
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
@@ -1690,7 +1681,9 @@ class RaftServerImpl implements RaftServer.Division,
}
// Check if "previous" is contained in current state.
- if (previous != null && !(appendLogTermIndices.contains(previous) || state.containsTermIndex(previous))) {
+ if (previous != null
+ && !(appendLogTermIndices != null && appendLogTermIndices.contains(previous))
+ && !state.containsTermIndex(previous)) {
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
return replyNextIndex;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 1a5fcfc85..434f98d68 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -47,7 +47,10 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
/** Server utilities for internal use. */
public final class ServerImplUtils {
@@ -119,6 +122,8 @@ public final class ServerImplUtils {
/** A data structure to support the {@link #contains(TermIndex)} method. */
static class NavigableIndices {
private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
+ private final AtomicReference<CompletableFuture<Void>> future
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
boolean contains(TermIndex ti) {
final Long term = getTerm(ti.getIndex());
@@ -137,7 +142,15 @@ public final class ServerImplUtils {
return floorEntry.getValue().getTerm(index);
}
- synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
+ CompletableFuture<Void> append(List<LogEntryProto> entries,
+ Function<List<LogEntryProto>, CompletableFuture<Void>> appendLog) {
+ final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
+ return alreadyExists(entriesTermIndices) ? future.get()
+ : future.updateAndGet(f -> f.thenComposeAsync(ignored -> appendLog.apply(entries)))
+ .whenComplete((v, e) -> removeExisting(entriesTermIndices));
+ }
+
+ private synchronized boolean alreadyExists(List<ConsecutiveIndices> entriesTermIndices) {
for(int i = 0; i < entriesTermIndices.size(); i++) {
final ConsecutiveIndices indices = entriesTermIndices.get(i);
final ConsecutiveIndices previous = map.put(indices.startIndex, indices);
@@ -147,10 +160,10 @@ public final class ServerImplUtils {
for(int j = 0; j < i; j++) {
map.remove(entriesTermIndices.get(j).startIndex);
}
- return false;
+ return true;
}
}
- return true;
+ return false;
}
synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {