This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e5861fa RATIS-1124. NettyServerStreamRpc should get stateMachine from
RaftServer. (#246)
e5861fa is described below
commit e5861fafd7606d89199b98e8ffb16e7c7a223114
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Oct 31 18:05:55 2020 +0800
RATIS-1124. NettyServerStreamRpc should get stateMachine from RaftServer.
(#246)
---
.../apache/ratis/netty/NettyDataStreamFactory.java | 12 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 21 +--
.../ratis/server/DataStreamServerFactory.java | 7 +-
.../server/DisabledDataStreamServerFactory.java | 19 +--
.../java/org/apache/ratis/server/RaftServer.java | 4 +-
.../ratis/server/impl/DataStreamServerImpl.java | 15 +--
.../apache/ratis/server/impl/RaftServerProxy.java | 11 +-
...DataStreamBase.java => DataStreamBaseTest.java} | 149 ++++++++++++++++++++-
.../ratis/datastream/TestDataStreamDisabled.java | 2 +-
.../ratis/datastream/TestDataStreamNetty.java | 13 +-
10 files changed, 179 insertions(+), 74 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 6b5f15e..140e7db 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -28,7 +28,6 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DataStreamServerFactory;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.statemachine.StateMachine;
public class NettyDataStreamFactory implements DataStreamServerFactory,
DataStreamClientFactory {
public NettyDataStreamFactory(Parameters parameters){}
@@ -44,14 +43,7 @@ public class NettyDataStreamFactory implements
DataStreamServerFactory, DataStre
}
@Override
- public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server,
StateMachine stateMachine,
- RaftProperties properties) {
- return new NettyServerStreamRpc(server, stateMachine, properties);
- }
-
- @Override
- public DataStreamServerRpc newDataStreamServerRpc(RaftServer server,
StateMachine stateMachine,
- RaftProperties properties) {
- return new NettyServerStreamRpc(server, stateMachine, properties);
+ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server,
RaftProperties properties) {
+ return new NettyServerStreamRpc(server, properties);
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 440ccda..63beca7 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -31,7 +31,6 @@ import
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
@@ -47,7 +46,6 @@ import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -203,25 +201,15 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ChannelFuture channelFuture;
- private final StateMachine stateMachine;
private final StreamMap streams = new StreamMap();
private final Proxies proxies;
- public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine,
RaftProperties properties) {
- this(null, server.getId(), stateMachine, properties,
- NetUtils.createSocketAddr(server.getAddress()).getPort());
- }
-
- public NettyServerStreamRpc(RaftServer server, StateMachine stateMachine,
RaftProperties properties) {
- this(server, server.getId(), stateMachine, properties,
NettyConfigKeys.DataStream.port(server.getProperties()));
- }
-
- public NettyServerStreamRpc(
- RaftServer server, RaftPeerId id, StateMachine stateMachine,
RaftProperties properties, int port) {
+ public NettyServerStreamRpc(RaftServer server, RaftProperties properties) {
this.server = server;
- this.name = id + "-" + getClass().getSimpleName();
- this.stateMachine = stateMachine;
+ this.name = server.getId() + "-" + getClass().getSimpleName();
+
+ final int port = NettyConfigKeys.DataStream.port(server.getProperties());
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@@ -248,6 +236,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
+ final StateMachine stateMachine =
server.getStateMachine(request.getRaftGroupId());
return new StreamInfo(request, stateMachine.data().stream(request),
proxies.getDataStreamOutput());
} catch (Throwable e) {
throw new CompletionException(e);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index 63589b0..65056c9 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -21,8 +21,6 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
import org.apache.ratis.datastream.DataStreamType;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.statemachine.StateMachine;
/** A {@link DataStreamFactory} to create server-side objects. */
public interface DataStreamServerFactory extends DataStreamFactory {
@@ -36,8 +34,5 @@ public interface DataStreamServerFactory extends
DataStreamFactory {
}
/** Create a new {@link DataStreamServerRpc}. */
- DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine
stateMachine, RaftProperties properties);
-
- /** Create a new {@link DataStreamServerRpc}. */
- DataStreamServerRpc newDataStreamServerRpc(RaftServer server, StateMachine
stateMachine, RaftProperties properties);
+ DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties
properties);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
index a4762fa..89d3e3a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
@@ -21,7 +21,6 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.statemachine.StateMachine;
import java.util.Collection;
@@ -30,23 +29,7 @@ public class DisabledDataStreamServerFactory implements
DataStreamServerFactory
public DisabledDataStreamServerFactory(Parameters parameters) {}
@Override
- public DataStreamServerRpc newDataStreamServerRpc(
- RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
- return new DataStreamServerRpc() {
- @Override
- public void start() {}
-
- @Override
- public void close() {}
-
- @Override
- public void addRaftPeers(Collection<RaftPeer> peers) {}
- };
- }
-
- @Override
- public DataStreamServerRpc newDataStreamServerRpc(
- RaftServer server, StateMachine stateMachine, RaftProperties properties)
{
+ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server,
RaftProperties properties) {
return new DataStreamServerRpc() {
@Override
public void start() {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index c50a80a..30ca21a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -47,6 +47,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the groups the server is part of. */
Iterable<RaftGroup> getGroups() throws IOException;
+ StateMachine getStateMachine(RaftGroupId groupId) throws IOException;
+
/** @return the server properties. */
RaftProperties getProperties();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 4ccbfc8..d41ea18 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -23,12 +23,10 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DataStreamServerFactory;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,20 +35,11 @@ public class DataStreamServerImpl implements
DataStreamServer {
private final DataStreamServerRpc serverRpc;
- public DataStreamServerImpl(RaftPeer server, StateMachine stateMachine,
- RaftProperties properties, Parameters parameters){
+ public DataStreamServerImpl(RaftServer server, RaftProperties properties,
Parameters parameters) {
final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
- .newDataStreamServerRpc(server, stateMachine, properties);
- }
-
- public DataStreamServerImpl(RaftServer server, StateMachine stateMachine,
- RaftProperties properties, Parameters parameters){
- final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
-
- this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
- .newDataStreamServerRpc(server, stateMachine, properties);
+ .newDataStreamServerRpc(server, properties);
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 4f700d9..fbe9862 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -182,11 +182,7 @@ public class RaftServerProxy implements RaftServer {
this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
this.serverRpc = factory.newRaftServerRpc(this);
-
- // TODO: Support multi-raft and should pass StateMachineRegistry to
DataStreamServerImpl instead of StateMachine
- this.dataStreamServerRpc =
- new DataStreamServerImpl(this,
stateMachineRegistry.apply(RaftGroupId.emptyGroupId()), properties, null)
- .getServerRpc();
+ this.dataStreamServerRpc = new DataStreamServerImpl(this, properties,
null).getServerRpc();
this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
@@ -309,6 +305,11 @@ public class RaftServerProxy implements RaftServer {
}
@Override
+ public StateMachine getStateMachine(RaftGroupId groupId) throws IOException {
+ return getImpl(groupId).getStateMachine();
+ }
+
+ @Override
public LifeCycle.State getLifeCycleState() {
return lifeCycle.getCurrentState();
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
similarity index 69%
rename from
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
rename to
ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 80c5a80..dd8505f 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -22,16 +22,32 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.GroupListReply;
+import org.apache.ratis.protocol.GroupListRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
@@ -47,7 +63,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
-class TestDataStreamBase extends BaseTest {
+abstract class DataStreamBaseTest extends BaseTest {
static final int MODULUS = 23;
static byte pos2byte(int pos) {
@@ -137,6 +153,132 @@ class TestDataStreamBase extends BaseTest {
private List<RaftPeer> peers;
private List<MultiDataStreamStateMachine> stateMachines;
+ protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties)
{
+ final ConcurrentMap<RaftGroupId, StateMachine> stateMachines = new
ConcurrentHashMap<>();
+
+ return new RaftServer() {
+ @Override
+ public RaftPeerId getId() {
+ return peer.getId();
+ }
+
+ @Override
+ public StateMachine getStateMachine(RaftGroupId groupId) {
+ return stateMachines.computeIfAbsent(groupId, key -> new
MultiDataStreamStateMachine());
+ }
+
+ @Override
+ public RaftProperties getProperties() {
+ return properties;
+ }
+
+
+ @Override
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto
request) {
+ return null;
+ }
+
+ @Override
+ public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto
request) {
+ return null;
+ }
+
+ @Override
+ public InstallSnapshotReplyProto
installSnapshot(InstallSnapshotRequestProto request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<AppendEntriesReplyProto>
appendEntriesAsync(AppendEntriesRequestProto request) {
+ return null;
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return null;
+ }
+
+ @Override
+ public RaftClientReply submitClientRequest(RaftClientRequest request) {
+ return null;
+ }
+
+ @Override
+ public RaftClientReply setConfiguration(SetConfigurationRequest request)
{
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply>
submitClientRequestAsync(RaftClientRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply>
setConfigurationAsync(SetConfigurationRequest request) {
+ return null;
+ }
+
+ @Override
+ public GroupListReply getGroupList(GroupListRequest request) {
+ return null;
+ }
+
+ @Override
+ public GroupInfoReply getGroupInfo(GroupInfoRequest request) {
+ return null;
+ }
+
+ @Override
+ public RaftClientReply groupManagement(GroupManagementRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<GroupListReply>
getGroupListAsync(GroupListRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<GroupInfoReply>
getGroupInfoAsync(GroupInfoRequest request) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply>
groupManagementAsync(GroupManagementRequest request) {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Iterable<RaftGroupId> getGroupIds() {
+ return null;
+ }
+
+ @Override
+ public Iterable<RaftGroup> getGroups() {
+ return null;
+ }
+
+ @Override
+ public ServerFactory getFactory() {
+ return null;
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public LifeCycle.State getLifeCycleState() {
+ return null;
+ }
+ };
+ }
+
+
protected void setup(int numServers){
peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
.map(RaftPeerId::valueOf)
@@ -148,8 +290,9 @@ class TestDataStreamBase extends BaseTest {
for (int i = 0; i < peers.size(); i++) {
final MultiDataStreamStateMachine stateMachine = new
MultiDataStreamStateMachine();
stateMachines.add(stateMachine);
- final DataStreamServerImpl streamServer = new DataStreamServerImpl(
- peers.get(i), stateMachine, properties, null);
+ final RaftPeer peer = peers.get(i);
+ final RaftServer server = newRaftServer(peer, properties);
+ final DataStreamServerImpl streamServer = new
DataStreamServerImpl(server, properties, null);
final DataStreamServerRpc rpc = streamServer.getServerRpc();
if (i == 0) {
// only the first server routes requests to peers.
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 0414704..363fa4e 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -26,7 +26,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-public class TestDataStreamDisabled extends TestDataStreamBase {
+public class TestDataStreamDisabled extends DataStreamBaseTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index b980dbd..93922d3 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -20,17 +20,28 @@ package org.apache.ratis.datastream;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.NetUtils;
import org.junit.Before;
import org.junit.Test;
-public class TestDataStreamNetty extends TestDataStreamBase {
+public class TestDataStreamNetty extends DataStreamBaseTest {
@Before
public void setup() {
properties = new RaftProperties();
RaftConfigKeys.DataStream.setType(properties,
SupportedDataStreamType.NETTY);
}
+ @Override
+ protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties)
{
+ final RaftProperties p = new RaftProperties(properties);
+ NettyConfigKeys.DataStream.setPort(p,
NetUtils.createSocketAddr(peer.getAddress()).getPort());
+ return super.newRaftServer(peer, p);
+ }
+
@Test
public void testDataStreamSingleServer() throws Exception {
runTestDataStream(1, 2, 3, 1_000_000, 10);