This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9052dadbb RATIS-2353. Refactor ReadOnlyRequestTests. (#1308)
9052dadbb is described below

commit 9052dadbb1c384155414eed171b541b83dae5c5b
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Nov 11 07:41:34 2025 -0800

    RATIS-2353. Refactor ReadOnlyRequestTests. (#1308)
---
 .../org/apache/ratis/LinearizableReadTests.java    | 253 ++++++++++++++++
 .../org/apache/ratis/ReadOnlyRequestTests.java     | 321 ++++++---------------
 .../apache/ratis/server/impl/MiniRaftCluster.java  |   3 +
 .../TestLinearizableLeaderLeaseReadWithGrpc.java   |  43 +++
 .../ratis/grpc/TestLinearizableReadWithGrpc.java   |  43 +++
 5 files changed, 425 insertions(+), 238 deletions(-)

diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
new file mode 100644
index 000000000..49176b18a
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.retry.ExceptionDependentRetry;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ratis.ReadOnlyRequestTests.CounterStateMachine;
+import static org.apache.ratis.ReadOnlyRequestTests.INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.QUERY;
+import static org.apache.ratis.ReadOnlyRequestTests.WAIT_AND_INCREMENT;
+import static org.apache.ratis.ReadOnlyRequestTests.assertReplyAtLeast;
+import static org.apache.ratis.ReadOnlyRequestTests.assertReplyExact;
+import static org.apache.ratis.ReadOnlyRequestTests.retrieve;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
+
+/** Test for the {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE} feature. */
+public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>
+  extends BaseTest
+  implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  {
+    Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+  }
+
+  public abstract boolean isLeaderLeaseEnabled();
+
+  public abstract void assertRaftProperties(RaftProperties properties);
+
+  void runWithNewCluster(CheckedConsumer<CLUSTER, Exception> testCase) throws 
Exception {
+    runWithNewCluster(3, 0, true, cluster -> {
+      assertRaftProperties(cluster.getProperties());
+      testCase.accept(cluster);
+    });
+  }
+
+  @BeforeEach
+  public void setup() {
+    final RaftProperties p = getProperties();
+    CounterStateMachine.setProperties(p);
+    RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
+    RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
+  }
+
+  @Test
+  public void testLinearizableRead() throws Exception {
+    runWithNewCluster(ReadOnlyRequestTests::runTestReadOnly);
+  }
+
+  @Test
+  public void testLinearizableReadTimeout() throws Exception {
+    runWithNewCluster(cluster -> 
ReadOnlyRequestTests.runTestReadTimeout(ReadIndexException.class, cluster));
+  }
+
+  @Test
+  public void testFollowerLinearizableRead() throws Exception {
+    runWithNewCluster(LinearizableReadTests::runTestFollowerLinearizableRead);
+  }
+
+  static class Reply {
+    private final int count;
+    private final CompletableFuture<RaftClientReply> future;
+
+    Reply(int count, CompletableFuture<RaftClientReply> future) {
+      this.count = count;
+      this.future = future;
+    }
+
+    void assertExact() {
+      assertReplyExact(count, future.join());
+    }
+
+    void assertAtLeast() {
+      assertReplyAtLeast(count, future.join());
+    }
+  }
+
+  static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C 
cluster) throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+
+    final List<RaftServer.Division> followers = cluster.getFollowers();
+    Assertions.assertEquals(2, followers.size());
+
+    final RaftPeerId f0 = followers.get(0).getId();
+    final RaftPeerId f1 = followers.get(1).getId();
+
+    final int n = 100;
+    final List<Reply> f0Replies = new ArrayList<>(n);
+    final List<Reply> f1Replies = new ArrayList<>(n);
+    try (RaftClient client = cluster.createClient(leaderId)) {
+      for (int i = 0; i < n; i++) {
+        final int count = i + 1;
+        assertReplyExact(count, client.io().send(INCREMENT));
+
+        f0Replies.add(new Reply(count, client.async().sendReadOnly(QUERY, 
f0)));
+        f1Replies.add(new Reply(count, client.async().sendReadOnly(QUERY, 
f1)));
+      }
+
+      for (int i = 0; i < n; i++) {
+        f0Replies.get(i).assertAtLeast();
+        f1Replies.get(i).assertAtLeast();
+      }
+    }
+  }
+
+  @Test
+  public void testFollowerLinearizableReadParallel() throws Exception {
+    runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
+  }
+
+  static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C 
cluster) throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+
+    final List<RaftServer.Division> followers = cluster.getFollowers();
+    Assertions.assertEquals(2, followers.size());
+    final RaftPeerId f0 = followers.get(0).getId();
+    final RaftPeerId f1 = followers.get(1).getId();
+
+    try (RaftClient leaderClient = cluster.createClient(leaderId);
+         RaftClient f0Client = cluster.createClient(f0);
+         RaftClient f1Client = cluster.createClient(f1)) {
+
+      final int n = 10;
+      final List<Reply> writeReplies = new ArrayList<>(n);
+      final List<Reply> f1Replies = new ArrayList<>(n);
+      for (int i = 0; i < n; i++) {
+        int count = 2*i + 1;
+        assertReplyExact(count, leaderClient.io().send(INCREMENT));
+
+        count++;
+        writeReplies.add(new Reply(count, 
leaderClient.async().send(WAIT_AND_INCREMENT)));
+        Thread.sleep(100);
+
+        assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
+        f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, 
f1)));
+      }
+
+      for (int i = 0; i < n; i++) {
+        writeReplies.get(i).assertExact();
+        f1Replies.get(i).assertAtLeast();
+      }
+    }
+  }
+
+  @Test
+  public void testLinearizableReadFailWhenLeaderDown() throws Exception {
+    
runWithNewCluster(LinearizableReadTests::runTestLinearizableReadFailWhenLeaderDown);
+  }
+
+  static <C extends MiniRaftCluster> void 
runTestLinearizableReadFailWhenLeaderDown(C cluster) throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+
+    final List<RaftServer.Division> followers = cluster.getFollowers();
+    Assertions.assertEquals(2, followers.size());
+    final RaftPeerId f0 = followers.get(0).getId();
+
+    try (RaftClient leaderClient = cluster.createClient(leaderId);
+         RaftClient f0Client = cluster.createClient(f0, 
RetryPolicies.noRetry())) {
+      assertReplyExact(1, leaderClient.io().send(INCREMENT));
+      assertReplyExact(1, f0Client.io().sendReadOnly(QUERY));
+
+      // kill the leader
+      // read timeout quicker than election timeout
+      final RaftClientReply reply = 
leaderClient.admin().transferLeadership(null, 200);
+      Assertions.assertTrue(reply.isSuccess());
+
+      // client should fail and won't retry
+      Assertions.assertThrows(ReadIndexException.class, () -> 
f0Client.io().sendReadOnly(QUERY, f0));
+    }
+  }
+
+  @Test
+  public void testFollowerReadOnlyRetryWhenLeaderDown() throws Exception {
+    // only retry on ReadIndexException
+    final RetryPolicy retryPolicy = ExceptionDependentRetry
+        .newBuilder()
+        .setDefaultPolicy(RetryPolicies.noRetry())
+        .setExceptionToPolicy(ReadIndexException.class,
+            RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(500, 
TimeUnit.MILLISECONDS)))
+        .build();
+
+    runWithNewCluster(cluster -> 
ReadOnlyRequestTests.runTestReadOnlyRetryWhenLeaderDown(retryPolicy, cluster));
+  }
+
+
+  @Test
+  public void testReadAfterWrite() throws Exception {
+    runWithNewCluster(LinearizableReadTests::runTestReadAfterWrite);
+  }
+
+  static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) 
throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+
+    try (RaftClient client = cluster.createClient(leaderId)) {
+      // test blocking read-after-write
+      assertReplyExact(1, client.io().send(INCREMENT));
+      assertReplyExact(1, client.io().sendReadAfterWrite(QUERY));
+
+      // test asynchronous read-after-write
+      client.async().send(INCREMENT);
+      final CompletableFuture<RaftClientReply> asyncReply = 
client.async().sendReadAfterWrite(QUERY);
+
+      for (int i = 0; i < 20; i++) {
+        client.async().send(INCREMENT);
+      }
+
+      // read-after-write is more consistent than linearizable read
+      final CompletableFuture<RaftClientReply> linearizable = 
client.async().sendReadOnly(QUERY);
+      final CompletableFuture<RaftClientReply> readAfterWrite = 
client.async().sendReadAfterWrite(QUERY);
+      final int r = retrieve(readAfterWrite.get());
+      final int l = retrieve(linearizable.get());
+      Assertions.assertTrue(r >= l, () -> "readAfterWrite = " + r + " < linearizable = " + l);
+
+      assertReplyAtLeast(2, asyncReply.join());
+    }
+  }
+}
\ No newline at end of file
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 17fd28cbe..aa77ee5c7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -19,31 +19,29 @@ package org.apache.ratis;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.ReadIndexException;
-import org.apache.ratis.retry.ExceptionDependentRetry;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.Slf4jUtils;
-import org.apache.ratis.util.TimeDuration;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.event.Level;
 
 import java.nio.charset.StandardCharsets;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
@@ -56,270 +54,109 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
 
   static final int NUM_SERVERS = 3;
 
-  static final String INCREMENT = "INCREMENT";
-  static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
-  static final String QUERY = "QUERY";
-  final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
-  final Message waitAndIncrementMessage = new 
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
-  final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
+  static final String INCREMENT_STRING = "INCREMENT";
+  static final String WAIT_AND_INCREMENT_STRING = "WAIT_AND_INCREMENT";
+  static final String QUERY_STRING = "QUERY";
+
+  static final Message INCREMENT = new 
RaftTestUtil.SimpleMessage(INCREMENT_STRING);
+  static final Message WAIT_AND_INCREMENT = new 
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT_STRING);
+  static final Message QUERY = new RaftTestUtil.SimpleMessage(QUERY_STRING);
 
   @BeforeEach
   public void setup() {
     final RaftProperties p = getProperties();
-    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        CounterStateMachine.class, StateMachine.class);
+    CounterStateMachine.setProperties(p);
   }
 
-  @Test
-  public void testLinearizableRead() throws Exception {
-    getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    runWithNewCluster(NUM_SERVERS, this::testReadOnlyImpl);
+  public static void assertOption(RaftServerConfigKeys.Read.Option expected, 
RaftProperties properties) {
+    final RaftServerConfigKeys.Read.Option computed = 
RaftServerConfigKeys.Read.option(properties);
+    Assertions.assertEquals(expected, computed);
   }
 
   @Test
-  public void testLeaseRead() throws Exception {
-    
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, 
true);
-    runWithNewCluster(NUM_SERVERS, this::testReadOnlyImpl);
+  public void testReadOnly() throws Exception {
+    assertOption(RaftServerConfigKeys.Read.Option.DEFAULT,  getProperties());
+    runWithNewCluster(NUM_SERVERS, ReadOnlyRequestTests::runTestReadOnly);
   }
 
-  private void testReadOnlyImpl(CLUSTER cluster) throws Exception {
+  static <C extends MiniRaftCluster> void runTestReadOnly(C cluster) throws 
Exception {
     try {
       RaftTestUtil.waitForLeader(cluster);
       final RaftPeerId leaderId = cluster.getLeader().getId();
 
       try (final RaftClient client = cluster.createClient(leaderId)) {
         for (int i = 1; i <= 10; i++) {
-          RaftClientReply reply = client.io().send(incrementMessage);
-          Assertions.assertTrue(reply.isSuccess());
-          reply = client.io().sendReadOnly(queryMessage);
-          Assertions.assertEquals(i, retrieve(reply));
-        }
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testLinearizableReadTimeout() throws Exception {
-    getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    runWithNewCluster(NUM_SERVERS, this::testReadOnlyTimeoutImpl);
-  }
-
-  @Test
-  public void testLeaseReadTimeout() throws Exception {
-    
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, 
true);
-    runWithNewCluster(NUM_SERVERS, this::testReadOnlyTimeoutImpl);
-  }
-
-  private void testReadOnlyTimeoutImpl(CLUSTER cluster) throws Exception {
-    try {
-      RaftTestUtil.waitForLeader(cluster);
-      final RaftPeerId leaderId = cluster.getLeader().getId();
-
-      try (final RaftClient client = cluster.createClient(leaderId);
-           final RaftClient noRetry = cluster.createClient(leaderId, 
RetryPolicies.noRetry())) {
-
-        CompletableFuture<RaftClientReply> result = 
client.async().send(incrementMessage);
-        client.admin().transferLeadership(null, 200);
-
-        Assertions.assertThrows(ReadIndexException.class, () -> {
-          RaftClientReply timeoutReply = 
noRetry.io().sendReadOnly(queryMessage);
-          Assertions.assertNotNull(timeoutReply.getException());
-          Assertions.assertTrue(timeoutReply.getException() instanceof 
ReadException);
-        });
-      }
-
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testFollowerLinearizableRead() throws Exception {
-    getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyImpl);
-  }
-
-  @Test
-  public void testFollowerLeaseRead() throws Exception {
-    
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, 
true);
-    runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyImpl);
-  }
-
-  private void testFollowerReadOnlyImpl(CLUSTER cluster) throws Exception {
-    try {
-      RaftTestUtil.waitForLeader(cluster);
-
-      List<RaftServer.Division> followers = cluster.getFollowers();
-      Assertions.assertEquals(2, followers.size());
-
-      final RaftPeerId f0 = followers.get(0).getId();
-      final RaftPeerId f1 = followers.get(1).getId();
-      try (RaftClient client = 
cluster.createClient(cluster.getLeader().getId())) {
-        for (int i = 1; i <= 10; i++) {
-          final RaftClientReply reply = client.io().send(incrementMessage);
-          Assertions.assertTrue(reply.isSuccess());
-          final RaftClientReply read1 = client.io().sendReadOnly(queryMessage, 
f0);
-          Assertions.assertEquals(i, retrieve(read1));
-          final CompletableFuture<RaftClientReply> read2 = 
client.async().sendReadOnly(queryMessage, f1);
-          Assertions.assertEquals(i, retrieve(read2.get(1, TimeUnit.SECONDS)));
+          assertReplyExact(i, client.io().send(INCREMENT));
+          assertReplyExact(i, client.io().sendReadOnly(QUERY));
         }
       }
     } finally {
       cluster.shutdown();
     }
   }
-
-  @Test
-  public void testFollowerLinearizableReadParallel() throws Exception {
-    getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyParallelImpl);
-  }
-
-  @Test
-  public void testFollowerLeaseReadParallel() throws Exception {
-    
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, 
true);
-    runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyParallelImpl);
-  }
-
-  private void testFollowerReadOnlyParallelImpl(CLUSTER cluster) throws 
Exception {
-    try {
-      RaftTestUtil.waitForLeader(cluster);
-
-      List<RaftServer.Division> followers = cluster.getFollowers();
-      Assertions.assertEquals(2, followers.size());
-
-      try (RaftClient leaderClient = 
cluster.createClient(cluster.getLeader().getId());
-           RaftClient followerClient1 = 
cluster.createClient(followers.get(0).getId())) {
-
-        leaderClient.io().send(incrementMessage);
-        leaderClient.async().send(waitAndIncrementMessage);
-        Thread.sleep(100);
-
-        RaftClientReply clientReply = 
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
-        Assertions.assertEquals(2, retrieve(clientReply));
-      }
-
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testFollowerLinearizableReadFailWhenLeaderDown() throws 
Exception {
-    getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    runWithNewCluster(NUM_SERVERS, 
this::testFollowerReadOnlyFailWhenLeaderDownImpl);
-  }
-
   @Test
-  public void testFollowerLeaseReadWhenLeaderDown() throws Exception {
-    
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, 
true);
-    runWithNewCluster(NUM_SERVERS, 
this::testFollowerReadOnlyFailWhenLeaderDownImpl);
+  public void testReadTimeout() throws Exception {
+    runWithNewCluster(NUM_SERVERS, cluster -> 
runTestReadTimeout(RaftRetryFailureException.class, cluster));
   }
 
-  private void testFollowerReadOnlyFailWhenLeaderDownImpl(CLUSTER cluster) 
throws Exception {
-    try {
-      RaftTestUtil.waitForLeader(cluster);
+  static <C extends MiniRaftCluster> void runTestReadTimeout(Class<? extends 
Throwable> exceptionClass, C cluster)
+      throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
 
-      List<RaftServer.Division> followers = cluster.getFollowers();
-      Assertions.assertEquals(2, followers.size());
+    try (final RaftClient client = cluster.createClient(leaderId);
+         final RaftClient noRetry = cluster.createClient(leaderId, 
RetryPolicies.noRetry())) {
 
-      try (RaftClient leaderClient = 
cluster.createClient(cluster.getLeader().getId());
-           RaftClient followerClient1 = 
cluster.createClient(followers.get(0).getId(), RetryPolicies.noRetry())) {
-         leaderClient.io().send(incrementMessage);
-
-         RaftClientReply clientReply = 
followerClient1.io().sendReadOnly(queryMessage);
-         Assertions.assertEquals(1, retrieve(clientReply));
-
-         // kill the leader
-         // read timeout quicker than election timeout
-         leaderClient.admin().transferLeadership(null, 200);
-
-         Assertions.assertThrows(ReadIndexException.class, () -> {
-           followerClient1.io().sendReadOnly(queryMessage, 
followers.get(0).getId());
-         });
-      }
+      assertReplyExact(1, client.io().send(INCREMENT));
+      client.admin().transferLeadership(null, 200);
 
-    } finally {
-      cluster.shutdown();
+      Assertions.assertThrows(exceptionClass, () -> {
+        final RaftClientReply timeoutReply = noRetry.io().sendReadOnly(QUERY);
+        Assertions.assertFalse(timeoutReply.isSuccess());
+        Assertions.assertNotNull(timeoutReply.getException());
+        Assertions.assertInstanceOf(ReadException.class, 
timeoutReply.getException());
+      });
     }
   }
 
   @Test
-  public void testFollowerReadOnlyRetryWhenLeaderDown() throws Exception {
-    getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, 
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    runWithNewCluster(NUM_SERVERS, 
this::testFollowerReadOnlyRetryWhenLeaderDown);
-  }
-
-  @Test
-  public void testFollowerLeaseReadRetryWhenLeaderDown() throws Exception {
-    
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, 
true);
-    runWithNewCluster(NUM_SERVERS, 
this::testFollowerReadOnlyRetryWhenLeaderDown);
+  public void testReadOnlyRetryWhenLeaderDown() throws Exception {
+    runWithNewCluster(NUM_SERVERS, cluster -> 
runTestReadOnlyRetryWhenLeaderDown(null, cluster));
   }
 
-  private void testFollowerReadOnlyRetryWhenLeaderDown(CLUSTER cluster) throws 
Exception {
-    // only retry on readIndexException
-    final RetryPolicy retryPolicy = ExceptionDependentRetry
-        .newBuilder()
-        .setDefaultPolicy(RetryPolicies.noRetry())
-        .setExceptionToPolicy(ReadIndexException.class,
-            RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(500, 
TimeUnit.MILLISECONDS)))
-        .build();
+  static <C extends MiniRaftCluster> void 
runTestReadOnlyRetryWhenLeaderDown(RetryPolicy retryPolicy, C cluster)
+      throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
 
-    RaftTestUtil.waitForLeader(cluster);
-
-    try (RaftClient client = cluster.createClient(cluster.getLeader().getId(), 
retryPolicy)) {
-      client.io().send(incrementMessage);
-
-      final RaftClientReply clientReply = 
client.io().sendReadOnly(queryMessage);
-      Assertions.assertEquals(1, retrieve(clientReply));
+    try (RaftClient client = cluster.createClient(leaderId, retryPolicy)) {
+      assertReplyExact(1, client.io().send(INCREMENT));
+      assertReplyExact(1, client.io().sendReadOnly(QUERY));
 
       // kill the leader
       client.admin().transferLeadership(null, 200);
 
       // readOnly will success after re-election
-      final RaftClientReply replySuccess = 
client.io().sendReadOnly(queryMessage);
-      Assertions.assertEquals(1, retrieve(clientReply));
+      assertReplyExact(1, client.io().sendReadOnly(QUERY));
     }
   }
 
-  @Test
-  public void testReadAfterWrite() throws Exception {
-    runWithNewCluster(NUM_SERVERS, this::testReadAfterWriteImpl);
+  static int retrieve(RaftClientReply reply) {
+    Assertions.assertTrue(reply.isSuccess());
+    return 
Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
   }
 
-  private void testReadAfterWriteImpl(CLUSTER cluster) throws Exception {
-    RaftTestUtil.waitForLeader(cluster);
-    try (RaftClient client = cluster.createClient()) {
-      // test blocking read-after-write
-      client.io().send(incrementMessage);
-      final RaftClientReply blockReply = 
client.io().sendReadAfterWrite(queryMessage);
-      Assertions.assertEquals(1, retrieve(blockReply));
-
-      // test asynchronous read-after-write
-      client.async().send(incrementMessage);
-      client.async().sendReadAfterWrite(queryMessage).thenAccept(reply -> {
-        Assertions.assertEquals(2, retrieve(reply));
-      });
-
-      for (int i = 0; i < 20; i++) {
-        client.async().send(incrementMessage);
-      }
-      final CompletableFuture<RaftClientReply> linearizable = 
client.async().sendReadOnly(queryMessage);
-      final CompletableFuture<RaftClientReply> readAfterWrite = 
client.async().sendReadAfterWrite(queryMessage);
-
-      CompletableFuture.allOf(linearizable, readAfterWrite).get();
-      // read-after-write is more consistent than linearizable read
-      Assertions.assertTrue(retrieve(readAfterWrite.get()) >= 
retrieve(linearizable.get()));
-    }
+  static void assertReplyExact(int expectedCount, RaftClientReply reply) {
+    Assertions.assertTrue(reply.isSuccess());
+    final int retrieved = retrieve(reply);
+    Assertions.assertEquals(expectedCount, retrieved, () -> "reply=" + reply);
   }
 
-  static int retrieve(RaftClientReply reply) {
-    return 
Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
+  static void assertReplyAtLeast(int minCount, RaftClientReply reply) {
+    Assertions.assertTrue(reply.isSuccess());
+    final int retrieved = retrieve(reply);
+    Assertions.assertTrue(retrieved >= minCount,
+        () -> "retrieved = " + retrieved + " < minCount = " + minCount + ", reply=" + reply);
   }
 
-
   /**
    * CounterStateMachine support 3 operations
    * 1. increment
@@ -327,12 +164,19 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
    * 3. waitAndIncrement
    */
   static class CounterStateMachine extends BaseStateMachine {
+    static void setProperties(RaftProperties properties) {
+      properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, 
CounterStateMachine.class, StateMachine.class);
+    }
+
     private final AtomicLong counter = new AtomicLong(0L);
 
     @Override
     public CompletableFuture<Message> query(Message request) {
-      return CompletableFuture.completedFuture(
-          Message.valueOf(String.valueOf(counter.get())));
+      return toMessageFuture(counter.get());
+    }
+
+    static CompletableFuture<Message> toMessageFuture(long count) {
+      return 
CompletableFuture.completedFuture(Message.valueOf(String.valueOf(count)));
     }
 
     @Override
@@ -349,39 +193,40 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
       }
     }
 
-    private void increment() {
-      counter.incrementAndGet();
+    private long increment() {
+      return counter.incrementAndGet();
     }
 
-    private void waitAndIncrement() {
+    private long waitAndIncrement() {
       sleepQuietly(500);
-      increment();
+      return increment();
     }
 
-    private void timeoutIncrement() {
+    private long timeoutIncrement() {
       sleepQuietly(5000);
-      increment();
+      return increment();
     }
 
 
     @Override
     public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
-      LOG.debug("apply trx with index=" + trx.getLogEntry().getIndex());
-      updateLastAppliedTermIndex(trx.getLogEntry().getTerm(), 
trx.getLogEntry().getIndex());
+      final LogEntryProto logEntry = trx.getLogEntry();
+      final TermIndex ti = TermIndex.valueOf(logEntry);
+      updateLastAppliedTermIndex(ti);
 
-      String command = trx.getLogEntry().getStateMachineLogEntry()
-          .getLogData().toString(StandardCharsets.UTF_8);
+      final String command = 
logEntry.getStateMachineLogEntry().getLogData().toString(StandardCharsets.UTF_8);
 
-      LOG.info("receive command: {}", command);
-      if (command.equals(INCREMENT)) {
-        increment();
-      } else if (command.equals(WAIT_AND_INCREMENT)) {
-        waitAndIncrement();
+      final long updatedCount;
+      if (command.equals(INCREMENT_STRING)) {
+        updatedCount = increment();
+      } else if (command.equals(WAIT_AND_INCREMENT_STRING)) {
+        updatedCount = waitAndIncrement();
       } else {
-        timeoutIncrement();
+        updatedCount = timeoutIncrement();
       }
+      LOG.info("Applied {} command {}, updatedCount={}", ti, command, 
updatedCount);
 
-      return CompletableFuture.completedFuture(Message.valueOf("OK"));
+      return toMessageFuture(updatedCount);
     }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 253b2cdeb..87ffa8d19 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -727,6 +727,9 @@ public abstract class MiniRaftCluster implements Closeable {
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy) 
{
+    if (retryPolicy == null) {
+      retryPolicy = getDefaultRetryPolicy();
+    }
     return createClient(leaderId, group, retryPolicy);
   }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
new file mode 100644
index 000000000..e45d8f4ff
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.LinearizableReadTests;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+
+import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestLinearizableLeaderLeaseReadWithGrpc
+  extends LinearizableReadTests<MiniRaftClusterWithGrpc>
+  implements MiniRaftClusterWithGrpc.FactoryGet {
+
+  @Override
+  public boolean isLeaderLeaseEnabled() {
+    return true;
+  }
+
+  @Override
+  public void assertRaftProperties(RaftProperties p) {
+    assertOption(LINEARIZABLE, p);
+    assertTrue(RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
+    assertTrue(isLeaderLeaseEnabled());
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
new file mode 100644
index 000000000..a434fe000
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.LinearizableReadTests;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+
+import static org.apache.ratis.ReadOnlyRequestTests.assertOption;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestLinearizableReadWithGrpc
+  extends LinearizableReadTests<MiniRaftClusterWithGrpc>
+  implements MiniRaftClusterWithGrpc.FactoryGet {
+
+  @Override
+  public boolean isLeaderLeaseEnabled() {
+    return false;
+  }
+
+  @Override
+  public void assertRaftProperties(RaftProperties p) {
+    assertOption(LINEARIZABLE, p);
+    assertFalse(RaftServerConfigKeys.Read.leaderLeaseEnabled(p));
+    assertFalse(isLeaderLeaseEnabled());
+  }
+}


Reply via email to