This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 865a6ca  RATIS-1117. Remove DataStreamClient.start() and DataStreamClientRpc.startClient(). (#239)
865a6ca is described below

commit 865a6cab64313e6702d557c385ecd328abd43692
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 30 09:55:05 2020 +0800

    RATIS-1117. Remove DataStreamClient.start() and DataStreamClientRpc.startClient(). (#239)
---
 .../org/apache/ratis/client/DataStreamClient.java  |  7 +--
 .../apache/ratis/client/DataStreamClientRpc.java   | 12 +++--
 .../client/DisabledDataStreamClientFactory.java    |  5 +--
 .../ratis/client/impl/DataStreamClientImpl.java    | 14 ++----
 .../ratis/netty/client/NettyClientStreamRpc.java   | 52 +++++++++++-----------
 .../ratis/netty/server/NettyServerStreamRpc.java   |  4 +-
 .../ratis/datastream/TestDataStreamBase.java       |  1 -
 7 files changed, 36 insertions(+), 59 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index 9122763..cc888c7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -28,19 +28,14 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 
 /**
- * A client interface that sends request to the streaming pipeline.
- * Associated with it will be a Netty Client.
+ * A user interface extending {@link DataStreamApi}.
  */
 public interface DataStreamClient extends DataStreamApi, Closeable {
-
   Logger LOG = LoggerFactory.getLogger(DataStreamClient.class);
 
   /** Return the rpc client instance **/
   DataStreamClientRpc getClientRpc();
 
-  /** start the client */
-  void start();
-
   static Builder newBuilder() {
     return new Builder();
   }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
index 4db1c55..a9bcd9d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
@@ -22,20 +22,18 @@ import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.util.JavaUtils;
 
+import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * An api interface for to stream from client to server.
+ * A client interface for sending stream requests.
+ * The underlying implementation is pluggable, depending on the {@link org.apache.ratis.datastream.DataStreamType}.
+ * The implementations of this interface define how the requests are transported to the server.
  */
-public interface DataStreamClientRpc {
-
+public interface DataStreamClientRpc extends Closeable {
   /** Async call to send a request. */
   default CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
     throw new UnsupportedOperationException(getClass() + " does not support "
         + JavaUtils.getCurrentStackTraceElement().getMethodName());
   }
-
-  void startClient();
-
-  void closeClient();
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
index 1347e7b..7150228 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DisabledDataStreamClientFactory.java
@@ -35,10 +35,7 @@ public class DisabledDataStreamClientFactory implements DataStreamClientFactory
   public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties properties) {
     return new DataStreamClientRpc() {
       @Override
-      public void startClient() {}
-
-      @Override
-      public void closeClient() {}
+      public void close() {}
     };
   }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 9316c62..506e25d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -31,9 +31,8 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -44,8 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * allows client to create streams and send asynchronously.
  */
 public class DataStreamClientImpl implements DataStreamClient {
-  public static final Logger LOG = LoggerFactory.getLogger(DataStreamClientImpl.class);
-
   // TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup in constructor.
   private final ClientId clientId = ClientId.randomId();
   private final RaftGroupId groupId =  RaftGroupId.randomId();
@@ -117,12 +114,7 @@ public class DataStreamClientImpl implements DataStreamClient {
   }
 
   @Override
-  public void close(){
-    dataStreamClientRpc.closeClient();
-  }
-
-  @Override
-  public void start(){
-    dataStreamClientRpc.startClient();
+  public void close() throws IOException {
+    dataStreamClientRpc.close();
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 5478880..e769e2c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -33,28 +33,40 @@ import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class NettyClientStreamRpc implements DataStreamClientRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
 
-  private RaftPeer server;
+  private final RaftPeer server;
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
-  private Channel channel;
-  private Queue<CompletableFuture<DataStreamReply>> replies
-      = new LinkedList<>();
+  private final Supplier<Channel> channel;
+  private final Queue<CompletableFuture<DataStreamReply>> replies = new LinkedList<>();
 
   public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
     this.server = server;
+
+    final ChannelFuture f = new Bootstrap()
+        .group(workerGroup)
+        .channel(NioSocketChannel.class)
+        .handler(getInitializer())
+        .option(ChannelOption.SO_KEEPALIVE, true)
+        .connect(NetUtils.createSocketAddr(server.getAddress()));
+    this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
+  }
+
+  private Channel getChannel() {
+    return channel.get();
   }
 
   synchronized CompletableFuture<DataStreamReply> pollReply() {
@@ -64,7 +76,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   private ChannelInboundHandler getClientHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamReply reply = (DataStreamReply) msg;
         pollReply().complete(reply);
       }
@@ -74,8 +86,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   private ChannelInitializer<SocketChannel> getInitializer(){
     return new ChannelInitializer<SocketChannel>(){
       @Override
-      public void initChannel(SocketChannel ch)
-          throws Exception {
+      public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
         p.addLast(newEncoder());
         p.addLast(newDecoder());
@@ -110,31 +121,18 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public synchronized CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
     CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
     replies.offer(f);
-    channel.writeAndFlush(request);
+    getChannel().writeAndFlush(request);
     return f;
   }
 
   @Override
-  public void startClient() {
-    final InetSocketAddress address = NetUtils.createSocketAddr(server.getAddress());
-    try {
-      channel = (new Bootstrap())
-          .group(workerGroup)
-          .channel(NioSocketChannel.class)
-          .handler(getInitializer())
-          .option(ChannelOption.SO_KEEPALIVE, true)
-          .connect(address)
-          .sync()
-          .channel();
-      System.out.println(channel);
-    } catch (Exception e){
-      LOG.info("Exception {}", e.getCause());
-    }
+  public void close() {
+    getChannel().close().syncUninterruptibly();
+    workerGroup.shutdownGracefully();
   }
 
   @Override
-  public void closeClient(){
-    channel.close().syncUninterruptibly();
-    workerGroup.shutdownGracefully();
+  public String toString() {
+    return getClass().getSimpleName() + "->" + server;
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 9797709..4743fcd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -177,12 +177,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   }
 
   static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
-    final DataStreamClient client = DataStreamClient.newBuilder()
+    return DataStreamClient.newBuilder()
         .setRaftServer(peer)
         .setProperties(properties)
         .build();
-    client.start();
-    return client;
   }
 
   @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
index d14b737..01b9fae 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
@@ -144,7 +144,6 @@ class TestDataStreamBase extends BaseTest {
 
   protected void setupClient(){
     client = new DataStreamClientImpl(peers.get(0), properties, null);
-    client.start();
   }
 
   protected void shutdown() throws IOException {

Reply via email to