This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e7925ee RATIS-2403. Support leader batch write to improve 
linearizable follower read throughput (#1362)
32e7925ee is described below

commit 32e7925ee9aec86922d173f3922337618d918362
Author: Ivan Andika <[email protected]>
AuthorDate: Mon Apr 6 12:42:03 2026 +0800

    RATIS-2403. Support leader batch write to improve linearizable follower 
read throughput (#1362)
---
 .../org/apache/ratis/client/impl/OrderedAsync.java |   2 +-
 ratis-docs/src/site/markdown/configurations.md     |  37 +++++-
 .../apache/ratis/server/RaftServerConfigKeys.java  |  33 ++++-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  62 +++++++--
 .../apache/ratis/server/impl/PendingRequests.java  |   5 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |   8 +-
 .../org/apache/ratis/server/impl/ReplyFlusher.java | 137 +++++++++++++++++++
 .../org/apache/ratis/LinearizableReadTests.java    |  47 +++++--
 .../org/apache/ratis/OutputStreamBaseTest.java     |   4 +-
 .../org/apache/ratis/ReadOnlyRequestTests.java     |  17 ++-
 .../TestLinearizableLeaderLeaseReadWithGrpc.java   |  18 +--
 ...bleReadAppliedIndexLeaderLeaseReadWithGrpc.java |   6 +-
 .../TestLinearizableReadAppliedIndexWithGrpc.java  |   6 +-
 ...izableReadRepliedIndexLeaderLeaseWithGrpc.java} |   6 +-
 .../TestLinearizableReadRepliedIndexWithGrpc.java  | 145 +++++++++++++++++++++
 .../ratis/grpc/TestLinearizableReadWithGrpc.java   |  18 +--
 16 files changed, 468 insertions(+), 83 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index fbeb4b992..ecf4db3dc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -198,7 +198,7 @@ public final class OrderedAsync {
       return;
     }
 
-    if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
+    if (getSlidingWindow(request).isFirst(pending.getSeqNum())) {
       pending.setFirstRequest();
     }
     LOG.debug("{}: send* {}", client.getId(), request);
diff --git a/ratis-docs/src/site/markdown/configurations.md 
b/ratis-docs/src/site/markdown/configurations.md
index 67e988348..f5189ed86 100644
--- a/ratis-docs/src/site/markdown/configurations.md
+++ b/ratis-docs/src/site/markdown/configurations.md
@@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer 
within this specified ti
 
 ### Read Index - Configurations related to ReadIndex used in linearizable read 
 
-| **Property**    | `raft.server.read.read-index.applied-index.enabled`        
           |
-|:----------------|:----------------------------------------------------------------------|
-| **Description** | whether applied index (instead of commit index) is used 
for ReadIndex |
-| **Type**        | boolean                                                    
           |
-| **Default**     | false                                                      
           |
+| **Property**    | `raft.server.read.read-index.type`                         
                  |
+|:----------------|:-----------------------------------------------------------------------------|
+| **Description** | type of read index returned                                
                  |
+| **Type**        | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, 
`APPLIED_INDEX`, `REPLIED_INDEX` |
+| **Default**     | `Read.ReadIndex.Type.COMMIT_INDEX`                         
                  |
+
+* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft 
Paper section 6.4)
+    * The safest type as it is specified in the Raft dissertation
+    * This ReadIndex type can be chosen if the base linearizable read from 
followers performance already meets expectations.
+
+* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex
+    * Allow leader to return AppliedIndex (instead of CommitIndex) as the 
ReadIndex
+    * This reduces the time follower applying logs up to ReadIndex since 
AppliedIndex ≤ CommitIndex
+    * This ReadIndex type can be chosen `Read.ReadIndex.Type.COMMIT_INDEX` 
read latency is too high.
+
+* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex
+    * RepliedIndex is defined as the last AppliedIndex of the leader when 
returning the last batch.
+    * Leader delays replying write requests and only reply them every write 
batch boundary configurable by 
`raft.server.read.read-index.replied-index.batch-interval`.
+    * This allows the ReadIndex to advance in a coarser, less frequent steps, 
so followers are more likely to have already applied past the ReadIndex when a 
read arrives.
+    * This is most effective on read-heavy, follower-read workloads which 
prioritizes overall read throughput without consistency sacrifice.
+    * There is a trade-off in increased write latency (up to one 
`raft.server.read.read-index.replied-index.batch-interval`) per write.
+    * RepliedIndex still guarantees linearizability (no stale read) since by 
definition each ReadIndex returns the index of the last replied request.
+    * If the RepliedIndex is set to 0, the behavior is identical to 
`Read.ReadIndex.Type.APPLIED_INDEX`
+
+Note that theoretically all the ReadIndex types still guarantee 
linearizability, 
+but there are tradeoffs (e.g. Write and Read performance) between different 
types.
+
+| **Property**    | `raft.server.read.read-index.replied-index.batch-interval` 
                                                                                
 |
+|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------|
+| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval 
at which held write replies are flushed to clients and `repliedIndex` is 
advanced |
+| **Type**        | TimeDuration                                               
                                                                                
 |
+| **Default**     | 10ms                                                       
                                                                                
 |
 
 | **Property**    | `raft.server.read.leader.heartbeat-check.enabled` |
 |:----------------|:--------------------------------------------------|
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index ef16f67f6..2d5559478 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -280,15 +280,34 @@ public interface RaftServerConfigKeys {
     interface ReadIndex {
       String PREFIX = Read.PREFIX + ".read-index";
 
-      String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled";
-      boolean APPLIED_INDEX_ENABLED_DEFAULT = false;
-      static boolean appliedIndexEnabled(RaftProperties properties) {
-        return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY,
-            APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog());
+      enum Type {
+        /** ReadIndex returns leader's commitIndex (see Raft Paper section 
6.4). */
+        COMMIT_INDEX,
+
+        /** ReadIndex returns leader's appliedIndex to reduce the ReadIndex 
latency. */
+        APPLIED_INDEX,
+
+        /** ReadIndex returns leader's repliedIndex, the index of the last 
replied request. */
+        REPLIED_INDEX
+      }
+
+      String TYPE_KEY = PREFIX + ".type";
+      Type TYPE_DEFAULT = Type.COMMIT_INDEX;
+      static Type type(RaftProperties properties) {
+        return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, 
getDefaultLog());
+      }
+      static void setType(RaftProperties properties, Type type) {
+        set(properties::setEnum, TYPE_KEY, type);
       }
 
-      static void setAppliedIndexEnabled(RaftProperties properties, boolean 
enabled) {
-        setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled);
+      String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + 
".replied-index.batch-interval";
+      TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = 
TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+      static TimeDuration repliedIndexBatchInterval(RaftProperties properties) 
{
+        return 
getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()),
+            REPLIED_INDEX_BATCH_INTERVAL_KEY, 
REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog());
+      }
+      static void setRepliedIndexBatchInterval(RaftProperties properties, 
TimeDuration interval) {
+        setTimeDuration(properties::setTimeDuration, 
REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
       }
     }
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index ef0bb6b70..1c986ca63 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.ReadIndexException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
 import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
@@ -82,6 +83,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -353,10 +355,13 @@ class LeaderStateImpl implements LeaderState {
   private final PendingStepDown pendingStepDown;
 
   private final ReadIndexHeartbeats readIndexHeartbeats;
-  private final boolean readIndexAppliedIndexEnabled;
+  private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
+  private final Supplier<Long> readIndexSupplier;
   private final boolean leaderHeartbeatCheckEnabled;
   private final LeaderLease lease;
 
+  private ReplyFlusher replyFlusher;
+
   LeaderStateImpl(RaftServerImpl server) {
     this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), 
getClass());
     this.server = server;
@@ -391,8 +396,21 @@ class LeaderStateImpl implements LeaderState {
     } else {
       this.followerMaxGapThreshold = (long) (followerGapRatioMax * 
maxPendingRequests);
     }
-    this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex
-        .appliedIndexEnabled(properties);
+
+    this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);
+    switch (readIndexType) {
+    case REPLIED_INDEX:
+      this.replyFlusher = new ReplyFlusher(server.getId(), 
state.getLastAppliedIndex(),
+          
RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties));
+      readIndexSupplier = replyFlusher::getRepliedIndex;
+      break;
+    case APPLIED_INDEX:
+      readIndexSupplier = () -> server.getState().getLastAppliedIndex();
+      break;
+    case COMMIT_INDEX:
+    default:
+      readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex();
+    }
     this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read
         .leaderHeartbeatCheckEnabled(properties);
 
@@ -418,6 +436,11 @@ class LeaderStateImpl implements LeaderState {
     // Initialize startup log entry and append it to the RaftLog
     startupLogEntry.get();
     processor.start();
+
+    if (replyFlusher != null) {
+      replyFlusher.start(startupLogEntry.get().startIndex);
+    }
+
     senders.forEach(LogAppender::start);
   }
 
@@ -453,6 +476,9 @@ class LeaderStateImpl implements LeaderState {
     startupLogEntry.get().getAppliedIndexFuture().completeExceptionally(
         new ReadIndexException("failed to obtain read index since: ", nle));
     server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
+    if (replyFlusher != null) {
+      replyFlusher.stop();
+    }
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
     pendingRequests.close();
@@ -620,7 +646,7 @@ class LeaderStateImpl implements LeaderState {
     final boolean initializing = !isCaughtUp(follower);
     final RaftPeerId targetId = follower.getId();
     return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), 
targetId, getCurrentTerm(), entries,
-        ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), 
previous, entries.size()),
+        ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), 
previous, entries.size()),
         initializing, previous, server.getCommitInfos(), callId);
   }
 
@@ -1140,23 +1166,21 @@ class LeaderStateImpl implements LeaderState {
   /**
    * Obtain the current readIndex for read only requests. See Raft paper 
section 6.4.
    * 1. Leader makes sure at least one log from current term is committed.
-   * 2. Leader record last committed index or applied index (depending on 
configuration) as readIndex.
+   * 2. Leader record last committed index or applied index or replied index 
(depending on configuration) as readIndex.
    * 3. Leader broadcast heartbeats to followers and waits for 
acknowledgements.
    * 4. If majority respond success, returns readIndex.
    * @return current readIndex.
    */
   CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
-    final long index = readIndexAppliedIndexEnabled ?
-        server.getState().getLastAppliedIndex() : 
server.getRaftLog().getLastCommittedIndex();
+    final long index = readIndexSupplier.get();
     final long readIndex;
     if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex 
> index) {
       readIndex = readAfterWriteConsistentIndex;
     } else {
       readIndex = index;
     }
-    LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
-        readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit",
-        index, readAfterWriteConsistentIndex);
+    LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})",
+        readIndex, readIndexType, index, readAfterWriteConsistentIndex);
 
     // if group contains only one member, fast path
     if (server.getRaftConf().isSingleton()) {
@@ -1217,8 +1241,22 @@ class LeaderStateImpl implements LeaderState {
         && (server.getRaftConf().isSingleton() || lease.isValid());
   }
 
-  void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
-    pendingRequests.replyPendingRequest(termIndex, reply);
+  void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, 
RetryCacheImpl.CacheEntry cacheEntry) {
+    final PendingRequest pending = pendingRequests.remove(termIndex);
+
+    final LongSupplier replyMethod = () -> {
+      cacheEntry.updateResult(reply);
+      if (pending != null) {
+        pending.setReply(reply);
+      }
+      return termIndex.getIndex();
+    };
+
+    if (readIndexType == Type.REPLIED_INDEX) {
+      replyFlusher.hold(replyMethod);
+    } else {
+      replyMethod.getAsLong();
+    }
   }
 
   TransactionContext getTransactionContext(TermIndex termIndex) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c6a9dd279..f89d354e6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -264,12 +264,13 @@ class PendingRequests {
     return pendingRequest != null ? pendingRequest.getEntry() : null;
   }
 
-  void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
+  /** @return the removed the {@link PendingRequest} for the given {@link 
TermIndex}. */
+  PendingRequest remove(TermIndex termIndex) {
     final PendingRequest pending = pendingRequests.remove(termIndex);
     if (pending != null) {
       Preconditions.assertEquals(termIndex, pending.getTermIndex(), 
"termIndex");
-      pending.setReply(reply);
     }
+    return pending;
   }
 
   /**
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 958da846d..d4c6f164e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1837,8 +1837,12 @@ class RaftServerImpl implements RaftServer.Division,
       }
 
       // update pending request
-      role.getLeaderState().ifPresent(leader -> 
leader.replyPendingRequest(termIndex, r));
-      cacheEntry.updateResult(r);
+      final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+      if (leader != null) {
+        leader.replyPendingRequest(termIndex, r, cacheEntry);
+      } else {
+        cacheEntry.updateResult(r);
+      }
     });
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
new file mode 100644
index 000000000..47e9967c1
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+
+/**
+ * Implements the reply flush logic as part of the leader batch write when 
RepliedIndex is used.
+ */
+public class ReplyFlusher {
+  static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class);
+
+  private static final String CLASS_NAME = 
JavaUtils.getClassSimpleName(RaftServerImpl.class);
+  public static final String FLUSH = CLASS_NAME + ".flush";
+
+  static class Replies {
+    /** When a {@link LongSupplier} is invoked, it completes a write reply and 
return the log index. */
+    private LinkedList<LongSupplier> list = new LinkedList<>();
+
+    synchronized void add(LongSupplier replyMethod) {
+      list.add(replyMethod);
+    }
+
+    synchronized LinkedList<LongSupplier> getAndSetNewList() {
+      final LinkedList<LongSupplier> old = list;
+      list = new LinkedList<>();
+      return old;
+    }
+  }
+
+  private final Object id;
+  private final LifeCycle lifeCycle;
+  private final Daemon daemon;
+  private final Replies replies = new Replies();
+  private final RaftLogIndex repliedIndex;
+  /** The interval at which held write replies are flushed. */
+  private final TimeDuration batchInterval;
+
+  ReplyFlusher(Object id, long repliedIndex, TimeDuration batchInterval) {
+    this.id = id;
+    final String name = id + "-ReplyFlusher";
+    this.lifeCycle = new LifeCycle(name);
+    this.daemon = Daemon.newBuilder()
+        .setName(name)
+        .setRunnable(this::run)
+        .build();
+    this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex);
+    this.batchInterval = batchInterval;
+  }
+
+  long getRepliedIndex() {
+    return repliedIndex.get();
+  }
+
+  /** Hold a write reply for later batch flushing */
+  void hold(LongSupplier replyMethod) {
+    replies.add(replyMethod);
+  }
+
+  void start(long startIndex) {
+    repliedIndex.updateToMax(startIndex, s -> LOG.debug("{}: {}", id, s));
+    lifeCycle.transition(LifeCycle.State.STARTING);
+    // We need to transition to RUNNING first so that ReplyFlusher#run always
+    // see that the lifecycle state is in RUNNING state.
+    lifeCycle.transition(LifeCycle.State.RUNNING);
+    daemon.start();
+  }
+
+  /** The reply flusher daemon loop. */
+  private void run() {
+    try {
+      while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) {
+        batchInterval.sleep();
+        flush();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("{}: Interrupted ", daemon.getName(), e);
+    } finally {
+      // Flush remaining on exit
+      flush();
+    }
+  }
+
+  /** Flush all held replies and advance {@link #repliedIndex}. */
+  private void flush() {
+    CodeInjectionForTesting.execute(FLUSH, id, null);
+
+    final LinkedList<LongSupplier> toFlush = replies.getAndSetNewList();
+    if (toFlush.isEmpty()) {
+      return;
+    }
+    long maxIndex = toFlush.removeLast().getAsLong();
+    for (LongSupplier held : toFlush) {
+      maxIndex = Math.max(maxIndex, held.getAsLong());
+    }
+    repliedIndex.updateToMax(maxIndex, s ->
+        LOG.debug("{}: flushed {} replies, {}", id, toFlush.size(), s));
+  }
+
+  /** Stop the reply flusher daemon. */
+  void stop() {
+    lifeCycle.checkStateAndClose();
+    daemon.interrupt();
+    try {
+      daemon.join(batchInterval.toLong(TimeUnit.MILLISECONDS )* 2);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java 
b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
index b15ae3067..dd536508c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
@@ -18,6 +18,7 @@
 package org.apache.ratis;
 
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -27,6 +28,7 @@ import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.util.Slf4jUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -45,9 +47,12 @@ import static 
org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
 import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
 import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
 import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
 import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast;
 import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
 import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
 
 /** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE} 
feature. */
 public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>
@@ -56,15 +61,20 @@ public abstract class LinearizableReadTests<CLUSTER extends 
MiniRaftCluster>
 
   {
     Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Slf4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
   public abstract boolean isLeaderLeaseEnabled();
 
-  public abstract boolean readIndexAppliedIndexEnabled();
+  public abstract Type readIndexType();
 
-  public abstract void assertRaftProperties(RaftProperties properties);
+  public final void assertRaftProperties(RaftProperties p) {
+    assertOption(LINEARIZABLE, p);
+    assertEquals(isLeaderLeaseEnabled(), 
RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
+    assertSame(readIndexType(), RaftServerConfigKeys.Read.ReadIndex.type(p));
+  }
 
-  void runWithNewCluster(CheckedConsumer<CLUSTER, Exception> testCase) throws 
Exception {
+  protected void runWithNewCluster(CheckedConsumer<CLUSTER, Exception> 
testCase) throws Exception {
     runWithNewCluster(3, 0, true, cluster -> {
       assertRaftProperties(cluster.getProperties());
       testCase.accept(cluster);
@@ -77,7 +87,11 @@ public abstract class LinearizableReadTests<CLUSTER extends 
MiniRaftCluster>
     CounterStateMachine.setProperties(p);
     RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
     RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
-    RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p, 
readIndexAppliedIndexEnabled());
+    RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
+    // Disable dummy request since currently the request is implemented as a 
watch request
+    // which can cause follower client to trigger failover to leader which 
will cause the
+    // all reads to be sent to the leader, making the follower read moot.
+    RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false);
   }
 
   @Test
@@ -95,22 +109,34 @@ public abstract class LinearizableReadTests<CLUSTER 
extends MiniRaftCluster>
     runWithNewCluster(LinearizableReadTests::runTestFollowerLinearizableRead);
   }
 
-  static class Reply {
+  public static class Reply {
     private final int count;
     private final CompletableFuture<RaftClientReply> future;
 
-    Reply(int count, CompletableFuture<RaftClientReply> future) {
+    public Reply(int count, CompletableFuture<RaftClientReply> future) {
       this.count = count;
       this.future = future;
     }
 
-    void assertExact() {
+    public boolean isDone() {
+      return future.isDone();
+    }
+
+    public void assertExact() {
       assertReplyExact(count, future.join());
     }
 
-    void assertAtLeast() {
+    public void assertAtLeast() {
       assertReplyAtLeast(count, future.join());
     }
+
+    @Override
+    public String toString() {
+      return "Reply{" +
+          "count=" + count +
+          ", reply=" + (isDone() ? future.join() : "pending") +
+          '}';
+    }
   }
 
   static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C 
cluster) throws Exception {
@@ -167,8 +193,9 @@ public abstract class LinearizableReadTests<CLUSTER extends 
MiniRaftCluster>
 
         count++;
         writeReplies.add(new Reply(count, 
leaderClient.async().send(WAIT_AND_INCREMENT)));
+        // sleep to let the commitIndex/appliedIndex get updated.
         Thread.sleep(100);
-
+        // WAIT_AND_INCREMENT will delay 500ms to update the count, the read 
must wait for it.
         assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
         f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, 
f1)));
       }
@@ -189,7 +216,7 @@ public abstract class LinearizableReadTests<CLUSTER extends 
MiniRaftCluster>
     final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
 
     final List<RaftServer.Division> followers = cluster.getFollowers();
-    Assertions.assertEquals(2, followers.size());
+    assertEquals(2, followers.size());
     final RaftPeerId f0 = followers.get(0).getId();
 
     try (RaftClient leaderClient = cluster.createClient(leaderId);
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index a17cdb0d5..9821126ce 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -300,8 +300,10 @@ public abstract class OutputStreamBaseTest<CLUSTER extends 
MiniRaftCluster>
     Thread.sleep(500);
 
     running.set(false);
-    latch.await(5, TimeUnit.SECONDS);
+    final boolean latchCompleted = latch.await(5, TimeUnit.SECONDS);
+    Assertions.assertTrue(latchCompleted, "Writer thread did not finish within 
the timeout");
     LOG.info("Writer success? " + success.get());
+    Assertions.assertNotNull(success.get(), "Writer thread completed but 
success was not set");
     Assertions.assertTrue(success.get());
     // total number of tx should be >= result + 2, where 2 means two NoOp from
     // leaders. It may be larger than result+2 because the client may resend
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index aa77ee5c7..94e9433b1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -58,9 +58,9 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends 
MiniRaftCluster>
   static final String WAIT_AND_INCREMENT_STRING = "WAIT_AND_INCREMENT";
   static final String QUERY_STRING = "QUERY";
 
-  static final Message INCREMENT = new 
RaftTestUtil.SimpleMessage(INCREMENT_STRING);
-  static final Message WAIT_AND_INCREMENT = new 
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING);
-  static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING);
+  public static final Message INCREMENT = new 
RaftTestUtil.SimpleMessage(INCREMENT_STRING);
+  public static final Message WAIT_AND_INCREMENT = new 
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING);
+  public static final Message QUERY = new 
RaftTestUtil.SimpleMessage(QUERY_STRING);
 
   @BeforeEach
   public void setup() {
@@ -144,7 +144,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends 
MiniRaftCluster>
     return 
Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
   }
 
-  static void assertReplyExact(int expectedCount, RaftClientReply reply) {
+  public static void assertReplyExact(int expectedCount, RaftClientReply 
reply) {
     Assertions.assertTrue(reply.isSuccess());
     final int retrieved = retrieve(reply);
     Assertions.assertEquals(expectedCount, retrieved, () -> "reply=" + reply);
@@ -163,7 +163,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends 
MiniRaftCluster>
    * 2. get
    * 3. waitAndIncrement
    */
-  static class CounterStateMachine extends BaseStateMachine {
+  public static class CounterStateMachine extends BaseStateMachine {
     static void setProperties(RaftProperties properties) {
       properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, 
CounterStateMachine.class, StateMachine.class);
     }
@@ -193,6 +193,10 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends 
MiniRaftCluster>
       }
     }
 
+    public long getCount() {
+      return counter.get();
+    }
+
     private long increment() {
       return counter.incrementAndGet();
     }
@@ -213,6 +217,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends 
MiniRaftCluster>
       final LogEntryProto logEntry = trx.getLogEntry();
       final TermIndex ti = TermIndex.valueOf(logEntry);
       updateLastAppliedTermIndex(ti);
+      LOG.info("{}: updateLastAppliedTermIndex {}", getId(), ti);
 
       final String command = 
logEntry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8);
 
@@ -224,7 +229,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends 
MiniRaftCluster>
       } else {
         updatedCount = timeoutIncrement();
       }
-      LOG.info("Applied {} command {}, updatedCount={}", ti, command, 
updatedCount);
+      LOG.info("{}: Applied {} command {}, updatedCount={}", getId(), ti, 
command, updatedCount);
 
       return toMessageFuture(updatedCount);
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
index d637498d7..120cce48c 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
@@ -18,12 +18,7 @@
 package org.apache.ratis.grpc;
 
 import org.apache.ratis.LinearizableReadTests;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
-
-import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
-import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
 
 public class TestLinearizableLeaderLeaseReadWithGrpc
   extends LinearizableReadTests<MiniRaftClusterWithGrpc>
@@ -35,14 +30,7 @@ public class TestLinearizableLeaderLeaseReadWithGrpc
   }
 
   @Override
-  public boolean readIndexAppliedIndexEnabled() {
-    return false;
-  }
-
-  @Override
-  public void assertRaftProperties(RaftProperties p) {
-    assertOption(LINEARIZABLE, p);
-    assertTrue(RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
-    assertTrue(isLeaderLeaseEnabled());
+  public Type readIndexType() {
+    return Type.COMMIT_INDEX;
   }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
index 9bf3e307b..3705fb3ff 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java
@@ -17,11 +17,13 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
+
 public class TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc
     extends TestLinearizableLeaderLeaseReadWithGrpc {
 
   @Override
-  public boolean readIndexAppliedIndexEnabled() {
-    return true;
+  public Type readIndexType() {
+    return Type.APPLIED_INDEX;
   }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
index c019aac16..b119f32a6 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
@@ -17,11 +17,13 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
+
 public class TestLinearizableReadAppliedIndexWithGrpc
   extends TestLinearizableReadWithGrpc {
 
   @Override
-  public boolean readIndexAppliedIndexEnabled() {
-    return true;
+  public Type readIndexType() {
+    return Type.APPLIED_INDEX;
   }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java
similarity index 84%
copy from 
ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
copy to 
ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java
index c019aac16..bb50eafbf 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java
@@ -17,11 +17,11 @@
  */
 package org.apache.ratis.grpc;
 
-public class TestLinearizableReadAppliedIndexWithGrpc
-  extends TestLinearizableReadWithGrpc {
+public class TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc
+    extends TestLinearizableReadRepliedIndexWithGrpc {
 
   @Override
-  public boolean readIndexAppliedIndexEnabled() {
+  public boolean isLeaderLeaseEnabled() {
     return true;
   }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java
new file mode 100644
index 000000000..f08346fc0
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReplyFlusher;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
+import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestLinearizableReadRepliedIndexWithGrpc
+  extends TestLinearizableReadWithGrpc {
+
+  @Override
+  public Type readIndexType() {
+    return Type.REPLIED_INDEX;
+  }
+
+  @Test
+  @Override
+  public void testFollowerLinearizableReadParallel() throws Exception {
+    
runWithNewCluster(TestLinearizableReadRepliedIndexWithGrpc::runTestFollowerReadOnlyParallelRepliedIndex);
+  }
+
+  static <C extends MiniRaftCluster> void 
runTestFollowerReadOnlyParallelRepliedIndex(C cluster)
+      throws Exception {
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final CounterStateMachine leaderStateMachine = 
(CounterStateMachine)leader.getStateMachine();
+
+    final List<RaftServer.Division> followers = cluster.getFollowers();
+    Assertions.assertEquals(2, followers.size());
+    final RaftPeerId f0 = followers.get(0).getId();
+    final RaftPeerId f1 = followers.get(1).getId();
+
+    final BlockingCode blockingReplyFlusher = new BlockingCode();
+
+    try (RaftClient leaderClient = cluster.createClient(leader.getId());
+         RaftClient f0Client = cluster.createClient(f0);
+         RaftClient f1Client = cluster.createClient(f1)) {
+      // Warm up the clients first before blocking the reply flusher
+      assertReplyExact(0, leaderClient.async().sendReadOnly(QUERY).get());
+      assertReplyExact(0, f0Client.async().sendReadOnly(QUERY, f0).get());
+      assertReplyExact(0, f1Client.async().sendReadOnly(QUERY, f1).get());
+
+      CodeInjectionForTesting.put(ReplyFlusher.FLUSH, blockingReplyFlusher);
+
+      final int n = 10;
+      final List<Reply> writeReplies = new ArrayList<>(n);
+      final List<Reply> f0Replies = new ArrayList<>(n);
+      final List<Reply> f1Replies = new ArrayList<>(n);
+      for (int i = 0; i < n; i++) {
+        final int count = i + 1;
+        writeReplies.add(new Reply(count, 
leaderClient.async().send(INCREMENT)));
+
+        // Read reply returns immediately, but they all should return 0 since 
the repliedIndex has not been updated
+        // and write operations should not been applied by the followers
+        f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0)));
+        f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1)));
+
+        // sleep in order to make sure
+        // (1) the count is incremented, and
+        // (2) the reads will wait for the repliedIndex.
+        Thread.sleep(100);
+        assertEquals(count, leaderStateMachine.getCount());
+      }
+
+      for (int i = 0; i < n; i++) {
+        // Write reply should not yet complete since ReplyFlusher remains 
blocked.
+        assertFalse(writeReplies.get(i).isDone(), "Received unexpected Write 
reply " + writeReplies.get(i));
+
+        // Follower reads should be immediately served, but the read value 
should return the value before the
+        // replyFlusher is blocked
+        assertTrue(f0Replies.get(i).isDone(), "Follower read should return 
immediately");
+        f0Replies.get(i).assertExact();
+        assertTrue(f1Replies.get(i).isDone(), "Follower read should return 
immediately");
+        f1Replies.get(i).assertExact();
+      }
+
+      // unblock ReplyFlusher
+      blockingReplyFlusher.complete();
+      assertReplyExact(n, f0Client.io().sendReadOnly(QUERY, f0));
+      assertReplyExact(n, f1Client.io().sendReadOnly(QUERY, f0));
+
+      for (int i = 0; i < n; i++) {
+        //write reply should get the exact count at the write time
+        writeReplies.get(i).assertExact();
+      }
+    }
+  }
+
+  static class BlockingCode implements CodeInjectionForTesting.Code {
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    void complete() {
+      future.complete(null);
+    }
+
+    @Override
+    public boolean execute(Object localId, Object remoteId, Object... args) {
+      final boolean blocked = !future.isDone();
+      if (blocked) {
+        LOG.info("{}: ReplyFlusher is blocked", localId, new Throwable());
+      }
+      future.join();
+      if (blocked) {
+        LOG.info("{}: ReplyFlusher is unblocked", localId);
+      }
+      return true;
+    }
+  }
+
+
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
index 3e8860dd1..77593ff85 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
@@ -18,12 +18,7 @@
 package org.apache.ratis.grpc;
 
 import org.apache.ratis.LinearizableReadTests;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
-
-import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
-import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
 
 public class TestLinearizableReadWithGrpc
   extends LinearizableReadTests<MiniRaftClusterWithGrpc>
@@ -35,14 +30,7 @@ public class TestLinearizableReadWithGrpc
   }
 
   @Override
-  public boolean readIndexAppliedIndexEnabled() {
-    return false;
-  }
-
-  @Override
-  public void assertRaftProperties(RaftProperties p) {
-    assertOption(LINEARIZABLE, p);
-    assertFalse(RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
-    assertFalse(isLeaderLeaseEnabled());
+  public Type readIndexType() {
+    return Type.COMMIT_INDEX;
   }
 }


Reply via email to