This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ab9fc535d RATIS-1701. Add new Server RPC: readIndex (#738)
ab9fc535d is described below
commit ab9fc535dc6f48ba1d93687fc46022989beb6b0a
Author: William Song <[email protected]>
AuthorDate: Sun Sep 11 15:21:48 2022 +0800
RATIS-1701. Add new Server RPC: readIndex (#738)
---
.../org/apache/ratis/client/api/BlockingApi.java | 8 ++
.../org/apache/ratis/client/impl/BlockingImpl.java | 5 +
.../apache/ratis/client/impl/ClientProtoUtils.java | 8 ++
.../org/apache/ratis/protocol/RaftClientReply.java | 7 +-
.../ratis/protocol/exceptions/ReadException.java | 4 +
.../grpc/server/GrpcServerProtocolClient.java | 5 +
.../grpc/server/GrpcServerProtocolService.java | 14 +++
.../org/apache/ratis/grpc/server/GrpcService.java | 45 ++++++++
ratis-proto/src/main/proto/Grpc.proto | 3 +
ratis-proto/src/main/proto/Raft.proto | 10 ++
.../apache/ratis/server/RaftServerConfigKeys.java | 6 +-
.../org/apache/ratis/server/RaftServerRpc.java | 7 ++
.../protocol/RaftServerAsynchronousProtocol.java | 5 +
.../apache/ratis/server/impl/RaftServerImpl.java | 61 +++++++++--
.../apache/ratis/server/impl/RaftServerProxy.java | 9 ++
.../apache/ratis/server/impl/ServerProtoUtils.java | 15 +++
.../org/apache/ratis/ReadOnlyRequestTests.java | 113 ++++++++++++++++++---
17 files changed, 299 insertions(+), 26 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 84f9740e2..c238fa4c9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -48,6 +48,14 @@ public interface BlockingApi {
*/
RaftClientReply sendReadOnly(Message message) throws IOException;
+ /**
+ * Send the given readonly message to the given server (not the raft service)
+ * @param message The request message.
+ * @param server The target server
+ * @return the reply.
+ */
+ RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws
IOException;
+
/**
* Send the given stale-read message to the given server (not the raft
service).
* If the server commit index is larger than or equal to the given
min-index, the request will be processed.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 49bea5731..0c5458168 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -64,6 +64,11 @@ class BlockingImpl implements BlockingApi {
return send(RaftClientRequest.readRequestType(), message, null);
}
+ @Override
+ public RaftClientReply sendReadOnly(Message message, RaftPeerId server)
throws IOException {
+ return send(RaftClientRequest.readRequestType(), message, server);
+ }
+
@Override
public RaftClientReply sendStaleRead(Message message, long minIndex,
RaftPeerId server)
throws IOException {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 1ac825850..4496d2092 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -29,6 +29,7 @@ import
org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.rpc.CallId;
@@ -47,6 +48,7 @@ import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDe
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERSTEPPINGDOWNEXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
+import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.READEXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.TRANSFERLEADERSHIPEXCEPTION;
@@ -303,6 +305,10 @@ public interface ClientProtoUtils {
.map(ProtoUtils::toThrowableProto)
.ifPresent(b::setTransferLeadershipException);
+ Optional.ofNullable(reply.getReadException())
+ .map(ProtoUtils::toThrowableProto)
+ .ifPresent(b::setReadException);
+
final RaftClientReplyProto serialized = b.build();
final RaftException e = reply.getException();
if (e != null) {
@@ -392,6 +398,8 @@ public interface ClientProtoUtils {
e = ProtoUtils.toThrowable(replyProto.getLeaderSteppingDownException(),
LeaderSteppingDownException.class);
} else if
(replyProto.getExceptionDetailsCase().equals(TRANSFERLEADERSHIPEXCEPTION)) {
e = ProtoUtils.toThrowable(replyProto.getTransferLeadershipException(),
TransferLeadershipException.class);
+ } else if (replyProto.getExceptionDetailsCase().equals(READEXCEPTION)) {
+ e = ProtoUtils.toThrowable(replyProto.getReadException(),
ReadException.class);
} else {
e = null;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index ada67c931..6d29c452c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -25,6 +25,7 @@ import
org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.util.JavaUtils;
@@ -169,7 +170,7 @@ public class RaftClientReply extends RaftClientMessage {
AlreadyClosedException.class,
NotLeaderException.class, NotReplicatedException.class,
LeaderNotReadyException.class, StateMachineException.class,
DataStreamException.class,
- LeaderSteppingDownException.class,
TransferLeadershipException.class),
+ LeaderSteppingDownException.class,
TransferLeadershipException.class, ReadException.class),
() -> "Unexpected exception class: " + this);
}
}
@@ -245,6 +246,10 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, TransferLeadershipException.class);
}
+ public ReadException getReadException() {
+ return JavaUtils.cast(exception, ReadException.class);
+ }
+
/** @return the exception, if there is any; otherwise, return null. */
public RaftException getException() {
return exception;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
index 32828418d..1b1a20b8d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
@@ -21,6 +21,10 @@ package org.apache.ratis.protocol.exceptions;
* This exception indicates the failure of a read request.
*/
public class ReadException extends RaftException {
+ public ReadException(String message) {
+ super(message);
+ }
+
public ReadException(Throwable cause) {
super(cause);
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6f5d06c2e..a8ee5f30d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -130,6 +130,11 @@ public class GrpcServerProtocolClient implements Closeable
{
return r;
}
+ void readIndex(ReadIndexRequestProto request,
StreamObserver<ReadIndexReplyProto> s) {
+ asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(),
requestTimeoutDuration.getUnit())
+ .readIndex(request, s);
+ }
+
StreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseHandler, boolean
isHeartbeat) {
if (isHeartbeat && useSeparateHBChannel) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 5ad3909d5..6f98a6b8a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -197,6 +197,20 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
}
}
+ @Override
+ public void readIndex(ReadIndexRequestProto request,
StreamObserver<ReadIndexReplyProto> responseObserver) {
+ try {
+ server.readIndexAsync(request).thenAccept(reply -> {
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ });
+ } catch (Throwable e) {
+ GrpcUtil.warn(LOG,
+ () -> getId() + ": Failed readIndex " +
ProtoUtils.toString(request.getServerRequest()), e);
+ responseObserver.onError(GrpcUtil.wrapException(e));
+ }
+ }
+
@Override
public StreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseObserver) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 40e413915..f21619555 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -27,10 +27,12 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpcWithProxy;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
@@ -44,6 +46,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -56,6 +59,41 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
public static final String GRPC_SEND_SERVER_REQUEST =
JavaUtils.getClassSimpleName(GrpcService.class) + ".sendRequest";
+ class AsyncService implements RaftServerAsynchronousProtocol {
+
+ @Override
+ public CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto request)
+ throws IOException {
+ throw new UnsupportedOperationException("This method is not supported");
+ }
+
+ @Override
+ public CompletableFuture<ReadIndexReplyProto>
readIndexAsync(ReadIndexRequestProto request) throws IOException {
+ CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null,
request);
+
+ final CompletableFuture<ReadIndexReplyProto> f = new
CompletableFuture<>();
+ final StreamObserver<ReadIndexReplyProto> s = new
StreamObserver<ReadIndexReplyProto>() {
+ @Override
+ public void onNext(ReadIndexReplyProto reply) {
+ f.complete(reply);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ f.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ };
+
+ final RaftPeerId target =
RaftPeerId.valueOf(request.getServerRequest().getReplyId());
+ getProxies().getProxy(target).readIndex(request, s);
+ return f;
+ }
+ }
+
public static final class Builder {
private RaftServer server;
private GrpcTlsConfig tlsConfig;
@@ -108,6 +146,8 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
private final Supplier<InetSocketAddress> clientServerAddressSupplier;
private final Supplier<InetSocketAddress> adminServerAddressSupplier;
+ private final AsyncService asyncService = new AsyncService();
+
private final ExecutorService executor;
private final GrpcClientProtocolService clientProtocolService;
@@ -312,6 +352,11 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
return adminServerAddressSupplier.get();
}
+ @Override
+ public RaftServerAsynchronousProtocol async() {
+ return asyncService;
+ }
+
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto
request) {
throw new UnsupportedOperationException(
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 06af061e7..edcd863a1 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -45,6 +45,9 @@ service RaftServerProtocolService {
rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
returns(stream ratis.common.InstallSnapshotReplyProto) {}
+
+ rpc readIndex(ratis.common.ReadIndexRequestProto)
+ returns(stream ratis.common.ReadIndexReplyProto) {}
}
service AdminProtocolService {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 9fe2494bf..1fd2a9d1e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -239,6 +239,15 @@ message InstallSnapshotReplyProto {
}
}
+message ReadIndexRequestProto {
+ RaftRpcRequestProto serverRequest = 1;
+}
+
+message ReadIndexReplyProto {
+ RaftRpcReplyProto serverReply = 1;
+ uint64 readIndex = 2;
+}
+
message ClientMessageEntryProto {
bytes content = 1;
}
@@ -399,6 +408,7 @@ message RaftClientReplyProto {
ThrowableProto dataStreamException = 8;
ThrowableProto leaderSteppingDownException = 9;
ThrowableProto transferLeadershipException = 10;
+ ThrowableProto readException = 11;
}
uint64 logIndex = 14; // When the request is a write request and the reply
is success, the log index of the transaction
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 625de2e1a..5ae1020c5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -182,7 +182,11 @@ public interface RaftServerConfigKeys {
String OPTION_KEY = ".option";
Option OPTION_DEFAULT = Option.DEFAULT;
static Option option(RaftProperties properties) {
- return get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT,
getDefaultLog());
+ Option option = get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT,
getDefaultLog());
+ if (option != Option.DEFAULT && option != Option.LINEARIZABLE) {
+ throw new IllegalArgumentException("Unexpected read option: " +
option);
+ }
+ return option;
}
static void setOption(RaftProperties properties, Option option) {
set(properties::setEnum, OPTION_KEY, option);
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 5c8f99c90..d81f9cc8b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -21,7 +21,9 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.util.JavaUtils;
import java.io.Closeable;
import java.io.IOException;
@@ -53,4 +55,9 @@ public interface RaftServerRpc extends RaftServerProtocol,
RpcType.Get, RaftPeer
/** The server role changes from leader to a non-leader role. */
default void notifyNotLeader(RaftGroupId groupId) {
}
+
+ default RaftServerAsynchronousProtocol async() {
+ throw new UnsupportedOperationException(getClass().getName()
+ + " does not support " +
JavaUtils.getClassSimpleName(RaftServerAsynchronousProtocol.class));
+ }
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
index 2c7e1d5a5..8a904069b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -18,6 +18,8 @@
package org.apache.ratis.server.protocol;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -28,4 +30,7 @@ public interface RaftServerAsynchronousProtocol {
CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto request)
throws IOException;
+
+ CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestProto
request)
+ throws IOException;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f9094b51d..70eca4301 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -824,6 +824,8 @@ class RaftServerImpl implements RaftServer.Division,
if (request.is(TypeCase.STALEREAD)) {
replyFuture = staleReadAsync(request);
+ } else if (request.is(TypeCase.READ)) {
+ replyFuture = readAsync(request);
} else {
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null,
@@ -845,11 +847,7 @@ class RaftServerImpl implements RaftServer.Division,
}
}
- if (type.is(TypeCase.READ)) {
- // TODO: We might not be the leader anymore by the time this completes.
- // See the RAFT paper section 8 (last part)
- replyFuture = readAsync(request);
- } else if (type.is(TypeCase.WATCH)) {
+ if (type.is(TypeCase.WATCH)) {
replyFuture = watchAsync(request);
} else if (type.is(TypeCase.MESSAGESTREAM)) {
replyFuture = streamAsync(request);
@@ -914,6 +912,16 @@ class RaftServerImpl implements RaftServer.Division,
return getState().getReadRequests();
}
+ private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync() {
+ final ReadIndexRequestProto request =
ServerProtoUtils.toReadIndexRequestProto(
+ getMemberId(), getInfo().getLeaderId());
+ try {
+ return getServerRpc().async().readIndexAsync(request);
+ } catch (IOException e) {
+ return JavaUtils.completeExceptionally(e);
+ }
+ }
+
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest
request) {
if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
/*
@@ -923,16 +931,31 @@ class RaftServerImpl implements RaftServer.Division,
3. Finally, query the statemachine and return the result.
*/
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
- // TODO support follower linearizable read
- if (leader == null) {
- return JavaUtils.completeExceptionally(generateNotLeaderException());
+
+ final CompletableFuture<Long> replyFuture;
+ if (leader != null) {
+ replyFuture = leader.getReadIndex();
+ } else {
+ replyFuture = sendReadIndexAsync().thenApply(reply -> {
+ if (reply.getServerReply().getSuccess()) {
+ return reply.getReadIndex();
+ } else {
+ throw new CompletionException(new ReadException(getId() +
+ ": Failed to get read index from the leader: " + reply));
+ }
+ });
}
- return leader.getReadIndex()
+
+ return replyFuture
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
- return queryStateMachine(request);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null, false);
+ if (reply != null) {
+ return reply;
+ }
+ return queryStateMachine(request);
} else {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
@@ -1354,6 +1377,24 @@ class RaftServerImpl implements RaftServer.Division,
}
}
+ @Override
+ public CompletableFuture<ReadIndexReplyProto>
readIndexAsync(ReadIndexRequestProto request) throws IOException {
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+
+ final RaftPeerId peerId =
RaftPeerId.valueOf(request.getServerRequest().getRequestorId());
+
+ final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+ if (leader == null) {
+ return CompletableFuture.completedFuture(
+ ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false,
INVALID_LOG_INDEX));
+ }
+
+ return leader.getReadIndex()
+ .thenApply(index -> ServerProtoUtils.toReadIndexReplyProto(peerId,
getMemberId(), true, index))
+ .exceptionally(throwable ->
+ ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(),
false, INVALID_LOG_INDEX));
+ }
+
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
if (isHeartbeat) {
if (LOG.isTraceEnabled()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0825e7d12..cd20c8cd4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -21,6 +21,8 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
@@ -626,6 +628,13 @@ class RaftServerProxy implements RaftServer {
.thenCompose(impl -> impl.executeSubmitServerRequestAsync(() ->
impl.appendEntriesAsync(request)));
}
+ @Override
+ public CompletableFuture<ReadIndexReplyProto>
readIndexAsync(ReadIndexRequestProto request) throws IOException {
+ final RaftGroupId groupId =
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+ return getImplFuture(groupId)
+ .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() ->
impl.readIndexAsync(request)));
+ }
+
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto
request) throws IOException {
return getImpl(request.getServerRequest()).appendEntries(request);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index deae754c3..108b6c939 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -109,6 +109,21 @@ final class ServerProtoUtils {
return builder.build();
}
+ static ReadIndexRequestProto toReadIndexRequestProto(
+ RaftGroupMemberId requestorId, RaftPeerId replyId) {
+ return ReadIndexRequestProto.newBuilder()
+
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId,
replyId))
+ .build();
+ }
+
+ static ReadIndexReplyProto toReadIndexReplyProto(
+ RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long
index) {
+ return ReadIndexReplyProto.newBuilder()
+ .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId,
success))
+ .setReadIndex(index)
+ .build();
+ }
+
@SuppressWarnings("parameternumber")
static AppendEntriesReplyProto toAppendEntriesReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 5c7383000..f611ac4d1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -24,6 +24,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -37,7 +38,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -163,25 +167,106 @@ public abstract class ReadOnlyRequestTests<CLUSTER
extends MiniRaftCluster>
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
- try (final RaftClient client = cluster.createClient(leaderId)) {
+ try (final RaftClient client = cluster.createClient(leaderId);
+ final RaftClient noRetry = cluster.createClient(leaderId,
RetryPolicies.noRetry())) {
- Semaphore canRead = new Semaphore(0);
+ CompletableFuture<RaftClientReply> result =
client.async().send(incrementMessage);
+ client.admin().transferLeadership(null, 200);
- Thread thread = new Thread(() -> {
- try (RaftClient noRetryClient = cluster.createClient(leaderId,
RetryPolicies.noRetry())) {
- canRead.acquire();
- // we still have to sleep for a while to guarantee that the async
write arrives at RaftServer
- Thread.sleep(100);
- RaftClientReply timeoutReply =
noRetryClient.io().sendReadOnly(queryMessage);
- Assert.assertNotNull(timeoutReply.getException());
- } catch (Exception ignored) {}
+ Assert.assertThrows(ReadException.class, () -> {
+ RaftClientReply timeoutReply =
noRetry.io().sendReadOnly(queryMessage);
+ Assert.assertNotNull(timeoutReply.getException());
+ Assert.assertTrue(timeoutReply.getException() instanceof
ReadException);
});
+ }
- thread.start();
- CompletableFuture<RaftClientReply> result =
client.async().send(timeoutIncrement);
- canRead.release();
+ } finally {
+ cluster.shutdown();
+ }
+ }
- thread.join();
+ @Test
+ public void testFollowerLinearizableRead() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadImpl);
+ }
+
+ private void testFollowerLinearizableReadImpl(CLUSTER cluster) throws
Exception {
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+
+ List<RaftServer.Division> followers = cluster.getFollowers();
+ Assert.assertEquals(2, followers.size());
+
+ try (RaftClient leaderClient =
cluster.createClient(cluster.getLeader().getId());
+ RaftClient followerClient1 =
cluster.createClient(followers.get(0).getId());
+ RaftClient followerClient2 =
cluster.createClient(followers.get(1).getId());) {
+ for (int i = 1; i <= 10; i++) {
+ RaftClientReply reply = leaderClient.io().send(incrementMessage);
+ Assert.assertTrue(reply.isSuccess());
+ RaftClientReply read1 =
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
+ Assert.assertEquals(retrieve(read1), i);
+ RaftClientReply read2 =
followerClient2.io().sendReadOnly(queryMessage, followers.get(1).getId());
+ Assert.assertEquals(retrieve(read2), i);
+ }
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFollowerLinearizableReadParallel() throws Exception {
+ runWithNewCluster(NUM_SERVERS,
this::testFollowerLinearizableReadParallelImpl);
+ }
+
+ private void testFollowerLinearizableReadParallelImpl(CLUSTER cluster)
throws Exception {
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+
+ List<RaftServer.Division> followers = cluster.getFollowers();
+ Assert.assertEquals(2, followers.size());
+
+ try (RaftClient leaderClient =
cluster.createClient(cluster.getLeader().getId());
+ RaftClient followerClient1 =
cluster.createClient(followers.get(0).getId())) {
+
+ leaderClient.io().send(incrementMessage);
+ leaderClient.async().send(waitAndIncrementMessage);
+
+ RaftClientReply clientReply =
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
+ Assert.assertEquals(2, retrieve(clientReply));
+ }
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testFollowerLinearizableReadFailWhenLeaderDown() throws
Exception {
+ runWithNewCluster(NUM_SERVERS,
this::testFollowerLinearizableReadFailWhenLeaderDownImpl);
+ }
+
+ private void testFollowerLinearizableReadFailWhenLeaderDownImpl(CLUSTER
cluster) throws Exception {
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+
+ List<RaftServer.Division> followers = cluster.getFollowers();
+ Assert.assertEquals(2, followers.size());
+
+ try (RaftClient leaderClient =
cluster.createClient(cluster.getLeader().getId());
+ RaftClient followerClient1 =
cluster.createClient(followers.get(0).getId(), RetryPolicies.noRetry())) {
+ leaderClient.io().send(incrementMessage);
+
+ RaftClientReply clientReply =
followerClient1.io().sendReadOnly(queryMessage);
+ Assert.assertEquals(1, retrieve(clientReply));
+
+ // kill the leader
+ // read timeout quicker than election timeout
+ leaderClient.admin().transferLeadership(null, 200);
+
+ Assert.assertThrows(ReadException.class, () -> {
+ followerClient1.io().sendReadOnly(queryMessage,
followers.get(0).getId());
+ });
}
} finally {