This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 979b38a7a RATIS-1888. Handle exception of readIndexAsync in gRPC readIndex impl (#920)
979b38a7a is described below
commit 979b38a7ab09493689bf167e04e521ed18d76d0e
Author: tison <[email protected]>
AuthorDate: Sat Sep 16 00:55:00 2023 +0800
RATIS-1888. Handle exception of readIndexAsync in gRPC readIndex impl (#920)
---
.../src/main/java/org/apache/ratis/grpc/GrpcUtil.java | 18 ++++++++++++++----
.../ratis/grpc/server/GrpcServerProtocolService.java | 15 +++++----------
2 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index ee1c28dd3..22653b6ef 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.grpc;
-import org.apache.ratis.protocol.RaftClientReply;
+import java.util.function.Consumer;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.security.TlsConf.TrustManagerConf;
@@ -47,7 +47,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
public interface GrpcUtil {
- static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
+ Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
Metadata.Key<String> EXCEPTION_TYPE_KEY =
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
@@ -163,13 +163,22 @@ public interface GrpcUtil {
return e;
}
- static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
+ static <REPLY, REPLY_PROTO> void asyncCall(
StreamObserver<REPLY_PROTO> responseObserver,
CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
Function<REPLY, REPLY_PROTO> toProto) {
+ asyncCall(responseObserver, supplier, toProto, throwable -> {});
+ }
+
+ static <REPLY, REPLY_PROTO> void asyncCall(
+ StreamObserver<REPLY_PROTO> responseObserver,
+ CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
+ Function<REPLY, REPLY_PROTO> toProto,
+ Consumer<Throwable> warning) {
try {
- supplier.get().whenCompleteAsync((reply, exception) -> {
+ supplier.get().whenComplete((reply, exception) -> {
if (exception != null) {
+ warning.accept(exception);
responseObserver.onError(GrpcUtil.wrapException(exception));
} else {
responseObserver.onNext(toProto.apply(reply));
@@ -177,6 +186,7 @@ public interface GrpcUtil {
}
});
} catch (Exception e) {
+ warning.accept(e);
responseObserver.onError(GrpcUtil.wrapException(e));
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 398f2bc96..766e14321 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.grpc.server;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
@@ -212,16 +214,9 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
@Override
public void readIndex(ReadIndexRequestProto request,
StreamObserver<ReadIndexReplyProto> responseObserver) {
- try {
- server.readIndexAsync(request).thenAccept(reply -> {
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- });
- } catch (Throwable e) {
- GrpcUtil.warn(LOG,
- () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
- responseObserver.onError(GrpcUtil.wrapException(e));
- }
+ final Consumer<Throwable> warning = e -> GrpcUtil.warn(LOG,
+ () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
+ GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), warning);
}
@Override