This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 684a19aa1 RATIS-1759. Support client use linearizable read per request
(#798)
684a19aa1 is described below
commit 684a19aa1af4c467de2be068234170a6a2ba292f
Author: Yaolong Liu <[email protected]>
AuthorDate: Tue Dec 20 11:56:47 2022 +0800
RATIS-1759. Support client use linearizable read per request (#798)
* Support client use linearizable read per request
* add new interface: sendReadOnlyNonLinearizable
---
.../src/main/java/org/apache/ratis/client/api/AsyncApi.java | 11 +++++++++++
.../main/java/org/apache/ratis/client/api/BlockingApi.java | 11 +++++++++++
.../src/main/java/org/apache/ratis/client/impl/AsyncImpl.java | 5 +++++
.../main/java/org/apache/ratis/client/impl/BlockingImpl.java | 5 +++++
.../java/org/apache/ratis/protocol/RaftClientRequest.java | 8 +++++++-
ratis-proto/src/main/proto/Raft.proto | 1 +
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 6 ++++--
7 files changed, 44 insertions(+), 3 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 84a4f5437..c6f5e4181 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -55,6 +55,17 @@ public interface AsyncApi {
*/
CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId
server);
+ /**
+ * Send the given readonly message asynchronously to the raft service using
non-linearizable read.
+ * This method is useful when linearizable read is enabled
+ * but this client prefers not using it for performance reason.
+ * When linearizable read is disabled, this method is the same as {@link
#sendReadOnly(Message)}.
+ *
+ * @param message The request message.
+ * @return a future of the reply.
+ */
+ CompletableFuture<RaftClientReply> sendReadOnlyNonLinearizable(Message
message);
+
/** The same as sendReadOnlyUnordered(message, null). */
default CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message
message) {
return sendReadOnlyUnordered(message, null);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 4a5237afc..dc03e1b8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -54,6 +54,17 @@ public interface BlockingApi {
*/
RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws
IOException;
+ /**
+ * Send the given readonly message to the raft service using
non-linearizable read.
+ * This method is useful when linearizable read is enabled
+ * but this client prefers not using it for performance reason.
+ * When linearizable read is disabled, this method is the same as {@link
#sendReadOnly(Message)}.
+ *
+ * @param message The request message.
+ * @return the reply.
+ */
+ RaftClientReply sendReadOnlyNonLinearizable(Message message) throws
IOException;
+
/**
* Send the given stale-read message to the given server (not the raft
service).
* If the server commit index is larger than or equal to the given
min-index, the request will be processed.
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 4672f5ecf..9bdc9d50a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -51,6 +51,11 @@ class AsyncImpl implements AsyncRpcApi {
return send(RaftClientRequest.readRequestType(), message, server);
}
+ @Override
+ public CompletableFuture<RaftClientReply>
sendReadOnlyNonLinearizable(Message message) {
+ return send(RaftClientRequest.readRequestType(true), message, null);
+ }
+
@Override
public CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message
message, RaftPeerId server) {
return UnorderedAsync.send(RaftClientRequest.readRequestType(), message,
server, client);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index ea06cdca8..7e81baf8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -64,6 +64,11 @@ class BlockingImpl implements BlockingApi {
return send(RaftClientRequest.readRequestType(), message, server);
}
+ @Override
+ public RaftClientReply sendReadOnlyNonLinearizable(Message message) throws
IOException {
+ return send(RaftClientRequest.readRequestType(true), message, null);
+ }
+
@Override
public RaftClientReply sendStaleRead(Message message, long minIndex,
RaftPeerId server)
throws IOException {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 8fef42d8a..ae76607ff 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -36,6 +36,8 @@ public class RaftClientRequest extends RaftClientMessage {
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
private static final Type READ_DEFAULT = new
Type(ReadRequestTypeProto.getDefaultInstance());
+ private static final Type
+ READ_NONLINEARIZABLE_DEFAULT = new
Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
private static final Type STALE_READ_DEFAULT = new
Type(StaleReadRequestTypeProto.getDefaultInstance());
public static Type writeRequestType() {
@@ -62,6 +64,10 @@ public class RaftClientRequest extends RaftClientMessage {
return READ_DEFAULT;
}
+ public static Type readRequestType(boolean nonLinearizable) {
+ return nonLinearizable? READ_NONLINEARIZABLE_DEFAULT: readRequestType();
+ }
+
public static Type staleReadRequestType(long minIndex) {
return minIndex == 0L? STALE_READ_DEFAULT
: new
Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
@@ -89,7 +95,7 @@ public class RaftClientRequest extends RaftClientMessage {
}
public static Type valueOf(ReadRequestTypeProto read) {
- return READ_DEFAULT;
+ return read.getPreferNonLinearizable()? READ_NONLINEARIZABLE_DEFAULT:
READ_DEFAULT;
}
public static Type valueOf(StaleReadRequestTypeProto staleRead) {
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index d90e57932..b83c375c6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -294,6 +294,7 @@ message ForwardRequestTypeProto {
}
message ReadRequestTypeProto {
+ bool preferNonLinearizable = 1;
}
message StaleReadRequestTypeProto {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d6224bf44..d04f3126d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -935,7 +935,8 @@ class RaftServerImpl implements RaftServer.Division,
}
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest
request) {
- if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+ if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE
+ && !request.getType().getRead().getPreferNonLinearizable()) {
/*
Linearizable read using ReadIndex. See Raft paper section 6.4.
1. First obtain readIndex from Leader.
@@ -962,7 +963,8 @@ class RaftServerImpl implements RaftServer.Division,
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
- } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
+ } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT
+ || request.getType().getRead().getPreferNonLinearizable()) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null, false);
if (reply != null) {
return reply;