This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 694fe6f  RATIS-1082. Netty stream server should forward the data to the other servers in the group (#213)
694fe6f is described below

commit 694fe6f5985db3a2ffb1608f08b78769f0ba5abf
Author: Rui Wang <[email protected]>
AuthorDate: Mon Oct 19 19:23:36 2020 -0700

    RATIS-1082. Netty stream server should forward the data to the other servers in the group (#213)
    
    * RATIS-1082. Netty stream server should forward the data to other server in the group
    
    * update
    
    * update
    
    * fixup! address comments
    
    * fixup! update
    
    * fixup! address comments
    
    * fixup! fix checkstyle
    
    * fixup! address comments
    
    * fixup! address comments
    
    * address comments
    
    * address comments
    
    * trigger new CI check
---
 .../org/apache/ratis/client/DataStreamClient.java  |   7 ++
 .../apache/ratis/client/api/DataStreamOutput.java  |   3 +
 .../ratis/client/impl/DataStreamClientImpl.java    |   2 +
 .../apache/ratis/netty/NettyDataStreamFactory.java |   7 ++
 .../ratis/netty/server/NettyServerStreamRpc.java   | 108 +++++++++++++++------
 .../ratis/server/DataStreamServerFactory.java      |   9 ++
 .../ratis/server/impl/DataStreamServerImpl.java    |  14 +++
 .../apache/ratis/datastream/TestDataStream.java    |  88 +++++++++++++----
 8 files changed, 192 insertions(+), 46 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index f49dc5f..201f3a8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -42,6 +42,13 @@ public interface DataStreamClient extends DataStreamApi {
   /** close the client */
   void close();
 
+  /** start the client */
+  void start();
+
+  static Builder newBuilder() {
+    return new Builder();
+  }
+
   /** To build {@link DataStreamClient} objects */
   class Builder {
     private RaftPeer raftServer;
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index dd86569..39f62ca 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -27,4 +27,7 @@ import java.util.concurrent.CompletableFuture;
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
   /** Send out the data in the buffer asynchronously */
   CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
+
+  /** Get the future of the header request. */
+  CompletableFuture<DataStreamReply> getHeaderFuture();
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 96c0cf0..9149b74 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -98,6 +98,7 @@ public class DataStreamClientImpl implements DataStreamClient 
{
       return header;
     }
 
+    @Override
     public CompletableFuture<DataStreamReply> getHeaderFuture() {
       return headerFuture;
     }
@@ -123,6 +124,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     dataStreamClientRpc.closeClient();
   }
 
+  @Override
   public void start(){
     dataStreamClientRpc.startClient();
   }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 565148a..f9af46e 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.netty;
 
+import java.util.List;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.DataStreamClientFactory;
 import org.apache.ratis.conf.Parameters;
@@ -46,4 +47,10 @@ public class NettyDataStreamFactory implements 
DataStreamServerFactory, DataStre
   public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, 
StateMachine stateMachine) {
     return new NettyServerStreamRpc(server, stateMachine);
   }
+
+  @Override
+  public DataStreamServerRpc newDataStreamServerRpc(
+      RaftPeer server, List<RaftPeer> peers, StateMachine stateMachine, 
RaftProperties properties) {
+    return new NettyServerStreamRpc(server, peers, stateMachine, properties);
+  }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index c8092ef..18a4695 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -18,9 +18,12 @@
 
 package org.apache.ratis.netty.server;
 
+import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
@@ -39,14 +42,16 @@ import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = 
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -58,6 +63,9 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = 
new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput 
= new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
@@ -65,44 +73,45 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, 
AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> 
client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           
ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, 
ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
         request.getStreamId(), request.getStreamOffset(), 
ByteBuffer.wrap("OK".getBytes()));
@@ -115,10 +124,32 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> 
getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new 
CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> 
getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> 
writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), 
getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < 
peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = 
peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }
+        } else {
+          // body
+          for (int i = 0; i < clients.size(); i++) {
+            parallelWrites[i + 1]  =
+              
peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+          }
+        }
+        CompletableFuture.allOf(parallelWrites).whenComplete((t, r) -> {
+              buf.release();
+              sendReply(request, ctx);
+        });
       }
     };
   }
@@ -146,6 +177,16 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
         .bind();
   }
 
+  private void setupClient(List<RaftPeer> otherPeers, RaftProperties 
properties) {
+    for (RaftPeer peer : otherPeers) {
+      clients.add(DataStreamClient.newBuilder()
+              .setParameters(null)
+              .setRaftServer(peer)
+              .setProperties(properties)
+              .build());
+    }
+  }
+
   private Channel getChannel() {
     return channelFuture.awaitUninterruptibly().channel();
   }
@@ -155,6 +196,13 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     channelFuture.syncUninterruptibly();
   }
 
+  // TODO: RATIS-1099 build connection with other server automatically.
+  public void startClientToPeers() {
+    for (DataStreamClient client : clients) {
+      client.start();
+    }
+  }
+
   @Override
   public void closeServer() {
     final ChannelFuture f = getChannel().close();
@@ -167,5 +215,9 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     } catch (InterruptedException e) {
       LOG.error("Interrupt EventLoopGroup terminate", e);
     }
+
+    for (DataStreamClient client : clients) {
+      client.close();
+    }
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index f2800d9..b9390ee 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.server;
 
+import java.util.List;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamFactory;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.ServerFactory;
@@ -37,4 +39,11 @@ public interface DataStreamServerFactory extends 
DataStreamFactory {
    * Server implementation for streaming in Raft group
    */
   DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine 
stateMachine);
+
+  /**
+   * Server implementation for streaming in Raft group. The server will 
forward requests
+   * to peers.
+   */
+  DataStreamServerRpc newDataStreamServerRpc(
+      RaftPeer server, List<RaftPeer> peers, StateMachine stateMachine, 
RaftProperties properties);
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index bfa3fe7..4a3cdc7 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.ratis.server.impl;
 
+import java.util.List;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
@@ -48,6 +49,19 @@ public class DataStreamServerImpl implements 
DataStreamServer {
         .newDataStreamServerRpc(raftServer, stateMachine);
   }
 
+  public DataStreamServerImpl(RaftPeer server,
+      RaftProperties properties,
+      Parameters parameters,
+      StateMachine stateMachine,
+      List<RaftPeer> peers){
+    this.raftServer = server;
+    this.stateMachine = stateMachine;
+    final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
+
+    this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+        .newDataStreamServerRpc(server, peers, stateMachine, properties);
+  }
+
   @Override
   public DataStreamServerRpc getServerRpc() {
     return serverRpc;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index cca6c94..84bbc79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -18,11 +18,13 @@
 
 package org.apache.ratis.datastream;
 
+import java.util.stream.Collectors;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.client.impl.DataStreamClientImpl;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.server.NettyServerStreamRpc;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -50,6 +52,9 @@ public class TestDataStream extends BaseTest {
   }
 
   class SingleDataStreamStateMachine extends BaseStateMachine {
+    private int byteWritten = 0;
+    private RaftClientRequest writeRequest;
+
     final WritableByteChannel channel = new WritableByteChannel() {
       private volatile boolean open = true;
 
@@ -61,7 +66,7 @@ public class TestDataStream extends BaseTest {
         final int remaining = src.remaining();
         for(; src.remaining() > 0; ) {
           Assert.assertEquals(pos2byte(byteWritten), src.get());
-          byteWritten++;
+          byteWritten += 1;
         }
         return remaining;
       }
@@ -99,28 +104,58 @@ public class TestDataStream extends BaseTest {
       writeRequest = request;
       return CompletableFuture.completedFuture(stream);
     }
+
+    public int getByteWritten() {
+      return byteWritten;
+    }
+
+    public RaftClientRequest getWriteRequest() {
+      return writeRequest;
+    }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
-  private int byteWritten = 0;
-  private RaftClientRequest writeRequest;
+  private List<SingleDataStreamStateMachine> singleDataStreamStateMachines;
+
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    singleDataStreamStateMachines = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      SingleDataStreamStateMachine singleDataStreamStateMachine = new 
SingleDataStreamStateMachine();
+      singleDataStreamStateMachines.add(singleDataStreamStateMachine);
+      DataStreamServerImpl streamServer;
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, singleDataStreamStateMachine, 
otherPeers);
+      } else {
+        streamServer = new DataStreamServerImpl(
+            peers.get(i), singleDataStreamStateMachine, properties, null);
+      }
+      servers.add(streamServer);
+      streamServer.getServerRpc().startServer();
+    }
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new 
SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      ((NettyServerStreamRpc) 
streamServer.getServerRpc()).startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
@@ -128,8 +163,21 @@ public class TestDataStream extends BaseTest {
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){
+    properties = new RaftProperties();
+    peers = Arrays.asList(MiniRaftCluster.generateIds(3, 0)).stream()
+        .map(RaftPeerId::valueOf)
+        .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
 
     setupServer();
     setupClient();
@@ -162,12 +210,16 @@ public class TestDataStream extends BaseTest {
       f.join();
     }
 
-    Assert.assertEquals(writeRequest.getClientId(), 
impl.getHeader().getClientId());
-    Assert.assertEquals(writeRequest.getCallId(), 
impl.getHeader().getCallId());
-    Assert.assertEquals(writeRequest.getRaftGroupId(), 
impl.getHeader().getRaftGroupId());
-    Assert.assertEquals(writeRequest.getServerId(), 
impl.getHeader().getServerId());
+    for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
+      RaftClientRequest writeRequest = s.getWriteRequest();
+      if (writeRequest.getClientId().equals(impl.getHeader().getClientId())) {
+        Assert.assertEquals(writeRequest.getCallId(), 
impl.getHeader().getCallId());
+        Assert.assertEquals(writeRequest.getRaftGroupId(), 
impl.getHeader().getRaftGroupId());
+        Assert.assertEquals(writeRequest.getServerId(), 
impl.getHeader().getServerId());
+      }
+      Assert.assertEquals(dataSize, s.getByteWritten());
+    }
 
-    Assert.assertEquals(dataSize, byteWritten);
     shutDownSetup();
   }
 

Reply via email to