This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9df2c8933 RATIS-1715. Support linearizable read in AsyncApi. (#754)
9df2c8933 is described below

commit 9df2c89335cc94858e9e5395e2ac955b631cb36e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 7 17:58:49 2022 +0800

    RATIS-1715. Support linearizable read in AsyncApi. (#754)
---
 .../java/org/apache/ratis/client/api/AsyncApi.java |  8 +-
 .../org/apache/ratis/client/api/BlockingApi.java   | 14 ++--
 .../org/apache/ratis/client/impl/AsyncImpl.java    |  4 +-
 .../org/apache/ratis/client/impl/BlockingImpl.java |  5 --
 .../org/apache/ratis/client/impl/OrderedAsync.java |  2 +-
 .../org/apache/ratis/ReadOnlyRequestTests.java     | 72 +++-------------
 .../ratis/ReadOnlyRequestWithLongTimeoutTests.java | 95 ++++++++++++++++++++++
 ...hGrpc.java => TestReadOnlyRequestWithGrpc.java} |  2 +-
 ...estReadOnlyRequestWithLongTimeoutWithGrpc.java} |  6 +-
 9 files changed, 125 insertions(+), 83 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 1e8da9bde..7041bd2b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -40,13 +40,19 @@ public interface AsyncApi {
    */
   CompletableFuture<RaftClientReply> send(Message message);
 
+  /** The same as sendReadOnly(message, null). */
+  default CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
+    return sendReadOnly(message, null);
+  }
+
   /**
    * Send the given readonly message asynchronously to the raft service.
    *
    * @param message The request message.
+   * @param server The target server.  When server == null, send the message to the leader.
    * @return a future of the reply.
    */
-  CompletableFuture<RaftClientReply> sendReadOnly(Message message);
+  CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId server);
 
   /**
    * Send the given stale-read message asynchronously to the given server (not the raft service).
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index c238fa4c9..7493929b0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -40,18 +40,16 @@ public interface BlockingApi {
    */
   RaftClientReply send(Message message) throws IOException;
 
+  /** The same as sendReadOnly(message, null). */
+  default RaftClientReply sendReadOnly(Message message) throws IOException {
+    return sendReadOnly(message, null);
+  }
+
   /**
    * Send the given readonly message to the raft service.
    *
    * @param message The request message.
-   * @return the reply.
-   */
-  RaftClientReply sendReadOnly(Message message) throws IOException;
-
-  /**
-   * Send the given readonly message to the given server (not the raft service)
-   * @param message The request message.
-   * @param server The target server
+   * @param server The target server.  When server == null, send the message to the leader.
    * @return the reply.
    */
   RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws IOException;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index e063eebe2..abaca2b0f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -47,8 +47,8 @@ class AsyncImpl implements AsyncRpcApi {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
-    return send(RaftClientRequest.readRequestType(), message, null);
+  public CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId server) {
+    return send(RaftClientRequest.readRequestType(), message, server);
   }
 
   @Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 0c5458168..ea06cdca8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -59,11 +59,6 @@ class BlockingImpl implements BlockingApi {
     return send(RaftClientRequest.writeRequestType(), message, null);
   }
 
-  @Override
-  public RaftClientReply sendReadOnly(Message message) throws IOException {
-    return send(RaftClientRequest.readRequestType(), message, null);
-  }
-
   @Override
   public RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws IOException {
     return send(RaftClientRequest.readRequestType(), message, server);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 225156620..5405d4906 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -136,7 +136,7 @@ public final class OrderedAsync {
   }
 
   private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
-    return getSlidingWindow(request.is(TypeCase.STALEREAD) ? request.getServerId() : null);
+    return getSlidingWindow(request.isToLeader()? null: request.getServerId());
   }
 
   private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> getSlidingWindow(RaftPeerId target) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 61d43d85b..11a34bf52 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -19,7 +19,6 @@ package org.apache.ratis;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -33,17 +32,13 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.Log4jUtils;
-import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,11 +54,9 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
 
   static final String INCREMENT = "INCREMENT";
   static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
-  static final String TIMEOUT_INCREMENT = "TIMEOUT_INCREMENT";
   static final String QUERY = "QUERY";
   final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
   final Message waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
-  final Message timeoutIncrement = new RaftTestUtil.SimpleMessage(TIMEOUT_INCREMENT);
   final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
 
   @Before
@@ -73,18 +66,6 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
         CounterStateMachine.class, StateMachine.class);
 
     p.setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
-    p.setTimeDuration(RaftServerConfigKeys.Read.TIMEOUT_KEY, TimeDuration.valueOf(1, TimeUnit.SECONDS));
-    p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MIN_KEY,
-        TimeDuration.valueOf(150, TimeUnit.MILLISECONDS));
-    p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MAX_KEY,
-        TimeDuration.valueOf(300, TimeUnit.MILLISECONDS));
-    p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MIN_KEY, TimeDuration.valueOf(3, TimeUnit.SECONDS));
-    p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MAX_KEY, TimeDuration.valueOf(6, TimeUnit.SECONDS));
-    p.setTimeDuration(RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
-        TimeDuration.valueOf(10, TimeUnit.SECONDS));
-
-    p.setTimeDuration(RaftClientConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
-        TimeDuration.valueOf(10, TimeUnit.SECONDS));
   }
 
   @Test
@@ -102,7 +83,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
           RaftClientReply reply = client.io().send(incrementMessage);
           Assert.assertTrue(reply.isSuccess());
           reply = client.io().sendReadOnly(queryMessage);
-          Assert.assertEquals(retrieve(reply), i);
+          Assert.assertEquals(i, retrieve(reply));
         }
       }
     } finally {
@@ -110,39 +91,6 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  @Test
-  public void testLinearizableReadParallel() throws Exception {
-    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadParallelImpl);
-  }
-
-  private void testLinearizableReadParallelImpl(CLUSTER cluster) throws Exception {
-    try {
-      RaftTestUtil.waitForLeader(cluster);
-      final RaftPeerId leaderId = cluster.getLeader().getId();
-
-      try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
-
-        RaftClientReply reply = client.io().send(incrementMessage);
-        Assert.assertTrue(reply.isSuccess());
-
-        CompletableFuture<RaftClientReply> result = client.async().send(waitAndIncrementMessage);
-        Thread.sleep(100);
-
-        RaftClientReply staleValueBefore = client.io()
-                .sendStaleRead(queryMessage, 0, leaderId);
-
-        Assert.assertEquals(retrieve(staleValueBefore), 1);
-
-        RaftClientReply linearizableReadValue = client.io()
-            .sendReadOnly(queryMessage);
-        Assert.assertEquals(retrieve(linearizableReadValue), 2);
-
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
   @Test
   public void testLinearizableReadTimeout() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
@@ -183,16 +131,16 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
       List<RaftServer.Division> followers = cluster.getFollowers();
       Assert.assertEquals(2, followers.size());
 
-      try (RaftClient leaderClient = cluster.createClient(cluster.getLeader().getId());
-           RaftClient followerClient1 = cluster.createClient(followers.get(0).getId());
-           RaftClient followerClient2 = cluster.createClient(followers.get(1).getId());) {
+      final RaftPeerId f0 = followers.get(0).getId();
+      final RaftPeerId f1 = followers.get(1).getId();
+      try (RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
         for (int i = 1; i <= 10; i++) {
-          RaftClientReply reply = leaderClient.io().send(incrementMessage);
+          final RaftClientReply reply = client.io().send(incrementMessage);
           Assert.assertTrue(reply.isSuccess());
-          RaftClientReply read1 = followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
-          Assert.assertEquals(retrieve(read1), i);
-          RaftClientReply read2 = followerClient2.io().sendReadOnly(queryMessage, followers.get(1).getId());
-          Assert.assertEquals(retrieve(read2), i);
+          final RaftClientReply read1 = client.io().sendReadOnly(queryMessage, f0);
+          Assert.assertEquals(i, retrieve(read1));
+          final CompletableFuture<RaftClientReply> read2 = client.async().sendReadOnly(queryMessage, f1);
+          Assert.assertEquals(i, retrieve(read2.get(1, TimeUnit.SECONDS)));
         }
       }
     } finally {
@@ -261,7 +209,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  private int retrieve(RaftClientReply reply) {
+  static int retrieve(RaftClientReply reply) {
     return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
new file mode 100644
index 000000000..737b8452c
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRaftCluster>
+  extends BaseTest
+  implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+  }
+
+  static final int NUM_SERVERS = 3;
+
+  static final String INCREMENT = "INCREMENT";
+  static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
+  static final String QUERY = "QUERY";
+  final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
+  final Message waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
+  final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
+
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        ReadOnlyRequestTests.CounterStateMachine.class, StateMachine.class);
+
+    RaftServerConfigKeys.Read.setOption(p, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
+    RaftServerConfigKeys.Read.setTimeout(p, TimeDuration.ONE_SECOND);
+    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(p, TimeDuration.valueOf(150, TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(p, TimeDuration.valueOf(300, TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMin(p, TimeDuration.valueOf(3, TimeUnit.SECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMax(p, TimeDuration.valueOf(6, TimeUnit.SECONDS));
+    RaftServerConfigKeys.Rpc.setRequestTimeout(p, TimeDuration.valueOf(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testLinearizableReadParallel() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadParallelImpl);
+  }
+
+  private void testLinearizableReadParallelImpl(CLUSTER cluster) throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+
+    try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+      final RaftClientReply reply = client.io().send(incrementMessage);
+      Assert.assertTrue(reply.isSuccess());
+
+      client.async().send(waitAndIncrementMessage);
+      Thread.sleep(100);
+
+      RaftClientReply staleValueBefore = client.io().sendStaleRead(queryMessage, 0, leaderId);
+
+      Assert.assertEquals(1, ReadOnlyRequestTests.retrieve(staleValueBefore));
+
+      RaftClientReply linearizableReadValue = client.io().sendReadOnly(queryMessage);
+      Assert.assertEquals(2, ReadOnlyRequestTests.retrieve(linearizableReadValue));
+    }
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithGrpc.java
similarity index 95%
copy from ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
copy to ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithGrpc.java
index b89523c61..6b92a541b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithGrpc.java
@@ -19,7 +19,7 @@ package org.apache.ratis.grpc;
 
 import org.apache.ratis.ReadOnlyRequestTests;
 
-public class TestReadOnlyRequestsWithGrpc
+public class TestReadOnlyRequestWithGrpc
   extends ReadOnlyRequestTests<MiniRaftClusterWithGrpc>
   implements MiniRaftClusterWithGrpc.FactoryGet {
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithLongTimeoutWithGrpc.java
similarity index 82%
rename from ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
rename to ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithLongTimeoutWithGrpc.java
index b89523c61..e2c8ae577 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithLongTimeoutWithGrpc.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.ratis.ReadOnlyRequestTests;
+import org.apache.ratis.ReadOnlyRequestWithLongTimeoutTests;
 
-public class TestReadOnlyRequestsWithGrpc
-  extends ReadOnlyRequestTests<MiniRaftClusterWithGrpc>
+public class TestReadOnlyRequestWithLongTimeoutWithGrpc
+  extends ReadOnlyRequestWithLongTimeoutTests<MiniRaftClusterWithGrpc>
   implements MiniRaftClusterWithGrpc.FactoryGet {
 }

Reply via email to