This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3955dfed RATIS-1618. Resolve the stream client concurrent reconnection
problem (#676)
3955dfed is described below
commit 3955dfedb085f0f71f7c0b039bba51f4772be23d
Author: hao guo <[email protected]>
AuthorDate: Thu Jul 14 23:50:32 2022 +0800
RATIS-1618. Resolve the stream client concurrent reconnection problem (#676)
---
.../ratis/netty/client/NettyClientStreamRpc.java | 23 ++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index dce9b63b..37f09b8d 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -164,10 +164,11 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
return null; //closed
}
final Channel channel = future.syncUninterruptibly().channel();
- if (channel.isOpen()) {
+ if (channel.isActive()) {
return channel;
}
- return reconnect().syncUninterruptibly().channel();
+ ChannelFuture f = reconnect();
+ return f == null ? null : f.syncUninterruptibly().channel();
}
private EventLoopGroup getWorkerGroup() {
@@ -201,7 +202,16 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(),
RECONNECT.getUnit());
}
- private ChannelFuture reconnect() {
+ private synchronized ChannelFuture reconnect() {
+ // concurrent reconnect double check
+ ChannelFuture channelFuture = ref.get();
+ if (channelFuture != null) {
+ Channel channel = channelFuture.syncUninterruptibly().channel();
+ if (channel.isActive()) {
+ return channelFuture;
+ }
+ }
+
final MemoizedSupplier<ChannelFuture> supplier =
MemoizedSupplier.valueOf(this::connect);
final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null?
null: supplier.get());
if (previous != null) {
@@ -213,9 +223,9 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
void close() {
final ChannelFuture previous = ref.getAndSet(null);
if (previous != null) {
- previous.channel().close();
+ // wait channel closed, do shutdown workerGroup
+ previous.channel().close().addListener((future) ->
workerGroup.shutdownGracefully());
}
- workerGroup.shutdownGracefully();
}
boolean isClosed() {
@@ -293,8 +303,6 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
.map(replies::remove)
.orElse(ReplyQueue.EMPTY)
.forEach(f -> f.completeExceptionally(cause));
-
- LOG.warn(name + ": exceptionCaught", cause);
ctx.close();
}
@@ -303,7 +311,6 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
if (!connection.isClosed()) {
connection.scheduleReconnect("channel is inactive", null);
}
- super.channelInactive(ctx);
}
};
}