This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 26582bf  RATIS-1549. If the stream client is abnormally disconnected, reconnect. (#620)
26582bf is described below

commit 26582bf4de6ce9cd2e61e69a1c235298f4635fe7
Author: hao guo <[email protected]>
AuthorDate: Mon Mar 14 23:02:13 2022 +0800

    RATIS-1549. If the stream client is abnormally disconnected, reconnect. (#620)
---
 .../ratis/netty/client/NettyClientStreamRpc.java   | 163 +++++++++++++++++----
 1 file changed, 137 insertions(+), 26 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 57b44c2..b2714c7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -29,23 +29,34 @@ import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.security.TlsConf;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.*;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.Channel;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import 
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -54,6 +65,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -64,7 +76,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = 
new AtomicReference<>();
 
     static EventLoopGroup newWorkerGroup(RaftProperties properties) {
-      return new 
NioEventLoopGroup(NettyConfigKeys.DataStream.clientWorkerGroupSize(properties));
+      return NettyUtils.newEventLoopGroup(
+          JavaUtils.getClassSimpleName(NettyClientStreamRpc.class) + 
"-workerGroup",
+          NettyConfigKeys.DataStream.clientWorkerGroupSize(properties), false);
     }
 
     private final EventLoopGroup workerGroup;
@@ -125,30 +139,109 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     }
   }
 
+  static class Connection {
+    static final TimeDuration RECONNECT = TimeDuration.valueOf(100, 
TimeUnit.MILLISECONDS);
+
+    private final String address;
+    private final WorkerGroupGetter workerGroup;
+    private final Supplier<ChannelInitializer<SocketChannel>> 
channelInitializerSupplier;
+
+    /** The {@link ChannelFuture} is null when this connection is closed. */
+    private final AtomicReference<ChannelFuture> ref;
+
+    Connection(String address, WorkerGroupGetter workerGroup,
+        Supplier<ChannelInitializer<SocketChannel>> 
channelInitializerSupplier) {
+      this.address = address;
+      this.workerGroup = workerGroup;
+      this.channelInitializerSupplier = channelInitializerSupplier;
+      this.ref = new AtomicReference<>(connect());
+    }
+
+    Channel getChannelUninterruptibly() {
+      final ChannelFuture future = ref.get();
+      if (future == null) {
+        return null; //closed
+      }
+      final Channel channel = future.syncUninterruptibly().channel();
+      if (channel.isOpen()) {
+        return channel;
+      }
+      return reconnect().syncUninterruptibly().channel();
+    }
+
+    private EventLoopGroup getWorkerGroup() {
+      return workerGroup.get();
+    }
+
+    private ChannelFuture connect() {
+      return new Bootstrap()
+          .group(getWorkerGroup())
+          .channel(NioSocketChannel.class)
+          .handler(channelInitializerSupplier.get())
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .connect(NetUtils.createSocketAddr(address))
+          .addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+              if (!future.isSuccess()) {
+                scheduleReconnect(this + " failed", future.cause());
+              } else {
+                LOG.trace("{} succeed.", this);
+              }
+            }
+          });
+    }
+
+    void scheduleReconnect(String message, Throwable cause) {
+      LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, 
address, RECONNECT);
+      if (cause != null) {
+        LOG.warn("", cause);
+      }
+      getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), 
RECONNECT.getUnit());
+    }
+
+    private ChannelFuture reconnect() {
+      final MemoizedSupplier<ChannelFuture> supplier = 
MemoizedSupplier.valueOf(this::connect);
+      final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? 
null: supplier.get());
+      if (previous != null) {
+        previous.channel().close();
+      }
+      return supplier.isInitialized() ? supplier.get() : null;
+    }
+
+    void close() {
+      final ChannelFuture previous = ref.getAndSet(null);
+      if (previous != null) {
+        previous.channel().close();
+      }
+      workerGroup.shutdownGracefully();
+    }
+
+    boolean isClosed() {
+      return ref.get() == null;
+    }
+
+    @Override
+    public String toString() {
+      return JavaUtils.getClassSimpleName(getClass()) + "-" + address;
+    }
+  }
+
   private final String name;
-  private final WorkerGroupGetter workerGroup;
-  private final Supplier<Channel> channel;
+  private final Connection connection;
+
   private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new 
ConcurrentHashMap<>();
   private final TimeDuration replyQueueGracePeriod;
   private final TimeoutScheduler timeoutScheduler = 
TimeoutScheduler.getInstance();
 
   public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties 
properties) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
-    this.workerGroup = new WorkerGroupGetter(properties);
-
-    final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
-    final ChannelFuture f = new Bootstrap()
-        .group(workerGroup.get())
-        .channel(NioSocketChannel.class)
-        .handler(newChannelInitializer(sslContext))
-        .option(ChannelOption.SO_KEEPALIVE, true)
-        .connect(NetUtils.createSocketAddr(server.getDataStreamAddress()));
-    this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
     this.replyQueueGracePeriod = 
NettyConfigKeys.DataStream.clientReplyQueueGracePeriod(properties);
-  }
 
-  private Channel getChannel() {
-    return channel.get();
+    final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
+    this.connection = new Connection(server.getDataStreamAddress(),
+        new WorkerGroupGetter(properties),
+        () -> newChannelInitializer(sslContext, getClientHandler()));
   }
 
   private ChannelInboundHandler getClientHandler(){
@@ -202,10 +295,18 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         LOG.warn(name + ": exceptionCaught", cause);
         ctx.close();
       }
+
+      @Override
+      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        if (!connection.isClosed()) {
+          connection.scheduleReconnect("channel is inactive", null);
+        }
+        super.channelInactive(ctx);
+      }
     };
   }
 
-  private ChannelInitializer<SocketChannel> newChannelInitializer(SslContext 
sslContext){
+  static ChannelInitializer<SocketChannel> newChannelInitializer(SslContext 
sslContext, ChannelInboundHandler handler) {
     return new ChannelInitializer<SocketChannel>(){
       @Override
       public void initChannel(SocketChannel ch) {
@@ -216,12 +317,12 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         p.addLast(newEncoder());
         p.addLast(newEncoderDataStreamRequestFilePositionCount());
         p.addLast(newDecoder());
-        p.addLast(getClientHandler());
+        p.addLast(handler);
       }
     };
   }
 
-  MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
+  static MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
     return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
       @Override
       protected void encode(ChannelHandlerContext context, 
DataStreamRequestByteBuffer request, List<Object> out) {
@@ -230,7 +331,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     };
   }
 
-  MessageToMessageEncoder<DataStreamRequestFilePositionCount> 
newEncoderDataStreamRequestFilePositionCount() {
+  static MessageToMessageEncoder<DataStreamRequestFilePositionCount> 
newEncoderDataStreamRequestFilePositionCount() {
     return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
       @Override
       protected void encode(ChannelHandlerContext ctx, 
DataStreamRequestFilePositionCount request, List<Object> out) {
@@ -239,7 +340,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     };
   }
 
-  ByteToMessageDecoder newDecoder() {
+  static ByteToMessageDecoder newDecoder() {
     return new ByteToMessageDecoder() {
       {
         this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
@@ -261,15 +362,25 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
       f.completeExceptionally(new IllegalStateException(this + ": Failed to 
offer a future for " + request));
       return f;
     }
+    final Channel channel = connection.getChannelUninterruptibly();
+    if (channel == null) {
+      f.completeExceptionally(new AlreadyClosedException(this + ": Failed to 
send " + request));
+      return f;
+    }
     LOG.debug("{}: write {}", this, request);
-    getChannel().writeAndFlush(request);
+    channel.writeAndFlush(request).addListener(future -> {
+      if (!future.isSuccess()) {
+        final IOException e = new IOException(this + ": Failed to send " + 
request, future.cause());
+        LOG.error("Channel write failed", e);
+        f.completeExceptionally(e);
+      }
+    });
     return f;
   }
 
   @Override
   public void close() {
-    getChannel().close().syncUninterruptibly();
-    workerGroup.shutdownGracefully();
+    connection.close();
   }
 
   @Override

Reply via email to