This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 262e8cb5fccf63e2a5b9e07fe2fe66d3c9ff0dcb Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Apr 26 08:13:57 2023 -0700 RATIS-1834. ServerRequestStreamObserver is not properly closed. (#876) (cherry picked from commit 12d80f534e02299cb59cc7b25b9e24e55fb97318) --- .../grpc/server/GrpcServerProtocolService.java | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 08386f83c..398f2bc96 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -61,7 +61,11 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> { private final RaftServer.Op op; private final StreamObserver<REPLY> responseObserver; + /** For ordered {@link #onNext(Object)} requests. */ private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>(); + /** For both ordered and unordered {@link #onNext(Object)} requests. */ + private final AtomicReference<CompletableFuture<REPLY>> requestFuture + = new AtomicReference<>(CompletableFuture.completedFuture(null)); private final AtomicBoolean isClosed = new AtomicBoolean(false); ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY> responseObserver) { @@ -97,20 +101,25 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { } } - private synchronized void handleReply(REPLY reply) { + private synchronized REPLY handleReply(REPLY reply) { if (!isClosed.get()) { if (LOG.isDebugEnabled()) { LOG.debug("{}: reply {}", getId(), replyToString(reply)); } responseObserver.onNext(reply); } + return reply; + } + + void composeRequest(CompletableFuture<REPLY> current) { + requestFuture.updateAndGet(previous -> previous.thenCompose(reply -> current)); } @Override public void onNext(REQUEST request) { if (!replyInOrder(request)) { try { - process(request).thenAccept(this::handleReply); + composeRequest(process(request).thenApply(this::handleReply)); } catch (Exception e) { handleError(e, request); } @@ -123,7 +132,7 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { .map(PendingServerRequest::getFuture) .orElse(CompletableFuture.completedFuture(null)); try { - process(request).exceptionally(e -> { + final CompletableFuture<REPLY> f = process(request).exceptionally(e -> { // Handle cases, such as RaftServer is paused handleError(e, request); current.getFuture().completeExceptionally(e); @@ -133,6 +142,7 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { current.getFuture().complete(null); return null; }); + composeRequest(f); } catch (Exception e) { handleError(e, request); current.getFuture().completeExceptionally(e); @@ -143,7 +153,10 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { public void onCompleted() { if (isClosed.compareAndSet(false, true)) { LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString()); - responseObserver.onCompleted(); + requestFuture.get().thenAccept(reply -> { + LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply); + responseObserver.onCompleted(); + }); } } @Override
