This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 256b16bac0a2c159fd81bd6165118ca72c20f75c Author: hao guo <[email protected]> AuthorDate: Thu Jul 14 23:50:32 2022 +0800 RATIS-1618. Resolve the stream client concurrent reconnection problem (#676) --- .../ratis/netty/client/NettyClientStreamRpc.java | 23 ++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index dce9b63b..37f09b8d 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -164,10 +164,11 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { return null; //closed } final Channel channel = future.syncUninterruptibly().channel(); - if (channel.isOpen()) { + if (channel.isActive()) { return channel; } - return reconnect().syncUninterruptibly().channel(); + ChannelFuture f = reconnect(); + return f == null ? null : f.syncUninterruptibly().channel(); } private EventLoopGroup getWorkerGroup() { @@ -201,7 +202,16 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit()); } - private ChannelFuture reconnect() { + private synchronized ChannelFuture reconnect() { + // concurrent reconnect double check + ChannelFuture channelFuture = ref.get(); + if (channelFuture != null) { + Channel channel = channelFuture.syncUninterruptibly().channel(); + if (channel.isActive()) { + return channelFuture; + } + } + final MemoizedSupplier<ChannelFuture> supplier = MemoizedSupplier.valueOf(this::connect); final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get()); if (previous != null) { @@ -213,9 +223,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { void close() { final ChannelFuture previous = ref.getAndSet(null); if (previous != null) { - previous.channel().close(); + // wait channel closed, do shutdown workerGroup + previous.channel().close().addListener((future) -> workerGroup.shutdownGracefully()); } - workerGroup.shutdownGracefully(); } boolean isClosed() { @@ -293,8 +303,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { .map(replies::remove) .orElse(ReplyQueue.EMPTY) .forEach(f -> f.completeExceptionally(cause)); - - LOG.warn(name + ": exceptionCaught", cause); ctx.close(); } @@ -303,7 +311,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { if (!connection.isClosed()) { connection.scheduleReconnect("channel is inactive", null); } - super.channelInactive(ctx); } }; }
