This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 979b38a7a RATIS-1888. Handle exception of readIndexAsync in gRPC readIndex impl (#920)
979b38a7a is described below

commit 979b38a7ab09493689bf167e04e521ed18d76d0e
Author: tison <[email protected]>
AuthorDate: Sat Sep 16 00:55:00 2023 +0800

    RATIS-1888. Handle exception of readIndexAsync in gRPC readIndex impl (#920)
---
 .../src/main/java/org/apache/ratis/grpc/GrpcUtil.java  | 18 ++++++++++++++----
 .../ratis/grpc/server/GrpcServerProtocolService.java   | 15 +++++----------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index ee1c28dd3..22653b6ef 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.ratis.protocol.RaftClientReply;
+import java.util.function.Consumer;
 import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.security.TlsConf.TrustManagerConf;
@@ -47,7 +47,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 public interface GrpcUtil {
-  static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
+  Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
 
   Metadata.Key<String> EXCEPTION_TYPE_KEY =
       Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
@@ -163,13 +163,22 @@ public interface GrpcUtil {
     return e;
   }
 
-  static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
+  static <REPLY, REPLY_PROTO> void asyncCall(
       StreamObserver<REPLY_PROTO> responseObserver,
       CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
       Function<REPLY, REPLY_PROTO> toProto) {
+    asyncCall(responseObserver, supplier, toProto, throwable -> {});
+  }
+
+  static <REPLY, REPLY_PROTO> void asyncCall(
+          StreamObserver<REPLY_PROTO> responseObserver,
+          CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
+          Function<REPLY, REPLY_PROTO> toProto,
+          Consumer<Throwable> warning) {
     try {
-      supplier.get().whenCompleteAsync((reply, exception) -> {
+      supplier.get().whenComplete((reply, exception) -> {
         if (exception != null) {
+          warning.accept(exception);
           responseObserver.onError(GrpcUtil.wrapException(exception));
         } else {
           responseObserver.onNext(toProto.apply(reply));
@@ -177,6 +186,7 @@ public interface GrpcUtil {
         }
       });
     } catch (Exception e) {
+      warning.accept(e);
       responseObserver.onError(GrpcUtil.wrapException(e));
     }
   }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 398f2bc96..766e14321 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.grpc.server;
 
+import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
@@ -212,16 +214,9 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
 
   @Override
   public void readIndex(ReadIndexRequestProto request, StreamObserver<ReadIndexReplyProto> responseObserver) {
-    try {
-      server.readIndexAsync(request).thenAccept(reply -> {
-        responseObserver.onNext(reply);
-        responseObserver.onCompleted();
-      });
-    } catch (Throwable e) {
-      GrpcUtil.warn(LOG,
-          () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
-      responseObserver.onError(GrpcUtil.wrapException(e));
-    }
+    final Consumer<Throwable> warning = e -> GrpcUtil.warn(LOG,
+            () -> getId() + ": Failed readIndex " + ProtoUtils.toString(request.getServerRequest()), e);
+    GrpcUtil.asyncCall(responseObserver, () -> server.readIndexAsync(request), Function.identity(), warning);
   }
 
   @Override

Reply via email to