This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9df2c8933 RATIS-1715. Support linearizable read in AsyncApi. (#754)
9df2c8933 is described below
commit 9df2c89335cc94858e9e5395e2ac955b631cb36e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 7 17:58:49 2022 +0800
RATIS-1715. Support linearizable read in AsyncApi. (#754)
---
.../java/org/apache/ratis/client/api/AsyncApi.java | 8 +-
.../org/apache/ratis/client/api/BlockingApi.java | 14 ++--
.../org/apache/ratis/client/impl/AsyncImpl.java | 4 +-
.../org/apache/ratis/client/impl/BlockingImpl.java | 5 --
.../org/apache/ratis/client/impl/OrderedAsync.java | 2 +-
.../org/apache/ratis/ReadOnlyRequestTests.java | 72 +++-------------
.../ratis/ReadOnlyRequestWithLongTimeoutTests.java | 95 ++++++++++++++++++++++
...hGrpc.java => TestReadOnlyRequestWithGrpc.java} | 2 +-
...estReadOnlyRequestWithLongTimeoutWithGrpc.java} | 6 +-
9 files changed, 125 insertions(+), 83 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 1e8da9bde..7041bd2b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -40,13 +40,19 @@ public interface AsyncApi {
*/
CompletableFuture<RaftClientReply> send(Message message);
+ /** The same as sendReadOnly(message, null). */
+ default CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
+ return sendReadOnly(message, null);
+ }
+
/**
* Send the given readonly message asynchronously to the raft service.
*
* @param message The request message.
+ * @param server The target server. When server == null, send the message
to the leader.
* @return a future of the reply.
*/
- CompletableFuture<RaftClientReply> sendReadOnly(Message message);
+ CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId
server);
/**
* Send the given stale-read message asynchronously to the given server (not
the raft service).
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index c238fa4c9..7493929b0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -40,18 +40,16 @@ public interface BlockingApi {
*/
RaftClientReply send(Message message) throws IOException;
+ /** The same as sendReadOnly(message, null). */
+ default RaftClientReply sendReadOnly(Message message) throws IOException {
+ return sendReadOnly(message, null);
+ }
+
/**
* Send the given readonly message to the raft service.
*
* @param message The request message.
- * @return the reply.
- */
- RaftClientReply sendReadOnly(Message message) throws IOException;
-
- /**
- * Send the given readonly message to the given server (not the raft service)
- * @param message The request message.
- * @param server The target server
+ * @param server The target server. When server == null, send the message
to the leader.
* @return the reply.
*/
RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws
IOException;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index e063eebe2..abaca2b0f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -47,8 +47,8 @@ class AsyncImpl implements AsyncRpcApi {
}
@Override
- public CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
- return send(RaftClientRequest.readRequestType(), message, null);
+ public CompletableFuture<RaftClientReply> sendReadOnly(Message message,
RaftPeerId server) {
+ return send(RaftClientRequest.readRequestType(), message, server);
}
@Override
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 0c5458168..ea06cdca8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -59,11 +59,6 @@ class BlockingImpl implements BlockingApi {
return send(RaftClientRequest.writeRequestType(), message, null);
}
- @Override
- public RaftClientReply sendReadOnly(Message message) throws IOException {
- return send(RaftClientRequest.readRequestType(), message, null);
- }
-
@Override
public RaftClientReply sendReadOnly(Message message, RaftPeerId server)
throws IOException {
return send(RaftClientRequest.readRequestType(), message, server);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 225156620..5405d4906 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -136,7 +136,7 @@ public final class OrderedAsync {
}
private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>
getSlidingWindow(RaftClientRequest request) {
- return getSlidingWindow(request.is(TypeCase.STALEREAD) ?
request.getServerId() : null);
+ return getSlidingWindow(request.isToLeader()? null: request.getServerId());
}
private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>
getSlidingWindow(RaftPeerId target) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 61d43d85b..11a34bf52 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -19,7 +19,6 @@ package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -33,17 +32,13 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.Log4jUtils;
-import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,11 +54,9 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
static final String INCREMENT = "INCREMENT";
static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
- static final String TIMEOUT_INCREMENT = "TIMEOUT_INCREMENT";
static final String QUERY = "QUERY";
final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
final Message waitAndIncrementMessage = new
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
- final Message timeoutIncrement = new
RaftTestUtil.SimpleMessage(TIMEOUT_INCREMENT);
final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
@Before
@@ -73,18 +66,6 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
CounterStateMachine.class, StateMachine.class);
p.setEnum(RaftServerConfigKeys.Read.OPTION_KEY,
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
- p.setTimeDuration(RaftServerConfigKeys.Read.TIMEOUT_KEY,
TimeDuration.valueOf(1, TimeUnit.SECONDS));
- p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MIN_KEY,
- TimeDuration.valueOf(150, TimeUnit.MILLISECONDS));
- p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MAX_KEY,
- TimeDuration.valueOf(300, TimeUnit.MILLISECONDS));
- p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MIN_KEY,
TimeDuration.valueOf(3, TimeUnit.SECONDS));
- p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MAX_KEY,
TimeDuration.valueOf(6, TimeUnit.SECONDS));
- p.setTimeDuration(RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
- TimeDuration.valueOf(10, TimeUnit.SECONDS));
-
- p.setTimeDuration(RaftClientConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
- TimeDuration.valueOf(10, TimeUnit.SECONDS));
}
@Test
@@ -102,7 +83,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
RaftClientReply reply = client.io().send(incrementMessage);
Assert.assertTrue(reply.isSuccess());
reply = client.io().sendReadOnly(queryMessage);
- Assert.assertEquals(retrieve(reply), i);
+ Assert.assertEquals(i, retrieve(reply));
}
}
} finally {
@@ -110,39 +91,6 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
}
}
- @Test
- public void testLinearizableReadParallel() throws Exception {
- runWithNewCluster(NUM_SERVERS, this::testLinearizableReadParallelImpl);
- }
-
- private void testLinearizableReadParallelImpl(CLUSTER cluster) throws
Exception {
- try {
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
-
- try (RaftClient client = cluster.createClient(leaderId,
RetryPolicies.noRetry())) {
-
- RaftClientReply reply = client.io().send(incrementMessage);
- Assert.assertTrue(reply.isSuccess());
-
- CompletableFuture<RaftClientReply> result =
client.async().send(waitAndIncrementMessage);
- Thread.sleep(100);
-
- RaftClientReply staleValueBefore = client.io()
- .sendStaleRead(queryMessage, 0, leaderId);
-
- Assert.assertEquals(retrieve(staleValueBefore), 1);
-
- RaftClientReply linearizableReadValue = client.io()
- .sendReadOnly(queryMessage);
- Assert.assertEquals(retrieve(linearizableReadValue), 2);
-
- }
- } finally {
- cluster.shutdown();
- }
- }
-
@Test
public void testLinearizableReadTimeout() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
@@ -183,16 +131,16 @@ public abstract class ReadOnlyRequestTests<CLUSTER
extends MiniRaftCluster>
List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertEquals(2, followers.size());
- try (RaftClient leaderClient =
cluster.createClient(cluster.getLeader().getId());
- RaftClient followerClient1 =
cluster.createClient(followers.get(0).getId());
- RaftClient followerClient2 =
cluster.createClient(followers.get(1).getId());) {
+ final RaftPeerId f0 = followers.get(0).getId();
+ final RaftPeerId f1 = followers.get(1).getId();
+ try (RaftClient client =
cluster.createClient(cluster.getLeader().getId())) {
for (int i = 1; i <= 10; i++) {
- RaftClientReply reply = leaderClient.io().send(incrementMessage);
+ final RaftClientReply reply = client.io().send(incrementMessage);
Assert.assertTrue(reply.isSuccess());
- RaftClientReply read1 =
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
- Assert.assertEquals(retrieve(read1), i);
- RaftClientReply read2 =
followerClient2.io().sendReadOnly(queryMessage, followers.get(1).getId());
- Assert.assertEquals(retrieve(read2), i);
+ final RaftClientReply read1 = client.io().sendReadOnly(queryMessage,
f0);
+ Assert.assertEquals(i, retrieve(read1));
+ final CompletableFuture<RaftClientReply> read2 =
client.async().sendReadOnly(queryMessage, f1);
+ Assert.assertEquals(i, retrieve(read2.get(1, TimeUnit.SECONDS)));
}
}
} finally {
@@ -261,7 +209,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends
MiniRaftCluster>
}
}
- private int retrieve(RaftClientReply reply) {
+ static int retrieve(RaftClientReply reply) {
return
Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
new file mode 100644
index 000000000..737b8452c
--- /dev/null
+++
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends
MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+ {
+ Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+ }
+
+ static final int NUM_SERVERS = 3;
+
+ static final String INCREMENT = "INCREMENT";
+ static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
+ static final String QUERY = "QUERY";
+ final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
+ final Message waitAndIncrementMessage = new
RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
+ final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
+
+ @Before
+ public void setup() {
+ final RaftProperties p = getProperties();
+ p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ ReadOnlyRequestTests.CounterStateMachine.class, StateMachine.class);
+
+ RaftServerConfigKeys.Read.setOption(p,
RaftServerConfigKeys.Read.Option.LINEARIZABLE);
+ RaftServerConfigKeys.Read.setTimeout(p, TimeDuration.ONE_SECOND);
+ RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(p,
TimeDuration.valueOf(150, TimeUnit.MILLISECONDS));
+ RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(p,
TimeDuration.valueOf(300, TimeUnit.MILLISECONDS));
+ RaftServerConfigKeys.Rpc.setTimeoutMin(p, TimeDuration.valueOf(3,
TimeUnit.SECONDS));
+ RaftServerConfigKeys.Rpc.setTimeoutMax(p, TimeDuration.valueOf(6,
TimeUnit.SECONDS));
+ RaftServerConfigKeys.Rpc.setRequestTimeout(p, TimeDuration.valueOf(10,
TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testLinearizableReadParallel() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::testLinearizableReadParallelImpl);
+ }
+
+ private void testLinearizableReadParallelImpl(CLUSTER cluster) throws
Exception {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try (RaftClient client = cluster.createClient(leaderId,
RetryPolicies.noRetry())) {
+ final RaftClientReply reply = client.io().send(incrementMessage);
+ Assert.assertTrue(reply.isSuccess());
+
+ client.async().send(waitAndIncrementMessage);
+ Thread.sleep(100);
+
+ RaftClientReply staleValueBefore =
client.io().sendStaleRead(queryMessage, 0, leaderId);
+
+ Assert.assertEquals(1, ReadOnlyRequestTests.retrieve(staleValueBefore));
+
+ RaftClientReply linearizableReadValue =
client.io().sendReadOnly(queryMessage);
+ Assert.assertEquals(2,
ReadOnlyRequestTests.retrieve(linearizableReadValue));
+ }
+ }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithGrpc.java
similarity index 95%
copy from
ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
copy to
ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithGrpc.java
index b89523c61..6b92a541b 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithGrpc.java
@@ -19,7 +19,7 @@ package org.apache.ratis.grpc;
import org.apache.ratis.ReadOnlyRequestTests;
-public class TestReadOnlyRequestsWithGrpc
+public class TestReadOnlyRequestWithGrpc
extends ReadOnlyRequestTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithLongTimeoutWithGrpc.java
similarity index 82%
rename from
ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
rename to
ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithLongTimeoutWithGrpc.java
index b89523c61..e2c8ae577 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestWithLongTimeoutWithGrpc.java
@@ -17,9 +17,9 @@
*/
package org.apache.ratis.grpc;
-import org.apache.ratis.ReadOnlyRequestTests;
+import org.apache.ratis.ReadOnlyRequestWithLongTimeoutTests;
-public class TestReadOnlyRequestsWithGrpc
- extends ReadOnlyRequestTests<MiniRaftClusterWithGrpc>
+public class TestReadOnlyRequestWithLongTimeoutWithGrpc
+ extends ReadOnlyRequestWithLongTimeoutTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
}