This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ab9fc535d RATIS-1701. Add new Server RPC: readIndex (#738)
ab9fc535d is described below

commit ab9fc535dc6f48ba1d93687fc46022989beb6b0a
Author: William Song <[email protected]>
AuthorDate: Sun Sep 11 15:21:48 2022 +0800

    RATIS-1701. Add new Server RPC: readIndex (#738)
---
 .../org/apache/ratis/client/api/BlockingApi.java   |   8 ++
 .../org/apache/ratis/client/impl/BlockingImpl.java |   5 +
 .../apache/ratis/client/impl/ClientProtoUtils.java |   8 ++
 .../org/apache/ratis/protocol/RaftClientReply.java |   7 +-
 .../ratis/protocol/exceptions/ReadException.java   |   4 +
 .../grpc/server/GrpcServerProtocolClient.java      |   5 +
 .../grpc/server/GrpcServerProtocolService.java     |  14 +++
 .../org/apache/ratis/grpc/server/GrpcService.java  |  45 ++++++++
 ratis-proto/src/main/proto/Grpc.proto              |   3 +
 ratis-proto/src/main/proto/Raft.proto              |  10 ++
 .../apache/ratis/server/RaftServerConfigKeys.java  |   6 +-
 .../org/apache/ratis/server/RaftServerRpc.java     |   7 ++
 .../protocol/RaftServerAsynchronousProtocol.java   |   5 +
 .../apache/ratis/server/impl/RaftServerImpl.java   |  61 +++++++++--
 .../apache/ratis/server/impl/RaftServerProxy.java  |   9 ++
 .../apache/ratis/server/impl/ServerProtoUtils.java |  15 +++
 .../org/apache/ratis/ReadOnlyRequestTests.java     | 113 ++++++++++++++++++---
 17 files changed, 299 insertions(+), 26 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 84f9740e2..c238fa4c9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -48,6 +48,14 @@ public interface BlockingApi {
    */
   RaftClientReply sendReadOnly(Message message) throws IOException;
 
+  /**
+   * Send the given read-only message to the given server (not the raft service).
+   * @param message The request message.
+   * @param server The target server.
+   * @return the reply.
+   */
+  RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws 
IOException;
+
   /**
    * Send the given stale-read message to the given server (not the raft 
service).
    * If the server commit index is larger than or equal to the given 
min-index, the request will be processed.
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 49bea5731..0c5458168 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -64,6 +64,11 @@ class BlockingImpl implements BlockingApi {
     return send(RaftClientRequest.readRequestType(), message, null);
   }
 
+  @Override
+  public RaftClientReply sendReadOnly(Message message, RaftPeerId server) 
throws IOException {
+    return send(RaftClientRequest.readRequestType(), message, server);
+  }
+
   @Override
   public RaftClientReply sendStaleRead(Message message, long minIndex, 
RaftPeerId server)
       throws IOException {
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 1ac825850..4496d2092 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -29,6 +29,7 @@ import 
org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.rpc.CallId;
@@ -47,6 +48,7 @@ import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDe
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERSTEPPINGDOWNEXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
+import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.READEXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
 import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.TRANSFERLEADERSHIPEXCEPTION;
 
@@ -303,6 +305,10 @@ public interface ClientProtoUtils {
           .map(ProtoUtils::toThrowableProto)
           .ifPresent(b::setTransferLeadershipException);
 
+      Optional.ofNullable(reply.getReadException())
+          .map(ProtoUtils::toThrowableProto)
+          .ifPresent(b::setReadException);
+
       final RaftClientReplyProto serialized = b.build();
       final RaftException e = reply.getException();
       if (e != null) {
@@ -392,6 +398,8 @@ public interface ClientProtoUtils {
       e = ProtoUtils.toThrowable(replyProto.getLeaderSteppingDownException(), 
LeaderSteppingDownException.class);
     } else if 
(replyProto.getExceptionDetailsCase().equals(TRANSFERLEADERSHIPEXCEPTION)) {
       e = ProtoUtils.toThrowable(replyProto.getTransferLeadershipException(), 
TransferLeadershipException.class);
+    } else if (replyProto.getExceptionDetailsCase().equals(READEXCEPTION)) {
+      e = ProtoUtils.toThrowable(replyProto.getReadException(), 
ReadException.class);
     } else {
       e = null;
     }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index ada67c931..6d29c452c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -25,6 +25,7 @@ import 
org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.util.JavaUtils;
@@ -169,7 +170,7 @@ public class RaftClientReply extends RaftClientMessage {
           AlreadyClosedException.class,
           NotLeaderException.class, NotReplicatedException.class,
           LeaderNotReadyException.class, StateMachineException.class, 
DataStreamException.class,
-          LeaderSteppingDownException.class, 
TransferLeadershipException.class),
+          LeaderSteppingDownException.class, 
TransferLeadershipException.class, ReadException.class),
           () -> "Unexpected exception class: " + this);
     }
   }
@@ -245,6 +246,10 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, TransferLeadershipException.class);
   }
 
+  public ReadException getReadException() {
+    return JavaUtils.cast(exception, ReadException.class);
+  }
+
   /** @return the exception, if there is any; otherwise, return null. */
   public RaftException getException() {
     return exception;
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
index 32828418d..1b1a20b8d 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
@@ -21,6 +21,10 @@ package org.apache.ratis.protocol.exceptions;
  * This exception indicates the failure of a read request.
  */
 public class ReadException extends RaftException {
+  public ReadException(String message) {
+    super(message);
+  }
+
   public ReadException(Throwable cause) {
     super(cause);
   }
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6f5d06c2e..a8ee5f30d 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -130,6 +130,11 @@ public class GrpcServerProtocolClient implements Closeable 
{
     return r;
   }
 
+  void readIndex(ReadIndexRequestProto request, 
StreamObserver<ReadIndexReplyProto> s) {
+    asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
+        .readIndex(request, s);
+  }
+
   StreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseHandler, boolean 
isHeartbeat) {
     if (isHeartbeat && useSeparateHBChannel) {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 5ad3909d5..6f98a6b8a 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -197,6 +197,20 @@ class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase {
     }
   }
 
+  @Override
+  public void readIndex(ReadIndexRequestProto request, 
StreamObserver<ReadIndexReplyProto> responseObserver) {
+    try {
+      server.readIndexAsync(request).thenAccept(reply -> {
+        responseObserver.onNext(reply);
+        responseObserver.onCompleted();
+      });
+    } catch (Throwable e) {
+      GrpcUtil.warn(LOG,
+          () -> getId() + ": Failed readIndex " + 
ProtoUtils.toString(request.getServerRequest()), e);
+      responseObserver.onError(GrpcUtil.wrapException(e));
+    }
+  }
+
   @Override
   public StreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseObserver) {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 40e413915..f21619555 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -27,10 +27,12 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpcWithProxy;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
 import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
@@ -44,6 +46,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
@@ -56,6 +59,41 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
   public static final String GRPC_SEND_SERVER_REQUEST =
       JavaUtils.getClassSimpleName(GrpcService.class) + ".sendRequest";
 
+  class AsyncService implements RaftServerAsynchronousProtocol {
+
+    @Override
+    public CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
+        throws IOException {
+      throw new UnsupportedOperationException("This method is not supported");
+    }
+
+    @Override
+    public CompletableFuture<ReadIndexReplyProto> 
readIndexAsync(ReadIndexRequestProto request) throws IOException {
+      CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null, 
request);
+
+      final CompletableFuture<ReadIndexReplyProto> f = new 
CompletableFuture<>();
+      final StreamObserver<ReadIndexReplyProto> s = new 
StreamObserver<ReadIndexReplyProto>() {
+        @Override
+        public void onNext(ReadIndexReplyProto reply) {
+          f.complete(reply);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          f.completeExceptionally(throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+        }
+      };
+
+      final RaftPeerId target = 
RaftPeerId.valueOf(request.getServerRequest().getReplyId());
+      getProxies().getProxy(target).readIndex(request, s);
+      return f;
+    }
+  }
+
   public static final class Builder {
     private RaftServer server;
     private GrpcTlsConfig tlsConfig;
@@ -108,6 +146,8 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
   private final Supplier<InetSocketAddress> clientServerAddressSupplier;
   private final Supplier<InetSocketAddress> adminServerAddressSupplier;
 
+  private final AsyncService asyncService = new AsyncService();
+
   private final ExecutorService executor;
   private final GrpcClientProtocolService clientProtocolService;
 
@@ -312,6 +352,11 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
     return adminServerAddressSupplier.get();
   }
 
+  @Override
+  public RaftServerAsynchronousProtocol async() {
+    return asyncService;
+  }
+
   @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto 
request) {
     throw new UnsupportedOperationException(
diff --git a/ratis-proto/src/main/proto/Grpc.proto 
b/ratis-proto/src/main/proto/Grpc.proto
index 06af061e7..edcd863a1 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -45,6 +45,9 @@ service RaftServerProtocolService {
 
   rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
       returns(stream ratis.common.InstallSnapshotReplyProto) {}
+
+  rpc readIndex(ratis.common.ReadIndexRequestProto)
+      returns(stream ratis.common.ReadIndexReplyProto) {}
 }
 
 service AdminProtocolService {
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index 9fe2494bf..1fd2a9d1e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -239,6 +239,15 @@ message InstallSnapshotReplyProto {
   }
 }
 
+message ReadIndexRequestProto {
+  RaftRpcRequestProto serverRequest = 1;
+}
+
+message ReadIndexReplyProto {
+  RaftRpcReplyProto serverReply = 1;
+  uint64 readIndex = 2;
+}
+
 message ClientMessageEntryProto {
   bytes content = 1;
 }
@@ -399,6 +408,7 @@ message RaftClientReplyProto {
     ThrowableProto dataStreamException = 8;
     ThrowableProto leaderSteppingDownException = 9;
     ThrowableProto transferLeadershipException = 10;
+    ThrowableProto readException = 11;
   }
 
   uint64 logIndex = 14; // When the request is a write request and the reply 
is success, the log index of the transaction
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 625de2e1a..5ae1020c5 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -182,7 +182,11 @@ public interface RaftServerConfigKeys {
     String OPTION_KEY = ".option";
     Option OPTION_DEFAULT = Option.DEFAULT;
     static Option option(RaftProperties properties) {
-      return get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT, 
getDefaultLog());
+      Option option =  get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT, 
getDefaultLog());
+      if (option != Option.DEFAULT && option != Option.LINEARIZABLE) {
+        throw new IllegalArgumentException("Unexpected read option: " + 
option);
+      }
+      return option;
     }
     static void setOption(RaftProperties properties, Option option) {
       set(properties::setEnum, OPTION_KEY, option);
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 5c8f99c90..d81f9cc8b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -21,7 +21,9 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -53,4 +55,9 @@ public interface RaftServerRpc extends RaftServerProtocol, 
RpcType.Get, RaftPeer
   /** The server role changes from leader to a non-leader role. */
   default void notifyNotLeader(RaftGroupId groupId) {
   }
+
+  default RaftServerAsynchronousProtocol async() {
+    throw new UnsupportedOperationException(getClass().getName()
+        + " does not support " + 
JavaUtils.getClassSimpleName(RaftServerAsynchronousProtocol.class));
+  }
 }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
index 2c7e1d5a5..8a904069b 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -18,6 +18,8 @@
 
 package org.apache.ratis.server.protocol;
 
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 
@@ -28,4 +30,7 @@ public interface RaftServerAsynchronousProtocol {
 
   CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
       throws IOException;
+
+  CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestProto 
request)
+      throws IOException;
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f9094b51d..70eca4301 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -824,6 +824,8 @@ class RaftServerImpl implements RaftServer.Division,
 
     if (request.is(TypeCase.STALEREAD)) {
       replyFuture = staleReadAsync(request);
+    } else if (request.is(TypeCase.READ)) {
+      replyFuture = readAsync(request);
     } else {
       // first check the server's leader state
       CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null,
@@ -845,11 +847,7 @@ class RaftServerImpl implements RaftServer.Division,
         }
       }
 
-      if (type.is(TypeCase.READ)) {
-        // TODO: We might not be the leader anymore by the time this completes.
-        // See the RAFT paper section 8 (last part)
-        replyFuture = readAsync(request);
-      } else if (type.is(TypeCase.WATCH)) {
+      if (type.is(TypeCase.WATCH)) {
         replyFuture = watchAsync(request);
       } else if (type.is(TypeCase.MESSAGESTREAM)) {
         replyFuture = streamAsync(request);
@@ -914,6 +912,16 @@ class RaftServerImpl implements RaftServer.Division,
     return getState().getReadRequests();
   }
 
+  private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync() {
+    final ReadIndexRequestProto request = 
ServerProtoUtils.toReadIndexRequestProto(
+        getMemberId(),  getInfo().getLeaderId());
+    try {
+      return getServerRpc().async().readIndexAsync(request);
+    } catch (IOException e) {
+      return JavaUtils.completeExceptionally(e);
+    }
+  }
+
   private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest 
request) {
     if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
       /*
@@ -923,16 +931,31 @@ class RaftServerImpl implements RaftServer.Division,
         3. Finally, query the statemachine and return the result.
        */
       final LeaderStateImpl leader = role.getLeaderState().orElse(null);
-      // TODO support follower linearizable read
-      if (leader == null) {
-        return JavaUtils.completeExceptionally(generateNotLeaderException());
+
+      final CompletableFuture<Long> replyFuture;
+      if (leader != null) {
+        replyFuture = leader.getReadIndex();
+      } else {
+        replyFuture = sendReadIndexAsync().thenApply(reply -> {
+          if (reply.getServerReply().getSuccess()) {
+            return reply.getReadIndex();
+          } else {
+            throw new CompletionException(new ReadException(getId() +
+                ": Failed to get read index from the leader: " + reply));
+          }
+        });
       }
-      return leader.getReadIndex()
+
+      return replyFuture
           .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
           .thenCompose(readIndex -> queryStateMachine(request))
           .exceptionally(e -> readException2Reply(request, e));
     } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
-      return queryStateMachine(request);
+       CompletableFuture<RaftClientReply> reply = checkLeaderState(request, 
null, false);
+       if (reply != null) {
+         return reply;
+       }
+       return queryStateMachine(request);
     } else {
       throw new IllegalStateException("Unexpected read option: " + readOption);
     }
@@ -1354,6 +1377,24 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
+  @Override
+  public CompletableFuture<ReadIndexReplyProto> 
readIndexAsync(ReadIndexRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+
+    final RaftPeerId peerId = 
RaftPeerId.valueOf(request.getServerRequest().getRequestorId());
+
+    final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+    if (leader == null) {
+      return CompletableFuture.completedFuture(
+          ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, 
INVALID_LOG_INDEX));
+    }
+
+    return leader.getReadIndex()
+        .thenApply(index -> ServerProtoUtils.toReadIndexReplyProto(peerId, 
getMemberId(), true, index))
+        .exceptionally(throwable ->
+            ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), 
false, INVALID_LOG_INDEX));
+  }
+
   static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
     if (isHeartbeat) {
       if (LOG.isTraceEnabled()) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0825e7d12..cd20c8cd4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -21,6 +21,8 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
@@ -626,6 +628,13 @@ class RaftServerProxy implements RaftServer {
         .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> 
impl.appendEntriesAsync(request)));
   }
 
+  @Override
+  public CompletableFuture<ReadIndexReplyProto> 
readIndexAsync(ReadIndexRequestProto request) throws IOException {
+    final RaftGroupId groupId = 
ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+    return getImplFuture(groupId)
+        .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> 
impl.readIndexAsync(request)));
+  }
+
   @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto 
request) throws IOException {
     return getImpl(request.getServerRequest()).appendEntries(request);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index deae754c3..108b6c939 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -109,6 +109,21 @@ final class ServerProtoUtils {
     return builder.build();
   }
 
+  static ReadIndexRequestProto toReadIndexRequestProto(
+      RaftGroupMemberId requestorId, RaftPeerId replyId) {
+    return ReadIndexRequestProto.newBuilder()
+        
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, 
replyId))
+        .build();
+  }
+
+  static ReadIndexReplyProto toReadIndexReplyProto(
+      RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long 
index) {
+    return ReadIndexReplyProto.newBuilder()
+        .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, 
success))
+        .setReadIndex(index)
+        .build();
+  }
+
   @SuppressWarnings("parameternumber")
   static AppendEntriesReplyProto toAppendEntriesReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 5c7383000..f611ac4d1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -24,6 +24,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadException;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -37,7 +38,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -163,25 +167,106 @@ public abstract class ReadOnlyRequestTests<CLUSTER 
extends MiniRaftCluster>
       RaftTestUtil.waitForLeader(cluster);
       final RaftPeerId leaderId = cluster.getLeader().getId();
 
-      try (final RaftClient client = cluster.createClient(leaderId)) {
+      try (final RaftClient client = cluster.createClient(leaderId);
+           final RaftClient noRetry = cluster.createClient(leaderId, 
RetryPolicies.noRetry())) {
 
-        Semaphore canRead = new Semaphore(0);
+        CompletableFuture<RaftClientReply> result = 
client.async().send(incrementMessage);
+        client.admin().transferLeadership(null, 200);
 
-        Thread thread = new Thread(() -> {
-          try (RaftClient noRetryClient = cluster.createClient(leaderId, 
RetryPolicies.noRetry())) {
-            canRead.acquire();
-            // we still have to sleep for a while to guarantee that the async 
write arrives at RaftServer
-            Thread.sleep(100);
-            RaftClientReply timeoutReply = 
noRetryClient.io().sendReadOnly(queryMessage);
-            Assert.assertNotNull(timeoutReply.getException());
-          } catch (Exception ignored) {}
+        Assert.assertThrows(ReadException.class, () -> {
+          RaftClientReply timeoutReply = 
noRetry.io().sendReadOnly(queryMessage);
+          Assert.assertNotNull(timeoutReply.getException());
+          Assert.assertTrue(timeoutReply.getException() instanceof 
ReadException);
         });
+      }
 
-        thread.start();
-        CompletableFuture<RaftClientReply> result = 
client.async().send(timeoutIncrement);
-        canRead.release();
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
-        thread.join();
+  @Test
+  public void testFollowerLinearizableRead() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadImpl);
+  }
+
+  private void testFollowerLinearizableReadImpl(CLUSTER cluster) throws 
Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      List<RaftServer.Division> followers = cluster.getFollowers();
+      Assert.assertEquals(2, followers.size());
+
+      try (RaftClient leaderClient = 
cluster.createClient(cluster.getLeader().getId());
+           RaftClient followerClient1 = 
cluster.createClient(followers.get(0).getId());
+           RaftClient followerClient2 = 
cluster.createClient(followers.get(1).getId());) {
+        for (int i = 1; i <= 10; i++) {
+          RaftClientReply reply = leaderClient.io().send(incrementMessage);
+          Assert.assertTrue(reply.isSuccess());
+          RaftClientReply read1 = 
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
+          Assert.assertEquals(retrieve(read1), i);
+          RaftClientReply read2 = 
followerClient2.io().sendReadOnly(queryMessage, followers.get(1).getId());
+          Assert.assertEquals(retrieve(read2), i);
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFollowerLinearizableReadParallel() throws Exception {
+    runWithNewCluster(NUM_SERVERS, 
this::testFollowerLinearizableReadParallelImpl);
+  }
+
+  private void testFollowerLinearizableReadParallelImpl(CLUSTER cluster) 
throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      List<RaftServer.Division> followers = cluster.getFollowers();
+      Assert.assertEquals(2, followers.size());
+
+      try (RaftClient leaderClient = 
cluster.createClient(cluster.getLeader().getId());
+           RaftClient followerClient1 = 
cluster.createClient(followers.get(0).getId())) {
+
+        leaderClient.io().send(incrementMessage);
+        leaderClient.async().send(waitAndIncrementMessage);
+
+        RaftClientReply clientReply = 
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
+        Assert.assertEquals(2, retrieve(clientReply));
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFollowerLinearizableReadFailWhenLeaderDown() throws 
Exception {
+    runWithNewCluster(NUM_SERVERS, 
this::testFollowerLinearizableReadFailWhenLeaderDownImpl);
+  }
+
+  private void testFollowerLinearizableReadFailWhenLeaderDownImpl(CLUSTER 
cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      List<RaftServer.Division> followers = cluster.getFollowers();
+      Assert.assertEquals(2, followers.size());
+
+      try (RaftClient leaderClient = 
cluster.createClient(cluster.getLeader().getId());
+           RaftClient followerClient1 = 
cluster.createClient(followers.get(0).getId(), RetryPolicies.noRetry())) {
+         leaderClient.io().send(incrementMessage);
+
+         RaftClientReply clientReply = 
followerClient1.io().sendReadOnly(queryMessage);
+         Assert.assertEquals(1, retrieve(clientReply));
+
+         // ask the leader to transfer leadership away (null target, timeout 200) rather than killing it
+         // read timeout quicker than election timeout
+         leaderClient.admin().transferLeadership(null, 200);
+
+         Assert.assertThrows(ReadException.class, () -> {
+           followerClient1.io().sendReadOnly(queryMessage, 
followers.get(0).getId());
+         });
       }
 
     } finally {

Reply via email to