This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 12d80f534 RATIS-1834. ServerRequestStreamObserver is not properly
closed. (#876)
12d80f534 is described below
commit 12d80f534e02299cb59cc7b25b9e24e55fb97318
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Apr 26 08:13:57 2023 -0700
RATIS-1834. ServerRequestStreamObserver is not properly closed. (#876)
---
.../grpc/server/GrpcServerProtocolService.java | 21 +++++++++++++++++----
1 file changed, 17 insertions(+), 4 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 08386f83c..398f2bc96 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -61,7 +61,11 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements
StreamObserver<REQUEST> {
private final RaftServer.Op op;
private final StreamObserver<REPLY> responseObserver;
+ /** For ordered {@link #onNext(Object)} requests. */
private final AtomicReference<PendingServerRequest<REQUEST>>
previousOnNext = new AtomicReference<>();
+ /** For both ordered and unordered {@link #onNext(Object)} requests. */
+ private final AtomicReference<CompletableFuture<REPLY>> requestFuture
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
private final AtomicBoolean isClosed = new AtomicBoolean(false);
ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY>
responseObserver) {
@@ -97,20 +101,25 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
}
}
- private synchronized void handleReply(REPLY reply) {
+ private synchronized REPLY handleReply(REPLY reply) {
if (!isClosed.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: reply {}", getId(), replyToString(reply));
}
responseObserver.onNext(reply);
}
+ return reply;
+ }
+
+ void composeRequest(CompletableFuture<REPLY> current) {
+ requestFuture.updateAndGet(previous -> previous.thenCompose(reply ->
current));
}
@Override
public void onNext(REQUEST request) {
if (!replyInOrder(request)) {
try {
- process(request).thenAccept(this::handleReply);
+ composeRequest(process(request).thenApply(this::handleReply));
} catch (Exception e) {
handleError(e, request);
}
@@ -123,7 +132,7 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
.map(PendingServerRequest::getFuture)
.orElse(CompletableFuture.completedFuture(null));
try {
- process(request).exceptionally(e -> {
+ final CompletableFuture<REPLY> f = process(request).exceptionally(e ->
{
// Handle cases, such as RaftServer is paused
handleError(e, request);
current.getFuture().completeExceptionally(e);
@@ -133,6 +142,7 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
current.getFuture().complete(null);
return null;
});
+ composeRequest(f);
} catch (Exception e) {
handleError(e, request);
current.getFuture().completeExceptionally(e);
@@ -143,7 +153,10 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
public void onCompleted() {
if (isClosed.compareAndSet(false, true)) {
LOG.info("{}: Completed {}, lastRequest: {}", getId(), op,
getPreviousRequestString());
- responseObserver.onCompleted();
+ requestFuture.get().thenAccept(reply -> {
+ LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
+ responseObserver.onCompleted();
+ });
}
}
@Override