This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 12d80f534 RATIS-1834. ServerRequestStreamObserver is not properly closed. (#876)
12d80f534 is described below

commit 12d80f534e02299cb59cc7b25b9e24e55fb97318
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Apr 26 08:13:57 2023 -0700

    RATIS-1834. ServerRequestStreamObserver is not properly closed. (#876)
---
 .../grpc/server/GrpcServerProtocolService.java      | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 08386f83c..398f2bc96 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -61,7 +61,11 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
   abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
     private final RaftServer.Op op;
     private final StreamObserver<REPLY> responseObserver;
+    /** For ordered {@link #onNext(Object)} requests. */
     private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>();
+    /** For both ordered and unordered {@link #onNext(Object)} requests. */
+    private final AtomicReference<CompletableFuture<REPLY>> requestFuture
+        = new AtomicReference<>(CompletableFuture.completedFuture(null));
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
     ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY> responseObserver) {
@@ -97,20 +101,25 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
       }
     }
 
-    private synchronized void handleReply(REPLY reply) {
+    private synchronized REPLY handleReply(REPLY reply) {
       if (!isClosed.get()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: reply {}", getId(), replyToString(reply));
         }
         responseObserver.onNext(reply);
       }
+      return reply;
+    }
+
+    void composeRequest(CompletableFuture<REPLY> current) {
+      requestFuture.updateAndGet(previous -> previous.thenCompose(reply -> current));
     }
 
     @Override
     public void onNext(REQUEST request) {
       if (!replyInOrder(request)) {
         try {
-          process(request).thenAccept(this::handleReply);
+          composeRequest(process(request).thenApply(this::handleReply));
         } catch (Exception e) {
           handleError(e, request);
         }
@@ -123,7 +132,7 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
           .map(PendingServerRequest::getFuture)
           .orElse(CompletableFuture.completedFuture(null));
       try {
-        process(request).exceptionally(e -> {
+        final CompletableFuture<REPLY> f = process(request).exceptionally(e -> {
           // Handle cases, such as RaftServer is paused
           handleError(e, request);
           current.getFuture().completeExceptionally(e);
@@ -133,6 +142,7 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
           current.getFuture().complete(null);
           return null;
         });
+        composeRequest(f);
       } catch (Exception e) {
         handleError(e, request);
         current.getFuture().completeExceptionally(e);
@@ -143,7 +153,10 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
     public void onCompleted() {
       if (isClosed.compareAndSet(false, true)) {
         LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString());
-        responseObserver.onCompleted();
+        requestFuture.get().thenAccept(reply -> {
+          LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
+          responseObserver.onCompleted();
+        });
       }
     }
     @Override

Reply via email to