This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e5861fa  RATIS-1124. NettyServerStreamRpc should get stateMachine from RaftServer. (#246)
e5861fa is described below

commit e5861fafd7606d89199b98e8ffb16e7c7a223114
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Oct 31 18:05:55 2020 +0800

    RATIS-1124. NettyServerStreamRpc should get stateMachine from RaftServer. (#246)
---
 .../apache/ratis/netty/NettyDataStreamFactory.java |  12 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   |  21 +--
 .../ratis/server/DataStreamServerFactory.java      |   7 +-
 .../server/DisabledDataStreamServerFactory.java    |  19 +--
 .../java/org/apache/ratis/server/RaftServer.java   |   4 +-
 .../ratis/server/impl/DataStreamServerImpl.java    |  15 +--
 .../apache/ratis/server/impl/RaftServerProxy.java  |  11 +-
 ...DataStreamBase.java => DataStreamBaseTest.java} | 149 ++++++++++++++++++++-
 .../ratis/datastream/TestDataStreamDisabled.java   |   2 +-
 .../ratis/datastream/TestDataStreamNetty.java      |  13 +-
 10 files changed, 179 insertions(+), 74 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 6b5f15e..140e7db 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -28,7 +28,6 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DataStreamServerFactory;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.statemachine.StateMachine;
 
 public class NettyDataStreamFactory implements DataStreamServerFactory, 
DataStreamClientFactory {
   public NettyDataStreamFactory(Parameters parameters){}
@@ -44,14 +43,7 @@ public class NettyDataStreamFactory implements 
DataStreamServerFactory, DataStre
   }
 
   @Override
-  public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, 
StateMachine stateMachine,
-      RaftProperties properties) {
-    return new NettyServerStreamRpc(server, stateMachine, properties);
-  }
-
-  @Override
-  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, 
StateMachine stateMachine,
-      RaftProperties properties) {
-    return new NettyServerStreamRpc(server, stateMachine, properties);
+  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, 
RaftProperties properties) {
+    return new NettyServerStreamRpc(server, properties);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 440ccda..63beca7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -31,7 +31,6 @@ import 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine;
@@ -47,7 +46,6 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -203,25 +201,15 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
   private final ChannelFuture channelFuture;
 
-  private final StateMachine stateMachine;
   private final StreamMap streams = new StreamMap();
 
   private final Proxies proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, 
RaftProperties properties) {
-    this(null, server.getId(), stateMachine, properties,
-        NetUtils.createSocketAddr(server.getAddress()).getPort());
-  }
-
-  public NettyServerStreamRpc(RaftServer server, StateMachine stateMachine, 
RaftProperties properties) {
-    this(server, server.getId(), stateMachine, properties, 
NettyConfigKeys.DataStream.port(server.getProperties()));
-  }
-
-  public NettyServerStreamRpc(
-      RaftServer server, RaftPeerId id, StateMachine stateMachine, 
RaftProperties properties, int port) {
+  public NettyServerStreamRpc(RaftServer server, RaftProperties properties) {
     this.server = server;
-    this.name = id + "-" + getClass().getSimpleName();
-    this.stateMachine = stateMachine;
+    this.name = server.getId() + "-" + getClass().getSimpleName();
+
+    final int port = NettyConfigKeys.DataStream.port(server.getProperties());
     this.channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
@@ -248,6 +236,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
+      final StateMachine stateMachine = 
server.getStateMachine(request.getRaftGroupId());
       return new StreamInfo(request, stateMachine.data().stream(request), 
proxies.getDataStreamOutput());
     } catch (Throwable e) {
       throw new CompletionException(e);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index 63589b0..65056c9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -21,8 +21,6 @@ import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamFactory;
 import org.apache.ratis.datastream.DataStreamType;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.statemachine.StateMachine;
 
 /** A {@link DataStreamFactory} to create server-side objects. */
 public interface DataStreamServerFactory extends DataStreamFactory {
@@ -36,8 +34,5 @@ public interface DataStreamServerFactory extends 
DataStreamFactory {
   }
 
   /** Create a new {@link DataStreamServerRpc}. */
-  DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine 
stateMachine, RaftProperties properties);
-
-  /** Create a new {@link DataStreamServerRpc}. */
-  DataStreamServerRpc newDataStreamServerRpc(RaftServer server, StateMachine 
stateMachine, RaftProperties properties);
+  DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties 
properties);
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
index a4762fa..89d3e3a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
@@ -21,7 +21,6 @@ import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.statemachine.StateMachine;
 
 import java.util.Collection;
 
@@ -30,23 +29,7 @@ public class DisabledDataStreamServerFactory implements 
DataStreamServerFactory
   public DisabledDataStreamServerFactory(Parameters parameters) {}
 
   @Override
-  public DataStreamServerRpc newDataStreamServerRpc(
-      RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
-    return new DataStreamServerRpc() {
-      @Override
-      public void start() {}
-
-      @Override
-      public void close() {}
-
-      @Override
-      public void addRaftPeers(Collection<RaftPeer> peers) {}
-    };
-  }
-
-  @Override
-  public DataStreamServerRpc newDataStreamServerRpc(
-      RaftServer server, StateMachine stateMachine, RaftProperties properties) 
{
+  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, 
RaftProperties properties) {
     return new DataStreamServerRpc() {
       @Override
       public void start() {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index c50a80a..30ca21a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -47,6 +47,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
   /** @return the groups the server is part of. */
   Iterable<RaftGroup> getGroups() throws IOException;
 
+  StateMachine getStateMachine(RaftGroupId groupId) throws IOException;
+
   /** @return the server properties. */
   RaftProperties getProperties();
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 4ccbfc8..d41ea18 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -23,12 +23,10 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.DataStreamServerFactory;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,20 +35,11 @@ public class DataStreamServerImpl implements 
DataStreamServer {
 
   private final DataStreamServerRpc serverRpc;
 
-  public DataStreamServerImpl(RaftPeer server, StateMachine stateMachine,
-      RaftProperties properties, Parameters parameters){
+  public DataStreamServerImpl(RaftServer server, RaftProperties properties, 
Parameters parameters) {
     final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
 
     this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
-        .newDataStreamServerRpc(server, stateMachine, properties);
-  }
-
-  public DataStreamServerImpl(RaftServer server, StateMachine stateMachine,
-      RaftProperties properties, Parameters parameters){
-    final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
-
-    this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
-        .newDataStreamServerRpc(server, stateMachine, properties);
+        .newDataStreamServerRpc(server, properties);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 4f700d9..fbe9862 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -182,11 +182,7 @@ public class RaftServerProxy implements RaftServer {
     this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
 
     this.serverRpc = factory.newRaftServerRpc(this);
-
-    // TODO: Support multi-raft and should pass StateMachineRegistry to 
DataStreamServerImpl instead of StateMachine
-    this.dataStreamServerRpc =
-        new DataStreamServerImpl(this, 
stateMachineRegistry.apply(RaftGroupId.emptyGroupId()), properties, null)
-            .getServerRpc();
+    this.dataStreamServerRpc = new DataStreamServerImpl(this, properties, 
null).getServerRpc();
 
     this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
     this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
@@ -309,6 +305,11 @@ public class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public StateMachine getStateMachine(RaftGroupId groupId) throws IOException {
+    return getImpl(groupId).getStateMachine();
+  }
+
+  @Override
   public LifeCycle.State getLifeCycleState() {
     return lifeCycle.getCurrentState();
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
similarity index 69%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 80c5a80..dd8505f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamBase.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -22,16 +22,32 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.client.impl.DataStreamClientImpl;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.GroupListReply;
+import org.apache.ratis.protocol.GroupListRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
 
@@ -47,7 +63,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
-class TestDataStreamBase extends BaseTest {
+abstract class DataStreamBaseTest extends BaseTest {
   static final int MODULUS = 23;
 
   static byte pos2byte(int pos) {
@@ -137,6 +153,132 @@ class TestDataStreamBase extends BaseTest {
   private List<RaftPeer> peers;
   private List<MultiDataStreamStateMachine> stateMachines;
 
+  protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) 
{
+    final ConcurrentMap<RaftGroupId, StateMachine> stateMachines = new 
ConcurrentHashMap<>();
+
+    return new RaftServer() {
+      @Override
+      public RaftPeerId getId() {
+        return peer.getId();
+      }
+
+      @Override
+      public StateMachine getStateMachine(RaftGroupId groupId) {
+        return stateMachines.computeIfAbsent(groupId, key -> new 
MultiDataStreamStateMachine());
+      }
+
+      @Override
+      public RaftProperties getProperties() {
+        return properties;
+      }
+
+
+      @Override
+      public RequestVoteReplyProto requestVote(RequestVoteRequestProto 
request) {
+        return null;
+      }
+
+      @Override
+      public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto 
request) {
+        return null;
+      }
+
+      @Override
+      public InstallSnapshotReplyProto 
installSnapshot(InstallSnapshotRequestProto request) {
+        return null;
+      }
+
+      @Override
+      public CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request) {
+        return null;
+      }
+
+      @Override
+      public RpcType getRpcType() {
+        return null;
+      }
+
+      @Override
+      public RaftClientReply submitClientRequest(RaftClientRequest request) {
+        return null;
+      }
+
+      @Override
+      public RaftClientReply setConfiguration(SetConfigurationRequest request) 
{
+        return null;
+      }
+
+      @Override
+      public CompletableFuture<RaftClientReply> 
submitClientRequestAsync(RaftClientRequest request) {
+        return null;
+      }
+
+      @Override
+      public CompletableFuture<RaftClientReply> 
setConfigurationAsync(SetConfigurationRequest request) {
+        return null;
+      }
+
+      @Override
+      public GroupListReply getGroupList(GroupListRequest request) {
+        return null;
+      }
+
+      @Override
+      public GroupInfoReply getGroupInfo(GroupInfoRequest request) {
+        return null;
+      }
+
+      @Override
+      public RaftClientReply groupManagement(GroupManagementRequest request) {
+        return null;
+      }
+
+      @Override
+      public CompletableFuture<GroupListReply> 
getGroupListAsync(GroupListRequest request) {
+        return null;
+      }
+
+      @Override
+      public CompletableFuture<GroupInfoReply> 
getGroupInfoAsync(GroupInfoRequest request) {
+        return null;
+      }
+
+      @Override
+      public CompletableFuture<RaftClientReply> 
groupManagementAsync(GroupManagementRequest request) {
+        return null;
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Iterable<RaftGroupId> getGroupIds() {
+        return null;
+      }
+
+      @Override
+      public Iterable<RaftGroup> getGroups() {
+        return null;
+      }
+
+      @Override
+      public ServerFactory getFactory() {
+        return null;
+      }
+
+      @Override
+      public void start() {
+      }
+
+      @Override
+      public LifeCycle.State getLifeCycleState() {
+        return null;
+      }
+    };
+  }
+
+
   protected void setup(int numServers){
     peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
         .map(RaftPeerId::valueOf)
@@ -148,8 +290,9 @@ class TestDataStreamBase extends BaseTest {
     for (int i = 0; i < peers.size(); i++) {
       final MultiDataStreamStateMachine stateMachine = new 
MultiDataStreamStateMachine();
       stateMachines.add(stateMachine);
-      final DataStreamServerImpl streamServer = new DataStreamServerImpl(
-          peers.get(i), stateMachine, properties, null);
+      final RaftPeer peer = peers.get(i);
+      final RaftServer server = newRaftServer(peer, properties);
+      final DataStreamServerImpl streamServer = new 
DataStreamServerImpl(server, properties, null);
       final DataStreamServerRpc rpc = streamServer.getServerRpc();
       if (i == 0) {
         // only the first server routes requests to peers.
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 0414704..363fa4e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -26,7 +26,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-public class TestDataStreamDisabled extends TestDataStreamBase {
+public class TestDataStreamDisabled extends DataStreamBaseTest {
   @Rule
   public final ExpectedException exception = ExpectedException.none();
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index b980dbd..93922d3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -20,17 +20,28 @@ package org.apache.ratis.datastream;
 
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.NetUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 
-public class TestDataStreamNetty extends TestDataStreamBase {
+public class TestDataStreamNetty extends DataStreamBaseTest {
   @Before
   public void setup() {
     properties = new RaftProperties();
     RaftConfigKeys.DataStream.setType(properties, 
SupportedDataStreamType.NETTY);
   }
 
+  @Override
+  protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) 
{
+    final RaftProperties p = new RaftProperties(properties);
+    NettyConfigKeys.DataStream.setPort(p,  
NetUtils.createSocketAddr(peer.getAddress()).getPort());
+    return super.newRaftServer(peer, p);
+  }
+
   @Test
   public void testDataStreamSingleServer() throws Exception {
     runTestDataStream(1, 2, 3, 1_000_000, 10);

Reply via email to