This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 256b16bac0a2c159fd81bd6165118ca72c20f75c
Author: hao guo <[email protected]>
AuthorDate: Thu Jul 14 23:50:32 2022 +0800

    RATIS-1618. Resolve the stream client concurrent reconnection problem (#676)
---
 .../ratis/netty/client/NettyClientStreamRpc.java   | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index dce9b63b..37f09b8d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -164,10 +164,11 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         return null; //closed
       }
       final Channel channel = future.syncUninterruptibly().channel();
-      if (channel.isOpen()) {
+      if (channel.isActive()) {
         return channel;
       }
-      return reconnect().syncUninterruptibly().channel();
+      ChannelFuture f = reconnect();
+      return f == null ? null : f.syncUninterruptibly().channel();
     }
 
     private EventLoopGroup getWorkerGroup() {
@@ -201,7 +202,16 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
       getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit());
     }
 
-    private ChannelFuture reconnect() {
+    private synchronized ChannelFuture reconnect() {
+      // concurrent reconnect double check
+      ChannelFuture channelFuture = ref.get();
+      if (channelFuture != null) {
+        Channel channel = channelFuture.syncUninterruptibly().channel();
+        if (channel.isActive()) {
+          return channelFuture;
+        }
+      }
+
       final MemoizedSupplier<ChannelFuture> supplier = MemoizedSupplier.valueOf(this::connect);
       final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get());
       if (previous != null) {
@@ -213,9 +223,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     void close() {
       final ChannelFuture previous = ref.getAndSet(null);
       if (previous != null) {
-        previous.channel().close();
+        // wait channel closed, do shutdown workerGroup
+        previous.channel().close().addListener((future) -> workerGroup.shutdownGracefully());
       }
-      workerGroup.shutdownGracefully();
     }
 
     boolean isClosed() {
@@ -293,8 +303,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
             .map(replies::remove)
             .orElse(ReplyQueue.EMPTY)
             .forEach(f -> f.completeExceptionally(cause));
-
-        LOG.warn(name + ": exceptionCaught", cause);
         ctx.close();
       }
 
@@ -303,7 +311,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         if (!connection.isClosed()) {
           connection.scheduleReconnect("channel is inactive", null);
         }
-        super.channelInactive(ctx);
       }
     };
   }

Reply via email to