This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 32e7925ee RATIS-2403. Support leader batch write to improve
linearizable follower read throughput (#1362)
32e7925ee is described below
commit 32e7925ee9aec86922d173f3922337618d918362
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Apr 6 12:42:03 2026 +0800
RATIS-2403. Support leader batch write to improve linearizable follower
read throughput (#1362)
---
.../org/apache/ratis/client/impl/OrderedAsync.java | 2 +-
ratis-docs/src/site/markdown/configurations.md | 37 +++++-
.../apache/ratis/server/RaftServerConfigKeys.java | 33 ++++-
.../apache/ratis/server/impl/LeaderStateImpl.java | 62 +++++++--
.../apache/ratis/server/impl/PendingRequests.java | 5 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 8 +-
.../org/apache/ratis/server/impl/ReplyFlusher.java | 137 +++++++++++++++++++
.../org/apache/ratis/LinearizableReadTests.java | 47 +++++--
.../org/apache/ratis/OutputStreamBaseTest.java | 4 +-
.../org/apache/ratis/ReadOnlyRequestTests.java | 17 ++-
.../TestLinearizableLeaderLeaseReadWithGrpc.java | 18 +--
...bleReadAppliedIndexLeaderLeaseReadWithGrpc.java | 6 +-
.../TestLinearizableReadAppliedIndexWithGrpc.java | 6 +-
...izableReadRepliedIndexLeaderLeaseWithGrpc.java} | 6 +-
.../TestLinearizableReadRepliedIndexWithGrpc.java | 145 +++++++++++++++++++++
.../ratis/grpc/TestLinearizableReadWithGrpc.java | 18 +--
16 files changed, 468 insertions(+), 83 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index fbeb4b992..ecf4db3dc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -198,7 +198,7 @@ public final class OrderedAsync {
return;
}
- if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
+ if (getSlidingWindow(request).isFirst(pending.getSeqNum())) {
pending.setFirstRequest();
}
LOG.debug("{}: send* {}", client.getId(), request);
diff --git a/ratis-docs/src/site/markdown/configurations.md
b/ratis-docs/src/site/markdown/configurations.md
index 67e988348..f5189ed86 100644
--- a/ratis-docs/src/site/markdown/configurations.md
+++ b/ratis-docs/src/site/markdown/configurations.md
@@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer
within this specified ti
### Read Index - Configurations related to ReadIndex used in linearizable read
-| **Property** | `raft.server.read.read-index.applied-index.enabled`
|
-|:----------------|:----------------------------------------------------------------------|
-| **Description** | whether applied index (instead of commit index) is used
for ReadIndex |
-| **Type** | boolean
|
-| **Default** | false
|
+| **Property** | `raft.server.read.read-index.type` |
+|:----------------|:-----------------------------------------------------------------------------|
+| **Description** | type of read index returned |
+| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX`] |
+| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` |
+
+* `Read.ReadIndex.Type.COMMIT_INDEX` - Use the leader's CommitIndex (see Raft dissertation section 6.4)
+  * The safest type, as it is the one specified in the Raft dissertation.
+  * This ReadIndex type can be chosen if the baseline performance of linearizable reads from followers already meets expectations.
+
+* `Read.ReadIndex.Type.APPLIED_INDEX` - Use the leader's AppliedIndex
+  * Allows the leader to return its AppliedIndex (instead of its CommitIndex) as the ReadIndex.
+  * This reduces the time followers spend applying logs up to the ReadIndex, since AppliedIndex ≤ CommitIndex.
+  * This ReadIndex type can be chosen if the read latency of `Read.ReadIndex.Type.COMMIT_INDEX` is too high.
+
+* `Read.ReadIndex.Type.REPLIED_INDEX` - Use the leader's RepliedIndex
+  * RepliedIndex is defined as the last AppliedIndex of the leader when it returned the last batch of write replies.
+  * The leader delays replying to write requests and only replies to them at each write batch boundary, configurable by `raft.server.read.read-index.replied-index.batch-interval`.
+  * This allows the ReadIndex to advance in coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives.
+  * This is most effective for read-heavy, follower-read workloads that prioritize overall read throughput without sacrificing consistency.
+  * The trade-off is increased write latency of up to one `raft.server.read.read-index.replied-index.batch-interval` per write.
+  * RepliedIndex still guarantees linearizability (no stale reads) since, by definition, each ReadIndex is the index of the last replied request.
+  * If `raft.server.read.read-index.replied-index.batch-interval` is set to 0, held replies are flushed continuously, so the behavior is nearly identical to `Read.ReadIndex.Type.APPLIED_INDEX`.
+
+Note that, theoretically, all the ReadIndex types guarantee linearizability,
+but the different types make different trade-offs (e.g. between write and read performance).
+
+| **Property** | `raft.server.read.read-index.replied-index.batch-interval` |
+|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------|
+| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced |
+| **Type** | TimeDuration |
+| **Default** | 10ms |
| **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
|:----------------|:--------------------------------------------------|
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index ef16f67f6..2d5559478 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -280,15 +280,34 @@ public interface RaftServerConfigKeys {
interface ReadIndex {
String PREFIX = Read.PREFIX + ".read-index";
- String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled";
- boolean APPLIED_INDEX_ENABLED_DEFAULT = false;
- static boolean appliedIndexEnabled(RaftProperties properties) {
- return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY,
- APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog());
+ enum Type {
+ /** ReadIndex returns leader's commitIndex (see Raft Paper section
6.4). */
+ COMMIT_INDEX,
+
+ /** ReadIndex returns leader's appliedIndex to reduce the ReadIndex
latency. */
+ APPLIED_INDEX,
+
+ /** ReadIndex returns leader's repliedIndex, the index of the last
replied request. */
+ REPLIED_INDEX
+ }
+
+ String TYPE_KEY = PREFIX + ".type";
+ Type TYPE_DEFAULT = Type.COMMIT_INDEX;
+ static Type type(RaftProperties properties) {
+ return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT,
getDefaultLog());
+ }
+ static void setType(RaftProperties properties, Type type) {
+ set(properties::setEnum, TYPE_KEY, type);
}
- static void setAppliedIndexEnabled(RaftProperties properties, boolean
enabled) {
- setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled);
+ String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX +
".replied-index.batch-interval";
+ TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT =
TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+ static TimeDuration repliedIndexBatchInterval(RaftProperties properties)
{
+ return
getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()),
+ REPLIED_INDEX_BATCH_INTERVAL_KEY,
REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog());
+ }
+ static void setRepliedIndexBatchInterval(RaftProperties properties,
TimeDuration interval) {
+ setTimeDuration(properties::setTimeDuration,
REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
}
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index ef0bb6b70..1c986ca63 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -39,6 +39,7 @@ import
org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
@@ -82,6 +83,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -353,10 +355,13 @@ class LeaderStateImpl implements LeaderState {
private final PendingStepDown pendingStepDown;
private final ReadIndexHeartbeats readIndexHeartbeats;
- private final boolean readIndexAppliedIndexEnabled;
+ private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
+ private final Supplier<Long> readIndexSupplier;
private final boolean leaderHeartbeatCheckEnabled;
private final LeaderLease lease;
+ private ReplyFlusher replyFlusher;
+
LeaderStateImpl(RaftServerImpl server) {
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(),
getClass());
this.server = server;
@@ -391,8 +396,21 @@ class LeaderStateImpl implements LeaderState {
} else {
this.followerMaxGapThreshold = (long) (followerGapRatioMax *
maxPendingRequests);
}
- this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex
- .appliedIndexEnabled(properties);
+
+ this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);
+ switch (readIndexType) {
+ case REPLIED_INDEX:
+ this.replyFlusher = new ReplyFlusher(server.getId(),
state.getLastAppliedIndex(),
+
RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties));
+ readIndexSupplier = replyFlusher::getRepliedIndex;
+ break;
+ case APPLIED_INDEX:
+ readIndexSupplier = () -> server.getState().getLastAppliedIndex();
+ break;
+ case COMMIT_INDEX:
+ default:
+ readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex();
+ }
this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read
.leaderHeartbeatCheckEnabled(properties);
@@ -418,6 +436,11 @@ class LeaderStateImpl implements LeaderState {
// Initialize startup log entry and append it to the RaftLog
startupLogEntry.get();
processor.start();
+
+ if (replyFlusher != null) {
+ replyFlusher.start(startupLogEntry.get().startIndex);
+ }
+
senders.forEach(LogAppender::start);
}
@@ -453,6 +476,9 @@ class LeaderStateImpl implements LeaderState {
startupLogEntry.get().getAppliedIndexFuture().completeExceptionally(
new ReadIndexException("failed to obtain read index since: ", nle));
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
+ if (replyFlusher != null) {
+ replyFlusher.stop();
+ }
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
@@ -620,7 +646,7 @@ class LeaderStateImpl implements LeaderState {
final boolean initializing = !isCaughtUp(follower);
final RaftPeerId targetId = follower.getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(),
targetId, getCurrentTerm(), entries,
- ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(),
previous, entries.size()),
+ ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(),
previous, entries.size()),
initializing, previous, server.getCommitInfos(), callId);
}
@@ -1140,23 +1166,21 @@ class LeaderStateImpl implements LeaderState {
/**
* Obtain the current readIndex for read only requests. See Raft paper
section 6.4.
* 1. Leader makes sure at least one log from current term is committed.
- * 2. Leader record last committed index or applied index (depending on
configuration) as readIndex.
+ * 2. Leader record last committed index or applied index or replied index
(depending on configuration) as readIndex.
* 3. Leader broadcast heartbeats to followers and waits for
acknowledgements.
* 4. If majority respond success, returns readIndex.
* @return current readIndex.
*/
CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
- final long index = readIndexAppliedIndexEnabled ?
- server.getState().getLastAppliedIndex() :
server.getRaftLog().getLastCommittedIndex();
+ final long index = readIndexSupplier.get();
final long readIndex;
if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex
> index) {
readIndex = readAfterWriteConsistentIndex;
} else {
readIndex = index;
}
- LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
- readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit",
- index, readAfterWriteConsistentIndex);
+ LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})",
+ readIndex, readIndexType, index, readAfterWriteConsistentIndex);
// if group contains only one member, fast path
if (server.getRaftConf().isSingleton()) {
@@ -1217,8 +1241,22 @@ class LeaderStateImpl implements LeaderState {
&& (server.getRaftConf().isSingleton() || lease.isValid());
}
- void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
- pendingRequests.replyPendingRequest(termIndex, reply);
+ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply,
RetryCacheImpl.CacheEntry cacheEntry) {
+ final PendingRequest pending = pendingRequests.remove(termIndex);
+
+ final LongSupplier replyMethod = () -> {
+ cacheEntry.updateResult(reply);
+ if (pending != null) {
+ pending.setReply(reply);
+ }
+ return termIndex.getIndex();
+ };
+
+ if (readIndexType == Type.REPLIED_INDEX) {
+ replyFlusher.hold(replyMethod);
+ } else {
+ replyMethod.getAsLong();
+ }
}
TransactionContext getTransactionContext(TermIndex termIndex) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c6a9dd279..f89d354e6 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -264,12 +264,13 @@ class PendingRequests {
return pendingRequest != null ? pendingRequest.getEntry() : null;
}
- void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
+ /** @return the removed the {@link PendingRequest} for the given {@link
TermIndex}. */
+ PendingRequest remove(TermIndex termIndex) {
final PendingRequest pending = pendingRequests.remove(termIndex);
if (pending != null) {
Preconditions.assertEquals(termIndex, pending.getTermIndex(),
"termIndex");
- pending.setReply(reply);
}
+ return pending;
}
/**
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 958da846d..d4c6f164e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1837,8 +1837,12 @@ class RaftServerImpl implements RaftServer.Division,
}
// update pending request
- role.getLeaderState().ifPresent(leader ->
leader.replyPendingRequest(termIndex, r));
- cacheEntry.updateResult(r);
+ final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+ if (leader != null) {
+ leader.replyPendingRequest(termIndex, r, cacheEntry);
+ } else {
+ cacheEntry.updateResult(r);
+ }
});
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
new file mode 100644
index 000000000..47e9967c1
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+
+/**
+ * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used.
+ * <p>
+ * Write replies are held by {@link #hold(LongSupplier)} and released in batches by a daemon thread
+ * once per {@code batchInterval}; {@link #getRepliedIndex()} advances to the largest index of each batch.
+ * After the flusher is closed (stopped, or its daemon has exited),
+ * {@link #hold(LongSupplier)} replies immediately so that no reply is left stranded.
+ */
+public class ReplyFlusher {
+  static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class);
+
+  private static final String CLASS_NAME = JavaUtils.getClassSimpleName(ReplyFlusher.class);
+  public static final String FLUSH = CLASS_NAME + ".flush";
+
+  /** A thread-safe buffer of held replies. */
+  static class Replies {
+    /** When a {@link LongSupplier} is invoked, it completes a write reply and returns the log index. */
+    private LinkedList<LongSupplier> list = new LinkedList<>();
+    /** Once set, {@link #add(LongSupplier)} rejects new replies. */
+    private boolean closed = false;
+
+    /**
+     * @return true if the reply is held for a later flush;
+     *         false if this buffer is closed, in which case the caller must send the reply itself.
+     */
+    synchronized boolean add(LongSupplier replyMethod) {
+      if (closed) {
+        return false;
+      }
+      list.add(replyMethod);
+      return true;
+    }
+
+    synchronized LinkedList<LongSupplier> getAndSetNewList() {
+      final LinkedList<LongSupplier> old = list;
+      list = new LinkedList<>();
+      return old;
+    }
+
+    /** Close this buffer and return the replies held so far. */
+    synchronized LinkedList<LongSupplier> close() {
+      closed = true;
+      return getAndSetNewList();
+    }
+  }
+
+  private final Object id;
+  private final LifeCycle lifeCycle;
+  private final Daemon daemon;
+  private final Replies replies = new Replies();
+  private final RaftLogIndex repliedIndex;
+  /** The interval at which held write replies are flushed. */
+  private final TimeDuration batchInterval;
+
+  ReplyFlusher(Object id, long repliedIndex, TimeDuration batchInterval) {
+    this.id = id;
+    final String name = id + "-ReplyFlusher";
+    this.lifeCycle = new LifeCycle(name);
+    this.daemon = Daemon.newBuilder()
+        .setName(name)
+        .setRunnable(this::run)
+        .build();
+    this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex);
+    this.batchInterval = batchInterval;
+  }
+
+  long getRepliedIndex() {
+    return repliedIndex.get();
+  }
+
+  /**
+   * Hold a write reply for later batch flushing.
+   * If this flusher is already closed, no flush will run anymore, so the reply is sent right away.
+   */
+  void hold(LongSupplier replyMethod) {
+    if (!replies.add(replyMethod)) {
+      final long index = replyMethod.getAsLong();
+      repliedIndex.updateToMax(index, s -> LOG.debug("{}: replied after close, {}", id, s));
+    }
+  }
+
+  void start(long startIndex) {
+    repliedIndex.updateToMax(startIndex, s -> LOG.debug("{}: {}", id, s));
+    lifeCycle.transition(LifeCycle.State.STARTING);
+    // We need to transition to RUNNING first so that ReplyFlusher#run always
+    // sees that the lifecycle state is RUNNING.
+    lifeCycle.transition(LifeCycle.State.RUNNING);
+    daemon.start();
+  }
+
+  /** The reply flusher daemon loop. */
+  private void run() {
+    try {
+      while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) {
+        batchInterval.sleep();
+        flush(false);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      if (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) {
+        LOG.warn("{}: Interrupted ", daemon.getName(), e);
+      } else {
+        // stop() always interrupts the daemon, so this is the normal shutdown path.
+        LOG.debug("{}: Interrupted by stop()", daemon.getName());
+      }
+    } finally {
+      // Flush the remaining replies and close, so that any later hold(..) replies immediately.
+      flush(true);
+    }
+  }
+
+  /**
+   * Flush all held replies, in the order they were held, and advance {@link #repliedIndex}.
+   *
+   * @param close whether to also close {@link #replies} so that it rejects further holds.
+   */
+  private void flush(boolean close) {
+    CodeInjectionForTesting.execute(FLUSH, id, null);
+
+    final LinkedList<LongSupplier> toFlush = close ? replies.close() : replies.getAndSetNewList();
+    if (toFlush.isEmpty()) {
+      return;
+    }
+    long maxIndex = Long.MIN_VALUE;
+    for (LongSupplier held : toFlush) {
+      try {
+        maxIndex = Math.max(maxIndex, held.getAsLong());
+      } catch (RuntimeException e) {
+        // Keep going: one failed reply must not strand the rest of the batch or kill the daemon.
+        LOG.warn("{}: Failed to send a held reply", id, e);
+      }
+    }
+    if (maxIndex != Long.MIN_VALUE) {
+      repliedIndex.updateToMax(maxIndex, s ->
+          LOG.debug("{}: flushed {} replies, {}", id, toFlush.size(), s));
+    }
+  }
+
+  /** Stop the reply flusher daemon. */
+  void stop() {
+    lifeCycle.checkStateAndClose();
+    daemon.interrupt();
+    // Thread.join(0) waits forever; use at least 1ms in case batchInterval rounds down to 0ms.
+    final long joinTimeoutMs = Math.max(1, batchInterval.toLong(TimeUnit.MILLISECONDS) * 2);
+    try {
+      daemon.join(joinTimeoutMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
index b15ae3067..dd536508c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
@@ -18,6 +18,7 @@
package org.apache.ratis;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
@@ -27,6 +28,7 @@ import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
@@ -45,9 +47,12 @@ import static
org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast;
import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
import static
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
/** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE}
feature. */
public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>
@@ -56,15 +61,20 @@ public abstract class LinearizableReadTests<CLUSTER extends
MiniRaftCluster>
{
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+ Slf4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
public abstract boolean isLeaderLeaseEnabled();
- public abstract boolean readIndexAppliedIndexEnabled();
+ public abstract Type readIndexType();
- public abstract void assertRaftProperties(RaftProperties properties);
+ public final void assertRaftProperties(RaftProperties p) {
+ assertOption(LINEARIZABLE, p);
+ assertEquals(isLeaderLeaseEnabled(),
RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
+ assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p));
+ }
- void runWithNewCluster(CheckedConsumer<CLUSTER, Exception> testCase) throws
Exception {
+ protected void runWithNewCluster(CheckedConsumer<CLUSTER, Exception>
testCase) throws Exception {
runWithNewCluster(3, 0, true, cluster -> {
assertRaftProperties(cluster.getProperties());
testCase.accept(cluster);
@@ -77,7 +87,11 @@ public abstract class LinearizableReadTests<CLUSTER extends
MiniRaftCluster>
CounterStateMachine.setProperties(p);
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
- RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p,
readIndexAppliedIndexEnabled());
+ RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
+ // Disable dummy request since currently the request is implemented as a
watch request
+ // which can cause follower client to trigger failover to leader which
will cause the
+ // all reads to be sent to the leader, making the follower read moot.
+ RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false);
}
@Test
@@ -95,22 +109,34 @@ public abstract class LinearizableReadTests<CLUSTER
extends MiniRaftCluster>
runWithNewCluster(LinearizableReadTests::runTestFollowerLinearizableRead);
}
- static class Reply {
+ public static class Reply {
private final int count;
private final CompletableFuture<RaftClientReply> future;
- Reply(int count, CompletableFuture<RaftClientReply> future) {
+ public Reply(int count, CompletableFuture<RaftClientReply> future) {
this.count = count;
this.future = future;
}
- void assertExact() {
+ public boolean isDone() {
+ return future.isDone();
+ }
+
+ public void assertExact() {
assertReplyExact(count, future.join());
}
- void assertAtLeast() {
+ public void assertAtLeast() {
assertReplyAtLeast(count, future.join());
}
+
+ @Override
+ public String toString() {
+ return "Reply{" +
+ "count=" + count +
+ ", reply=" + (isDone() ? future.join() : "pending") +
+ '}';
+ }
}
static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C
cluster) throws Exception {
@@ -167,8 +193,9 @@ public abstract class LinearizableReadTests<CLUSTER extends
MiniRaftCluster>
count++;
writeReplies.add(new Reply(count,
leaderClient.async().send(WAIT_AND_INCREMENT)));
+ // sleep to let the commitIndex/appliedIndex get updated.
Thread.sleep(100);
-
+ // WAIT_AND_INCREMENT will delay 500ms to update the count, the read
must wait for it.
assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY,
f1)));
}
@@ -189,7 +216,7 @@ public abstract class LinearizableReadTests<CLUSTER extends
MiniRaftCluster>
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
final List<RaftServer.Division> followers = cluster.getFollowers();
- Assertions.assertEquals(2, followers.size());
+ assertEquals(2, followers.size());
final RaftPeerId f0 = followers.get(0).getId();
try (RaftClient leaderClient = cluster.createClient(leaderId);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index a17cdb0d5..9821126ce 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -300,8 +300,10 @@ public abstract class OutputStreamBaseTest<CLUSTER extends
MiniRaftCluster>
Thread.sleep(500);
running.set(false);
- latch.await(5, TimeUnit.SECONDS);
+ final boolean latchCompleted = latch.await(5, TimeUnit.SECONDS);
+ Assertions.assertTrue(latchCompleted, "Writer thread did not finish within
the timeout");
LOG.info("Writer success? " + success.get());
+ Assertions.assertNotNull(success.get(), "Writer thread completed but
success was not set");
Assertions.assertTrue(success.get());
// total number of tx should be >= result + 2, where 2 means two NoOp from
// leaders. It may be larger than result+2 because the client may resend
diff --git
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index aa77ee5c7..94e9433b1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -58,9 +58,9 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
static final String WAIT_AND_INCREMENT_STRING = "WAIT_AND_INCREMENT";
static final String QUERY_STRING = "QUERY";
- static final Message INCREMENT = new
RaftTestUtil.SimpleMessage(INCREMENT_STRING);
- static final Message WAIT_AND_INCREMENT = new
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING);
- static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING);
+ public static final Message INCREMENT = new
RaftTestUtil.SimpleMessage(INCREMENT_STRING);
+ public static final Message WAIT_AND_INCREMENT = new
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING);
+ public static final Message QUERY = new
RaftTestUtil.SimpleMessage(QUERY_STRING);
@BeforeEach
public void setup() {
@@ -144,7 +144,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
return
Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
}
- static void assertReplyExact(int expectedCount, RaftClientReply reply) {
+ public static void assertReplyExact(int expectedCount, RaftClientReply
reply) {
Assertions.assertTrue(reply.isSuccess());
final int retrieved = retrieve(reply);
Assertions.assertEquals(expectedCount, retrieved, () -> "reply=" + reply);
@@ -163,7 +163,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
* 2. get
* 3. waitAndIncrement
*/
- static class CounterStateMachine extends BaseStateMachine {
+ public static class CounterStateMachine extends BaseStateMachine {
static void setProperties(RaftProperties properties) {
properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
CounterStateMachine.class, StateMachine.class);
}
@@ -193,6 +193,10 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
}
}
+ public long getCount() {
+ return counter.get();
+ }
+
private long increment() {
return counter.incrementAndGet();
}
@@ -213,6 +217,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
final LogEntryProto logEntry = trx.getLogEntry();
final TermIndex ti = TermIndex.valueOf(logEntry);
updateLastAppliedTermIndex(ti);
+ LOG.info("{}: updateLastAppliedTermIndex {}", getId(), ti);
final String command =
logEntry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8);
@@ -224,7 +229,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
} else {
updatedCount = timeoutIncrement();
}
- LOG.info("Applied {} command {}, updatedCount={}", ti, command,
updatedCount);
+ LOG.info("{}: Applied {} command {}, updatedCount={}", getId(), ti,
command, updatedCount);
return toMessageFuture(updatedCount);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
index d637498d7..120cce48c 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
@@ -18,12 +18,7 @@
package org.apache.ratis.grpc;
import org.apache.ratis.LinearizableReadTests;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
-
-import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
-import static
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
public class TestLinearizableLeaderLeaseReadWithGrpc
extends LinearizableReadTests<MiniRaftClusterWithGrpc>
@@ -35,14 +30,7 @@ public class TestLinearizableLeaderLeaseReadWithGrpc
}
@Override
- public boolean readIndexAppliedIndexEnabled() {
- return false;
- }
-
- @Override
- public void assertRaftProperties(RaftProperties p) {
- assertOption(LINEARIZABLE, p);
- assertTrue(RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
- assertTrue(isLeaderLeaseEnabled());
+ public Type readIndexType() {
+ return Type.COMMIT_INDEX;
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
index 9bf3e307b..3705fb3ff 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
@@ -17,11 +17,13 @@
*/
package org.apache.ratis.grpc;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
+
public class TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc
extends TestLinearizableLeaderLeaseReadWithGrpc {
@Override
- public boolean readIndexAppliedIndexEnabled() {
- return true;
+ public Type readIndexType() {
+ return Type.APPLIED_INDEX;
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
index c019aac16..b119f32a6 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
@@ -17,11 +17,13 @@
*/
package org.apache.ratis.grpc;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
+
public class TestLinearizableReadAppliedIndexWithGrpc
extends TestLinearizableReadWithGrpc {
@Override
- public boolean readIndexAppliedIndexEnabled() {
- return true;
+ public Type readIndexType() {
+ return Type.APPLIED_INDEX;
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java
similarity index 84%
copy from
ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
copy to
ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java
index c019aac16..bb50eafbf 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java
@@ -17,11 +17,11 @@
*/
package org.apache.ratis.grpc;
-public class TestLinearizableReadAppliedIndexWithGrpc
- extends TestLinearizableReadWithGrpc {
+public class TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc
+ extends TestLinearizableReadRepliedIndexWithGrpc {
@Override
- public boolean readIndexAppliedIndexEnabled() {
+ public boolean isLeaderLeaseEnabled() {
return true;
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java
new file mode 100644
index 000000000..f08346fc0
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReplyFlusher;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
+import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Runs the linearizable read tests with {@link Type#REPLIED_INDEX} as the read index type.
+ * <p>
+ * {@link #testFollowerLinearizableReadParallel()} is overridden to block the leader's {@link ReplyFlusher}
+ * and to verify that follower reads are served immediately, without waiting for the pending writes.
+ */
+public class TestLinearizableReadRepliedIndexWithGrpc
+    extends TestLinearizableReadWithGrpc {
+
+  @Override
+  public Type readIndexType() {
+    return Type.REPLIED_INDEX;
+  }
+
+  @Test
+  @Override
+  public void testFollowerLinearizableReadParallel() throws Exception {
+    runWithNewCluster(TestLinearizableReadRepliedIndexWithGrpc::runTestFollowerReadOnlyParallelRepliedIndex);
+  }
+
+  /**
+   * Block the leader's {@link ReplyFlusher} and then send increments to the leader.
+   * While blocked, the increments are applied on the leader but the write replies are withheld,
+   * and the follower reads complete immediately with the count as of before the block (0).
+   * After unblocking, the follower reads return the final count
+   * and each write reply carries the count at its write time.
+   */
+  static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallelRepliedIndex(C cluster)
+      throws Exception {
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final CounterStateMachine leaderStateMachine = (CounterStateMachine)leader.getStateMachine();
+
+    final List<RaftServer.Division> followers = cluster.getFollowers();
+    Assertions.assertEquals(2, followers.size());
+    final RaftPeerId f0 = followers.get(0).getId();
+    final RaftPeerId f1 = followers.get(1).getId();
+
+    final BlockingCode blockingReplyFlusher = new BlockingCode();
+
+    try (RaftClient leaderClient = cluster.createClient(leader.getId());
+         RaftClient f0Client = cluster.createClient(f0);
+         RaftClient f1Client = cluster.createClient(f1)) {
+      // Warm up the clients first before blocking the reply flusher
+      assertReplyExact(0, leaderClient.async().sendReadOnly(QUERY).get());
+      assertReplyExact(0, f0Client.async().sendReadOnly(QUERY, f0).get());
+      assertReplyExact(0, f1Client.async().sendReadOnly(QUERY, f1).get());
+
+      CodeInjectionForTesting.put(ReplyFlusher.FLUSH, blockingReplyFlusher);
+      try {
+        final int n = 10;
+        final List<Reply> writeReplies = new ArrayList<>(n);
+        final List<Reply> f0Replies = new ArrayList<>(n);
+        final List<Reply> f1Replies = new ArrayList<>(n);
+        for (int i = 0; i < n; i++) {
+          final int count = i + 1;
+          writeReplies.add(new Reply(count, leaderClient.async().send(INCREMENT)));
+
+          // Read replies return immediately, and they all should return 0 since the repliedIndex
+          // has not been updated and the followers should not have applied the write operations.
+          f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0)));
+          f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1)));
+
+          // sleep in order to make sure
+          // (1) the count is incremented, and
+          // (2) the reads will wait for the repliedIndex.
+          Thread.sleep(100);
+          assertEquals(count, leaderStateMachine.getCount());
+        }
+
+        for (int i = 0; i < n; i++) {
+          // The write reply should not complete yet since the ReplyFlusher remains blocked.
+          assertFalse(writeReplies.get(i).isDone(), "Received unexpected Write reply " + writeReplies.get(i));
+
+          // Follower reads should be served immediately, returning the value
+          // as of before the ReplyFlusher was blocked.
+          assertTrue(f0Replies.get(i).isDone(), "Follower read should return immediately");
+          f0Replies.get(i).assertExact();
+          assertTrue(f1Replies.get(i).isDone(), "Follower read should return immediately");
+          f1Replies.get(i).assertExact();
+        }
+
+        // unblock ReplyFlusher; both followers should then return the final count
+        blockingReplyFlusher.complete();
+        assertReplyExact(n, f0Client.io().sendReadOnly(QUERY, f0));
+        assertReplyExact(n, f1Client.io().sendReadOnly(QUERY, f1));
+
+        for (int i = 0; i < n; i++) {
+          // each write reply should get the exact count at its write time
+          writeReplies.get(i).assertExact();
+        }
+      } finally {
+        // Always unblock the ReplyFlusher, even if an assertion above failed;
+        // otherwise the flusher would stay blocked while the clients and the cluster are closed.
+        blockingReplyFlusher.complete();
+      }
+    }
+  }
+
+  /**
+   * Blocks the calling thread until {@link #complete()} is called.
+   * This test injects it at {@link ReplyFlusher#FLUSH} to hold back the leader's replies.
+   */
+  static class BlockingCode implements CodeInjectionForTesting.Code {
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    /** Unblock the current and all subsequent calls to {@link #execute(Object, Object, Object...)}. */
+    void complete() {
+      future.complete(null);
+    }
+
+    @Override
+    public boolean execute(Object localId, Object remoteId, Object... args) {
+      final boolean blocked = !future.isDone();
+      if (blocked) {
+        // The Throwable captures the current stack, i.e. where the flusher is blocked.
+        LOG.info("{}: ReplyFlusher is blocked", localId, new Throwable());
+      }
+      future.join();
+      if (blocked) {
+        LOG.info("{}: ReplyFlusher is unblocked", localId);
+      }
+      return true;
+    }
+  }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
index 3e8860dd1..77593ff85 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
@@ -18,12 +18,7 @@
package org.apache.ratis.grpc;
import org.apache.ratis.LinearizableReadTests;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
-
-import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
-import static
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
public class TestLinearizableReadWithGrpc
extends LinearizableReadTests<MiniRaftClusterWithGrpc>
@@ -35,14 +30,7 @@ public class TestLinearizableReadWithGrpc
}
@Override
- public boolean readIndexAppliedIndexEnabled() {
- return false;
- }
-
- @Override
- public void assertRaftProperties(RaftProperties p) {
- assertOption(LINEARIZABLE, p);
- assertFalse(RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
- assertFalse(isLeaderLeaseEnabled());
+ public Type readIndexType() {
+ return Type.COMMIT_INDEX;
}
}