This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 26582bf RATIS-1549. If the stream client is abnormally disconnected, reconnect. (#620)
26582bf is described below
commit 26582bf4de6ce9cd2e61e69a1c235298f4635fe7
Author: hao guo <[email protected]>
AuthorDate: Mon Mar 14 23:02:13 2022 +0800
RATIS-1549. If the stream client is abnormally disconnected, reconnect. (#620)
---
.../ratis/netty/client/NettyClientStreamRpc.java | 163 +++++++++++++++++----
1 file changed, 137 insertions(+), 26 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 57b44c2..b2714c7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -29,23 +29,34 @@ import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.*;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.Channel;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
+import
org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -54,6 +65,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -64,7 +76,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP =
new AtomicReference<>();
+ /**
+ * Creates a new worker {@link EventLoopGroup} named "NettyClientStreamRpc-workerGroup" and sized by
+ * {@code NettyConfigKeys.DataStream.clientWorkerGroupSize(properties)}.
+ * NOTE(review): the meaning of the trailing {@code false} argument of
+ * {@code NettyUtils.newEventLoopGroup} is not visible here; confirm against NettyUtils.
+ */
static EventLoopGroup newWorkerGroup(RaftProperties properties) {
- return new
NioEventLoopGroup(NettyConfigKeys.DataStream.clientWorkerGroupSize(properties));
+ return NettyUtils.newEventLoopGroup(
+ JavaUtils.getClassSimpleName(NettyClientStreamRpc.class) +
"-workerGroup",
+ NettyConfigKeys.DataStream.clientWorkerGroupSize(properties), false);
}
private final EventLoopGroup workerGroup;
@@ -125,30 +139,109 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
}
}
+  /**
+   * A connection to a single data stream address that reconnects when its channel fails.
+   *
+   * <p>The current {@link ChannelFuture} is kept in {@link #ref}. A reconnect publishes a newly
+   * created connection and closes the replaced channel; {@link #close()} sets the reference to null.
+   */
+  static class Connection {
+    /** The delay before a scheduled reconnect attempt runs. */
+    static final TimeDuration RECONNECT = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+    private final String address;
+    private final WorkerGroupGetter workerGroup;
+    /** Invoked on every (re)connect so that each new channel gets its own initializer. */
+    private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;
+
+    /** The {@link ChannelFuture} is null when this connection is closed. */
+    private final AtomicReference<ChannelFuture> ref;
+    /**
+     * Set by {@link #close()}. {@link #scheduleReconnect(String, Throwable)} checks this flag rather
+     * than {@link #ref}, since the connect listener may run on the event loop before the constructor
+     * has assigned {@link #ref}.
+     */
+    private volatile boolean closed;
+
+    Connection(String address, WorkerGroupGetter workerGroup,
+        Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier) {
+      this.address = address;
+      this.workerGroup = workerGroup;
+      this.channelInitializerSupplier = channelInitializerSupplier;
+      this.ref = new AtomicReference<>(connect());
+    }
+
+    /**
+     * Returns an open channel, waiting uninterruptibly for the connection attempt to complete.
+     * If the current channel is no longer open, a new connection is made immediately.
+     * Netty's {@code syncUninterruptibly()} rethrows the cause if the connect attempt failed.
+     *
+     * @return the channel, or null if this connection is closed.
+     */
+    Channel getChannelUninterruptibly() {
+      final ChannelFuture future = ref.get();
+      if (future == null) {
+        return null; // closed
+      }
+      final Channel channel = future.syncUninterruptibly().channel();
+      if (channel.isOpen()) {
+        return channel;
+      }
+      // reconnect(..) returns null when this connection has been closed concurrently.
+      final ChannelFuture reconnected = reconnect(future);
+      return reconnected == null ? null : reconnected.syncUninterruptibly().channel();
+    }
+
+    private EventLoopGroup getWorkerGroup() {
+      return workerGroup.get();
+    }
+
+    /** Starts a new connection attempt; if the attempt fails, a reconnect is scheduled. */
+    private ChannelFuture connect() {
+      return new Bootstrap()
+          .group(getWorkerGroup())
+          .channel(NioSocketChannel.class)
+          .handler(channelInitializerSupplier.get())
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .connect(NetUtils.createSocketAddr(address))
+          .addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+              // Use Connection.this: a bare "this" here would be this anonymous listener.
+              if (!future.isSuccess()) {
+                scheduleReconnect(Connection.this + " failed", future.cause());
+              } else {
+                LOG.trace("{} succeed.", Connection.this);
+              }
+            }
+          });
+    }
+
+    /**
+     * Schedules a reconnect on the worker group after {@link #RECONNECT}; does nothing once closed.
+     *
+     * @param message why the reconnect is needed (logged).
+     * @param cause the failure to log, or null.
+     */
+    void scheduleReconnect(String message, Throwable cause) {
+      if (closed) {
+        return; // the worker group is being shut down; do not schedule on it
+      }
+      LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, address, RECONNECT);
+      if (cause != null) {
+        LOG.warn("", cause);
+      }
+      getWorkerGroup().schedule(this::reconnectIfInactive, RECONNECT.getDuration(), RECONNECT.getUnit());
+    }
+
+    /**
+     * The scheduled reconnect task. It reconnects only when the current attempt has completed and its
+     * channel is not active. A stale request (e.g. from an old channel's channelInactive) therefore
+     * cannot tear down a channel that another path has already re-established.
+     */
+    private void reconnectIfInactive() {
+      final ChannelFuture current = ref.get();
+      if (current == null || !current.isDone() || current.channel().isActive()) {
+        return; // closed, still connecting, or already reconnected
+      }
+      reconnect(current);
+    }
+
+    /**
+     * Atomically replaces {@code expected} with a new connection and closes the replaced channel.
+     *
+     * @param expected the connection believed to be current; must not be null.
+     * @return the new connection if {@code expected} was replaced; otherwise the current connection
+     *         (another caller already replaced it), or null if this connection is closed.
+     */
+    private ChannelFuture reconnect(ChannelFuture expected) {
+      final MemoizedSupplier<ChannelFuture> supplier = MemoizedSupplier.valueOf(this::connect);
+      // The update function may be re-applied under contention; the memoized supplier connects at most once.
+      final ChannelFuture previous = ref.getAndUpdate(prev -> prev == expected ? supplier.get() : prev);
+      if (previous != null && previous == expected) {
+        previous.channel().close();
+        return supplier.get();
+      }
+      if (supplier.isInitialized()) {
+        // Created during a retried update but never published: close it to avoid leaking a channel.
+        supplier.get().channel().close();
+      }
+      return previous;
+    }
+
+    /** Closes the current channel, if any, and shuts down the worker group. */
+    void close() {
+      closed = true; // set first so that later failures/inactive events do not schedule reconnects
+      final ChannelFuture previous = ref.getAndSet(null);
+      if (previous != null) {
+        previous.channel().close();
+      }
+      workerGroup.shutdownGracefully();
+    }
+
+    boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public String toString() {
+      return JavaUtils.getClassSimpleName(getClass()) + "-" + address;
+    }
+  }
+
private final String name;
- private final WorkerGroupGetter workerGroup;
- private final Supplier<Channel> channel;
+ /** Connects to the server's data stream address and re-establishes the channel when it is lost. */
+ private final Connection connection;
+
private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new
ConcurrentHashMap<>();
private final TimeDuration replyQueueGracePeriod;
private final TimeoutScheduler timeoutScheduler =
TimeoutScheduler.getInstance();
public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties
properties) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
- this.workerGroup = new WorkerGroupGetter(properties);
-
- final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
- final ChannelFuture f = new Bootstrap()
- .group(workerGroup.get())
- .channel(NioSocketChannel.class)
- .handler(newChannelInitializer(sslContext))
- .option(ChannelOption.SO_KEEPALIVE, true)
- .connect(NetUtils.createSocketAddr(server.getDataStreamAddress()));
- this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
this.replyQueueGracePeriod =
NettyConfigKeys.DataStream.clientReplyQueueGracePeriod(properties);
- }
- private Channel getChannel() {
- return channel.get();
+ final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
+ // Connection starts connecting in its constructor and invokes this supplier on every (re)connect,
+ // so each new channel gets a fresh initializer built with getClientHandler().
+ this.connection = new Connection(server.getDataStreamAddress(),
+ new WorkerGroupGetter(properties),
+ () -> newChannelInitializer(sslContext, getClientHandler()));
}
private ChannelInboundHandler getClientHandler(){
@@ -202,10 +295,18 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
LOG.warn(name + ": exceptionCaught", cause);
ctx.close();
}
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ if (!connection.isClosed()) {
+ connection.scheduleReconnect("channel is inactive", null);
+ }
+ super.channelInactive(ctx);
+ }
};
}
- private ChannelInitializer<SocketChannel> newChannelInitializer(SslContext
sslContext){
+ static ChannelInitializer<SocketChannel> newChannelInitializer(SslContext
sslContext, ChannelInboundHandler handler) {
return new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) {
@@ -216,12 +317,12 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
p.addLast(newEncoder());
p.addLast(newEncoderDataStreamRequestFilePositionCount());
p.addLast(newDecoder());
- p.addLast(getClientHandler());
+ p.addLast(handler);
}
};
}
- MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
+ static MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
@Override
protected void encode(ChannelHandlerContext context,
DataStreamRequestByteBuffer request, List<Object> out) {
@@ -230,7 +331,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
};
}
- MessageToMessageEncoder<DataStreamRequestFilePositionCount>
newEncoderDataStreamRequestFilePositionCount() {
+ static MessageToMessageEncoder<DataStreamRequestFilePositionCount>
newEncoderDataStreamRequestFilePositionCount() {
return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
@Override
protected void encode(ChannelHandlerContext ctx,
DataStreamRequestFilePositionCount request, List<Object> out) {
@@ -239,7 +340,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
};
}
- ByteToMessageDecoder newDecoder() {
+ static ByteToMessageDecoder newDecoder() {
return new ByteToMessageDecoder() {
{
this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
@@ -261,15 +362,25 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
f.completeExceptionally(new IllegalStateException(this + ": Failed to
offer a future for " + request));
return f;
}
+ final Channel channel = connection.getChannelUninterruptibly();
+ if (channel == null) {
+ f.completeExceptionally(new AlreadyClosedException(this + ": Failed to
send " + request));
+ return f;
+ }
LOG.debug("{}: write {}", this, request);
- getChannel().writeAndFlush(request);
+ channel.writeAndFlush(request).addListener(future -> {
+ if (!future.isSuccess()) {
+ final IOException e = new IOException(this + ": Failed to send " +
request, future.cause());
+ LOG.error("Channel write failed", e);
+ f.completeExceptionally(e);
+ }
+ });
return f;
}
+ /** Closes this client by closing its {@link Connection}, i.e. its channel and worker group. */
@Override
public void close() {
- getChannel().close().syncUninterruptibly();
- workerGroup.shutdownGracefully();
+ // NOTE(review): unlike the removed code, this no longer waits (syncUninterruptibly) for the channel close.
+ connection.close();
}
@Override