Repository: incubator-ratis Updated Branches: refs/heads/master 1f4b5727b -> 8cac9d54f
RATIS-189. AppendRequestStreamObserver close can be called twice. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8cac9d54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8cac9d54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8cac9d54 Branch: refs/heads/master Commit: 8cac9d54f1e3e713afe41c170da59a41efd81334 Parents: 1f4b572 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Fri Jan 12 09:16:54 2018 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Fri Jan 12 09:16:54 2018 +0800 ---------------------------------------------------------------------- .../ratis/grpc/client/RaftClientProtocolService.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8cac9d54/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index 6d19920..1f604d8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -111,10 +112,12 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase private final StreamObserver<RaftClientReplyProto> responseObserver; private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow = new SlidingWindow.Server<>(name, COMPLETED); + private final AtomicBoolean isClosed; AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) { LOG.debug("new AppendRequestStreamObserver {}", name); this.responseObserver = ro; + this.isClosed = new AtomicBoolean(false); } void processClientRequestAsync(PendingAppend pending) { @@ -171,9 +174,11 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase } private void close() { - LOG.debug("{}: close", name); - responseObserver.onCompleted(); - slidingWindow.close(); + if (isClosed.compareAndSet(false, true)) { + LOG.debug("{}: close", name); + responseObserver.onCompleted(); + slidingWindow.close(); + } } void responseError(Throwable t, Supplier<String> message) {
