szetszwo commented on code in PR #738:
URL: https://github.com/apache/ratis/pull/738#discussion_r966798672
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java:
##########
@@ -342,4 +342,12 @@ public StartLeaderElectionReplyProto
startLeaderElection(StartLeaderElectionRequ
return getProxies().getProxy(target).startLeaderElection(request);
}
+ @Override
+ public ReadIndexReplyProto readIndex(ReadIndexRequestProto request) throws IOException {
Review Comment:
Implement `readIndexAsync` instead.
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java:
##########
@@ -130,6 +130,13 @@ public StartLeaderElectionReplyProto
startLeaderElection(StartLeaderElectionRequ
return r;
}
+ public ReadIndexReplyProto readIndex(ReadIndexRequestProto request) {
+ ReadIndexReplyProto r =
+ blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
Review Comment:
It should use `asyncStub`.
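A minimal sketch of what switching from a blocking stub to an async one usually involves: the callback-style call is bridged to a `CompletableFuture` so that the caller never waits on the reply. The `Observer` interface below is only a stand-in for gRPC's `StreamObserver`, and `AsyncCall` and `toFuture` are hypothetical names used for illustration; none of this is taken from the Ratis code base.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncStubSketch {
  /** Stand-in for gRPC's StreamObserver, declared here so the sketch compiles on its own. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Hypothetical unary async call: the stub invokes the observer when the reply arrives. */
  interface AsyncCall<REQ, REP> {
    void call(REQ request, Observer<REP> observer);
  }

  /** Bridges a callback-style unary call to a CompletableFuture without blocking the caller. */
  static <REQ, REP> CompletableFuture<REP> toFuture(AsyncCall<REQ, REP> call, REQ request) {
    final CompletableFuture<REP> future = new CompletableFuture<>();
    call.call(request, new Observer<REP>() {
      @Override
      public void onNext(REP value) {
        future.complete(value);
      }

      @Override
      public void onError(Throwable t) {
        future.completeExceptionally(t);
      }

      @Override
      public void onCompleted() {
        // No effect if onNext already completed the future; otherwise report the missing reply.
        future.completeExceptionally(new IllegalStateException("completed without a reply"));
      }
    });
    return future;
  }
}
```

In a real client, the deadline that the blocking version sets with `withDeadlineAfter` would presumably still be applied to the async stub before making the call.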
##########
ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java:
##########
@@ -38,4 +40,6 @@ enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;
StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
+
+ ReadIndexReplyProto readIndex(ReadIndexRequestProto request) throws IOException;
Review Comment:
Let's add only the async method. The synchronous method is going to cause
problems, so let's not add it. We may skip Netty and SimulatedRPC for the
moment, i.e. they won't support non-leader `readIndex` for now.
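The comment does not spell out the problem. One common concern with a blocking wrapper such as `readIndexAsync(request).join()` is that it parks the calling thread until the reply arrives. A rough sketch of an async-only interface follows; `ReadIndexProtocol` and `describe` are hypothetical names, and `String`/`Long` stand in for the request and reply protos.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncOnlySketch {
  /** Hypothetical async-only protocol: no blocking readIndex(..) variant is declared. */
  interface ReadIndexProtocol {
    CompletableFuture<Long> readIndexAsync(String requestorId);
  }

  /** Composes on the future instead of calling join(), so no thread waits for the reply. */
  static CompletableFuture<String> describe(ReadIndexProtocol protocol, String requestorId) {
    return protocol.readIndexAsync(requestorId)
        .thenApply(index -> requestorId + " may read at index " + index);
  }
}
```

With only the async method on the interface, callers have to compose on the returned future, and any blocking becomes an explicit choice at the call site.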
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java:
##########
@@ -197,6 +197,19 @@ public void
startLeaderElection(StartLeaderElectionRequestProto request,
}
}
+ @Override
+ public void readIndex(ReadIndexRequestProto request, StreamObserver<ReadIndexReplyProto> responseObserver) {
+ try {
+ final ReadIndexReplyProto reply = server.readIndex(request);
Review Comment:
Use `readIndexAsync`.
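On the service side, using the async method typically means completing the response observer from the future's callback rather than from the handler thread. The sketch below assumes a unary call; `Observer` is a stand-in for gRPC's `StreamObserver`, and `handle` and `asyncHandler` are hypothetical names, not the actual Ratis code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncServiceSketch {
  /** Stand-in for gRPC's StreamObserver, declared here so the sketch compiles on its own. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Forwards the outcome of an async handler to the observer without blocking. */
  static <REQ, REP> void handle(REQ request,
      Function<REQ, CompletableFuture<REP>> asyncHandler,
      Observer<REP> responseObserver) {
    final CompletableFuture<REP> future;
    try {
      future = asyncHandler.apply(request);
    } catch (RuntimeException e) {
      // The handler failed before returning a future; report the error immediately.
      responseObserver.onError(e);
      return;
    }
    future.whenComplete((reply, error) -> {
      if (error != null) {
        responseObserver.onError(error);
      } else {
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
      }
    });
  }
}
```

The handler returns as soon as the callback is registered, and the reply is sent from whichever thread completes the future.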
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1638,6 +1638,32 @@ public StartLeaderElectionReplyProto
startLeaderElection(StartLeaderElectionRequ
}
}
+ @Override
+ public ReadIndexReplyProto readIndex(ReadIndexRequestProto request) throws IOException {
+ try {
+ return readIndexAsync(request).join();
+ } catch (CompletionException e) {
+ throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e));
+ }
+ }
+
+ @Override
+ public CompletableFuture<ReadIndexReplyProto> readIndexAsync(ReadIndexRequestProto request) throws IOException {
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+
+ RaftPeerId followerId = RaftPeerId.valueOf(request.getServerRequest().getRequestorId());
Review Comment:
Let's call it `peerId`. It could be a Listener.