This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new be9a984 RATIS-569. StatusRuntimeException because Ratis clients do
not shutdown the observer cleanly. Contributed by Siddharth Wagle.
be9a984 is described below
commit be9a98457c07bd729e5598aa61632f24a7372c54
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Sat Sep 14 10:20:30 2019 +0530
RATIS-569. StatusRuntimeException because Ratis clients do not shutdown the
observer cleanly. Contributed by Siddharth Wagle.
---
.../apache/ratis/grpc/client/GrpcClientProtocolClient.java | 8 +++++++-
.../apache/ratis/grpc/server/GrpcServerProtocolClient.java | 11 ++++++++++-
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8fe3ebf..45cfeed 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -66,6 +66,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -140,7 +141,12 @@ public class GrpcClientProtocolClient implements Closeable
{
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
scheduler.close();
- channel.shutdownNow();
+ channel.shutdown();
+ try {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while waiting for channel termination",
e);
+ }
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws
IOException {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6eb6c38..8ca32e1 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -30,8 +30,11 @@ import
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProto
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
/**
* This is a RaftClient implementation that supports streaming data to the raft
@@ -42,6 +45,7 @@ public class GrpcServerProtocolClient implements Closeable {
private final TimeDuration requestTimeoutDuration;
private final RaftServerProtocolServiceBlockingStub blockingStub;
private final RaftServerProtocolServiceStub asyncStub;
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcServerProtocolClient.class);
public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
@@ -80,7 +84,12 @@ public class GrpcServerProtocolClient implements Closeable {
@Override
public void close() {
- channel.shutdownNow();
+ channel.shutdown();
+ try {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while waiting for channel termination",
e);
+ }
}
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {