This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c77c183  RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
c77c183 is described below

commit c77c183c31d7d09b497c4f6500dc230fe5d82f43
Author: Rui Wang <[email protected]>
AuthorDate: Mon Nov 2 18:39:28 2020 -0800

    RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
---
 .../org/apache/ratis/netty/server/NettyServerStreamRpc.java  | 12 +++++++-----
 .../java/org/apache/ratis/datastream/DataStreamBaseTest.java |  5 ++++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 29a85cf..041d64d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -30,6 +30,7 @@ import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
@@ -94,10 +95,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       peers.addAll(newPeers);
     }
 
-    List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    List<DataStreamOutput> getDataStreamOutput(RaftGroupId groupId) throws IOException {
       final List<DataStreamOutput> outs = new ArrayList<>();
       try {
-        getDataStreamOutput(outs);
+        getDataStreamOutput(outs, groupId);
       } catch (IOException e) {
         outs.forEach(CloseAsync::closeAsync);
         throw e;
@@ -105,10 +106,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(List<DataStreamOutput> outs) throws IOException {
+    private void getDataStreamOutput(List<DataStreamOutput> outs, RaftGroupId groupId) throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add(map.getProxy(peer.getId()).stream());
+          outs.add(map.getProxy(peer.getId()).stream(groupId));
         } catch (IOException e) {
           throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
         }
@@ -245,7 +246,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
-      return new StreamInfo(request, stateMachine.data().stream(request), proxies.getDataStreamOutput());
+      return new StreamInfo(request, stateMachine.data().stream(request),
+          proxies.getDataStreamOutput(request.getRaftGroupId()));
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 739767a..67d2ecc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -182,6 +182,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   protected RaftProperties properties;
 
   private List<Server> servers;
+  private RaftGroup raftGroup;
 
   Server getPrimaryServer() {
     return servers.get(0);
@@ -318,6 +319,7 @@ abstract class DataStreamBaseTest extends BaseTest {
         .map(RaftPeerId::valueOf)
         .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
         .collect(Collectors.toList());
+    raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
     servers = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
@@ -363,7 +365,7 @@ abstract class DataStreamBaseTest extends BaseTest {
         clients.add(client);
         for (int i = 0; i < numStreams; i++) {
           futures.add(CompletableFuture.runAsync(
-              () -> runTestDataStream((DataStreamOutputImpl) client.stream(), bufferSize, bufferNum)));
+              () -> runTestDataStream((DataStreamOutputImpl) client.stream(raftGroup.getGroupId()), bufferSize, bufferNum)));
         }
       }
       Assert.assertEquals(numClients*numStreams, futures.size());
@@ -419,6 +421,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     final RaftClientRequest writeRequest = stream.getWriteRequest();
     Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
     Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
+    Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
   }

Reply via email to