This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new be9a984  RATIS-569. StatusRuntimeException because Ratis clients do 
not shutdown the observer cleanly. Contributed by Siddharth Wagle.
be9a984 is described below

commit be9a98457c07bd729e5598aa61632f24a7372c54
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Sat Sep 14 10:20:30 2019 +0530

    RATIS-569. StatusRuntimeException because Ratis clients do not shutdown the 
observer cleanly. Contributed by Siddharth Wagle.
---
 .../apache/ratis/grpc/client/GrpcClientProtocolClient.java    |  8 +++++++-
 .../apache/ratis/grpc/server/GrpcServerProtocolClient.java    | 11 ++++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8fe3ebf..45cfeed 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -66,6 +66,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -140,7 +141,12 @@ public class GrpcClientProtocolClient implements Closeable 
{
     
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     scheduler.close();
-    channel.shutdownNow();
+    channel.shutdown();
+    try {
+      channel.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for channel termination", 
e);
+    }
   }
 
   RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws 
IOException {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 6eb6c38..8ca32e1 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -30,8 +30,11 @@ import 
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProto
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This is a RaftClient implementation that supports streaming data to the raft
@@ -42,6 +45,7 @@ public class GrpcServerProtocolClient implements Closeable {
   private final TimeDuration requestTimeoutDuration;
   private final RaftServerProtocolServiceBlockingStub blockingStub;
   private final RaftServerProtocolServiceStub asyncStub;
+  private static final Logger LOG = 
LoggerFactory.getLogger(GrpcServerProtocolClient.class);
 
   public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
       TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
@@ -80,7 +84,12 @@ public class GrpcServerProtocolClient implements Closeable {
 
   @Override
   public void close() {
-    channel.shutdownNow();
+    channel.shutdown();
+    try {
+      channel.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for channel termination", 
e);
+    }
   }
 
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {

Reply via email to