This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e00bc49  RATIS-1107. Start NettyServerStreamRpc when create RaftServerProxy. (#233)
e00bc49 is described below

commit e00bc493c989d56db8c8dd0f4438f9639bbfb8d6
Author: runzhiwang <[email protected]>
AuthorDate: Tue Oct 27 17:59:42 2020 +0800

    RATIS-1107. Start NettyServerStreamRpc when create RaftServerProxy. (#233)
---
 .../org/apache/ratis/netty/NettyConfigKeys.java     | 21 +++++++++++++++++++++
 .../apache/ratis/netty/NettyDataStreamFactory.java  |  7 +++++++
 .../ratis/netty/server/NettyServerStreamRpc.java    | 21 +++++++++++++++++----
 .../ratis/server/DataStreamServerFactory.java       |  3 +++
 .../ratis/server/impl/DataStreamServerImpl.java     |  9 +++++++++
 .../apache/ratis/server/impl/RaftServerProxy.java   | 21 +++++++++++++++++++++
 6 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 2bfcca6..d3be80b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -49,6 +49,27 @@ public interface NettyConfigKeys {
     }
   }
 
+  interface DataStream {
+    Logger LOG = LoggerFactory.getLogger(Server.class);
+    static Consumer<String> getDefaultLog() {
+      return LOG::info;
+    }
+
+    String PREFIX = NettyConfigKeys.PREFIX + ".dataStream";
+
+    String PORT_KEY = PREFIX + ".port";
+    int PORT_DEFAULT = 0;
+
+    static int port(RaftProperties properties) {
+      return getInt(properties::getInt,
+          PORT_KEY, PORT_DEFAULT, getDefaultLog(), requireMin(0), 
requireMax(65536));
+    }
+
+    static void setPort(RaftProperties properties, int port) {
+      setInt(properties::setInt, PORT_KEY, port);
+    }
+  }
+
   static void main(String[] args) {
     printAll(NettyConfigKeys.class);
   }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index aa76b58..6b5f15e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -27,6 +27,7 @@ import org.apache.ratis.netty.server.NettyServerStreamRpc;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine;
 
 public class NettyDataStreamFactory implements DataStreamServerFactory, 
DataStreamClientFactory {
@@ -47,4 +48,10 @@ public class NettyDataStreamFactory implements 
DataStreamServerFactory, DataStre
       RaftProperties properties) {
     return new NettyServerStreamRpc(server, stateMachine, properties);
   }
+
+  @Override
+  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, 
StateMachine stateMachine,
+      RaftProperties properties) {
+    return new NettyServerStreamRpc(server, stateMachine, properties);
+  }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index bc98720..4d2a43e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -24,13 +24,16 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -117,6 +120,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     }
   }
 
+  private final RaftServer server;
   private final String name;
   private final EventLoopGroup bossGroup = new NioEventLoopGroup();
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -129,7 +133,18 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   private final Proxies proxies;
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, 
RaftProperties properties) {
-    this.name = server + "-" + getClass().getSimpleName();
+    this(null, server.getId(), stateMachine, properties,
+        NetUtils.createSocketAddr(server.getAddress()).getPort());
+  }
+
+  public NettyServerStreamRpc(RaftServer server, StateMachine stateMachine, 
RaftProperties properties) {
+    this(server, server.getId(), stateMachine, properties, 
NettyConfigKeys.DataStream.port(server.getProperties()));
+  }
+
+  public NettyServerStreamRpc(
+      RaftServer server, RaftPeerId id, StateMachine stateMachine, 
RaftProperties properties, int port) {
+    this.server = server;
+    this.name = id + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
     this.channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
@@ -137,9 +152,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(getInitializer())
         .childOption(ChannelOption.SO_KEEPALIVE, true)
-        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
-        .bind();
-
+        .bind(port);
     this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> 
newClient(peer, properties)));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index be3db98..edfaf76 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -37,4 +37,7 @@ public interface DataStreamServerFactory extends 
DataStreamFactory {
 
   /** Create a new {@link DataStreamServerRpc}. */
   DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine 
stateMachine, RaftProperties properties);
+
+  /** Create a new {@link DataStreamServerRpc}. */
+  DataStreamServerRpc newDataStreamServerRpc(RaftServer server, StateMachine 
stateMachine, RaftProperties properties);
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 2504f6f..c66829b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.DataStreamServerFactory;
 import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,14 @@ public class DataStreamServerImpl implements 
DataStreamServer {
         .newDataStreamServerRpc(server, stateMachine, properties);
   }
 
+  public DataStreamServerImpl(RaftServer server, StateMachine stateMachine,
+      RaftProperties properties, Parameters parameters){
+    final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
+
+    this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+        .newDataStreamServerRpc(server, stateMachine, properties);
+  }
+
   @Override
   public DataStreamServerRpc getServerRpc() {
     return serverRpc;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 8d5e692..4f700d9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -33,6 +34,7 @@ import 
org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
@@ -165,6 +167,8 @@ public class RaftServerProxy implements RaftServer {
   private final RaftServerRpc serverRpc;
   private final ServerFactory factory;
 
+  private final DataStreamServerRpc dataStreamServerRpc;
+
   private ExecutorService implExecutor;
 
   private final ImplMap impls = new ImplMap();
@@ -178,6 +182,12 @@ public class RaftServerProxy implements RaftServer {
     this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
 
     this.serverRpc = factory.newRaftServerRpc(this);
+
+    // TODO: Support multi-raft and should pass StateMachineRegistry to 
DataStreamServerImpl instead of StateMachine
+    this.dataStreamServerRpc =
+        new DataStreamServerImpl(this, 
stateMachineRegistry.apply(RaftGroupId.emptyGroupId()), properties, null)
+            .getServerRpc();
+
     this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
     this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
 
@@ -265,6 +275,10 @@ public class RaftServerProxy implements RaftServer {
     return serverRpc;
   }
 
+  public DataStreamServerRpc getDataStreamServerRpc() {
+    return dataStreamServerRpc;
+  }
+
   public boolean containsGroup(RaftGroupId groupId) {
     return impls.containsGroup(groupId);
   }
@@ -306,6 +320,7 @@ public class RaftServerProxy implements RaftServer {
     lifeCycle.startAndTransition(() -> {
       LOG.info("{}: start RPC server", getId());
       getServerRpc().start();
+      getDataStreamServerRpc().start();
     }, IOException.class);
   }
 
@@ -327,6 +342,12 @@ public class RaftServerProxy implements RaftServer {
       } catch(IOException ignored) {
         LOG.warn(getId() + ": Failed to close " + getRpcType() + " server", 
ignored);
       }
+
+      try {
+        getDataStreamServerRpc().close();
+      } catch (IOException ignored) {
+        LOG.warn(getId() + ": Failed to close " + 
SupportedDataStreamType.NETTY + " server", ignored);
+      }
     });
   }
 

Reply via email to