This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e00bc49 RATIS-1107. Start NettyServerStreamRpc when create
RaftServerProxy. (#233)
e00bc49 is described below
commit e00bc493c989d56db8c8dd0f4438f9639bbfb8d6
Author: runzhiwang <[email protected]>
AuthorDate: Tue Oct 27 17:59:42 2020 +0800
RATIS-1107. Start NettyServerStreamRpc when create RaftServerProxy. (#233)
---
.../org/apache/ratis/netty/NettyConfigKeys.java | 21 +++++++++++++++++++++
.../apache/ratis/netty/NettyDataStreamFactory.java | 7 +++++++
.../ratis/netty/server/NettyServerStreamRpc.java | 21 +++++++++++++++++----
.../ratis/server/DataStreamServerFactory.java | 3 +++
.../ratis/server/impl/DataStreamServerImpl.java | 9 +++++++++
.../apache/ratis/server/impl/RaftServerProxy.java | 21 +++++++++++++++++++++
6 files changed, 78 insertions(+), 4 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 2bfcca6..d3be80b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -49,6 +49,27 @@ public interface NettyConfigKeys {
}
}
+ interface DataStream {
+ Logger LOG = LoggerFactory.getLogger(Server.class);
+ static Consumer<String> getDefaultLog() {
+ return LOG::info;
+ }
+
+ String PREFIX = NettyConfigKeys.PREFIX + ".dataStream";
+
+ String PORT_KEY = PREFIX + ".port";
+ int PORT_DEFAULT = 0;
+
+ static int port(RaftProperties properties) {
+ return getInt(properties::getInt,
+ PORT_KEY, PORT_DEFAULT, getDefaultLog(), requireMin(0),
requireMax(65536));
+ }
+
+ static void setPort(RaftProperties properties, int port) {
+ setInt(properties::setInt, PORT_KEY, port);
+ }
+ }
+
static void main(String[] args) {
printAll(NettyConfigKeys.class);
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index aa76b58..6b5f15e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -27,6 +27,7 @@ import org.apache.ratis.netty.server.NettyServerStreamRpc;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
public class NettyDataStreamFactory implements DataStreamServerFactory,
DataStreamClientFactory {
@@ -47,4 +48,10 @@ public class NettyDataStreamFactory implements
DataStreamServerFactory, DataStre
RaftProperties properties) {
return new NettyServerStreamRpc(server, stateMachine, properties);
}
+
+ @Override
+ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server,
StateMachine stateMachine,
+ RaftProperties properties) {
+ return new NettyServerStreamRpc(server, stateMachine, properties);
+ }
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index bc98720..4d2a43e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -24,13 +24,16 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -117,6 +120,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
}
+ private final RaftServer server;
private final String name;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -129,7 +133,18 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private final Proxies proxies;
public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine,
RaftProperties properties) {
- this.name = server + "-" + getClass().getSimpleName();
+ this(null, server.getId(), stateMachine, properties,
+ NetUtils.createSocketAddr(server.getAddress()).getPort());
+ }
+
+ public NettyServerStreamRpc(RaftServer server, StateMachine stateMachine,
RaftProperties properties) {
+ this(server, server.getId(), stateMachine, properties,
NettyConfigKeys.DataStream.port(server.getProperties()));
+ }
+
+ public NettyServerStreamRpc(
+ RaftServer server, RaftPeerId id, StateMachine stateMachine,
RaftProperties properties, int port) {
+ this.server = server;
+ this.name = id + "-" + getClass().getSimpleName();
this.stateMachine = stateMachine;
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
@@ -137,9 +152,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(getInitializer())
.childOption(ChannelOption.SO_KEEPALIVE, true)
- .localAddress(NetUtils.createSocketAddr(server.getAddress()))
- .bind();
-
+ .bind(port);
this.proxies = new Proxies(new PeerProxyMap<>(name, peer ->
newClient(peer, properties)));
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index be3db98..edfaf76 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -37,4 +37,7 @@ public interface DataStreamServerFactory extends
DataStreamFactory {
/** Create a new {@link DataStreamServerRpc}. */
DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine
stateMachine, RaftProperties properties);
+
+ /** Create a new {@link DataStreamServerRpc}. */
+ DataStreamServerRpc newDataStreamServerRpc(RaftServer server, StateMachine
stateMachine, RaftProperties properties);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 2504f6f..c66829b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DataStreamServerFactory;
import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,14 @@ public class DataStreamServerImpl implements
DataStreamServer {
.newDataStreamServerRpc(server, stateMachine, properties);
}
+ public DataStreamServerImpl(RaftServer server, StateMachine stateMachine,
+ RaftProperties properties, Parameters parameters){
+ final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
+
+ this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+ .newDataStreamServerRpc(server, stateMachine, properties);
+ }
+
@Override
public DataStreamServerRpc getServerRpc() {
return serverRpc;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 8d5e692..4f700d9 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -33,6 +34,7 @@ import
org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
@@ -165,6 +167,8 @@ public class RaftServerProxy implements RaftServer {
private final RaftServerRpc serverRpc;
private final ServerFactory factory;
+ private final DataStreamServerRpc dataStreamServerRpc;
+
private ExecutorService implExecutor;
private final ImplMap impls = new ImplMap();
@@ -178,6 +182,12 @@ public class RaftServerProxy implements RaftServer {
this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
this.serverRpc = factory.newRaftServerRpc(this);
+
+ // TODO: Support multi-raft and should pass StateMachineRegistry to
DataStreamServerImpl instead of StateMachine
+ this.dataStreamServerRpc =
+ new DataStreamServerImpl(this,
stateMachineRegistry.apply(RaftGroupId.emptyGroupId()), properties, null)
+ .getServerRpc();
+
this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
@@ -265,6 +275,10 @@ public class RaftServerProxy implements RaftServer {
return serverRpc;
}
+ public DataStreamServerRpc getDataStreamServerRpc() {
+ return dataStreamServerRpc;
+ }
+
public boolean containsGroup(RaftGroupId groupId) {
return impls.containsGroup(groupId);
}
@@ -306,6 +320,7 @@ public class RaftServerProxy implements RaftServer {
lifeCycle.startAndTransition(() -> {
LOG.info("{}: start RPC server", getId());
getServerRpc().start();
+ getDataStreamServerRpc().start();
}, IOException.class);
}
@@ -327,6 +342,12 @@ public class RaftServerProxy implements RaftServer {
} catch(IOException ignored) {
LOG.warn(getId() + ": Failed to close " + getRpcType() + " server",
ignored);
}
+
+ try {
+ getDataStreamServerRpc().close();
+ } catch (IOException ignored) {
+ LOG.warn(getId() + ": Failed to close " +
SupportedDataStreamType.NETTY + " server", ignored);
+ }
});
}