This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e78988  RATIS-1371. Support multi-client when transfer data between servers (#474)
4e78988 is described below

commit 4e7898812dea82a61fdf178b68f177521684f517
Author: runzhiwang <[email protected]>
AuthorDate: Wed Apr 28 19:12:51 2021 +0800

    RATIS-1371. Support multi-client when transfer data between servers (#474)
---
 .../ratis/netty/server/NettyServerStreamRpc.java   | 25 +++++++++++++++++-----
 .../apache/ratis/server/RaftServerConfigKeys.java  | 13 +++++++++++
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index a653d9b..5c46576 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
@@ -54,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -111,15 +113,20 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   private final ChannelFuture channelFuture;
 
   private final DataStreamManagement requests;
-  private final Proxies proxies;
+  private final List<Proxies> proxies = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftServer server) {
     this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.requests = new DataStreamManagement(server);
 
     final RaftProperties properties = server.getProperties();
+
+    int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
+    for (int i = 0; i < clientPoolSize; i ++) {
+      this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
+    }
+
     final int port = NettyConfigKeys.DataStream.port(properties);
-    this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
     this.channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
@@ -139,7 +146,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   @Override
   public void addRaftPeers(Collection<RaftPeer> newPeers) {
-    proxies.addPeers(newPeers);
+    for (int i = 0; i < proxies.size(); i ++) {
+      proxies.get(i).addPeers(newPeers);
+    }
   }
 
   static class RequestRef {
@@ -174,7 +183,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         }
 
         final DataStreamRequestByteBuf request = requestRef.set((DataStreamRequestByteBuf)msg);
-        requests.read(request, ctx, proxies::getDataStreamOutput);
+
+        int index = Math.toIntExact(
+            ((0xFFFFFFFFL & request.getClientId().hashCode()) + request.getStreamId()) % proxies.size());
+        requests.read(request, ctx, proxies.get(index)::getDataStreamOutput);
+
         requestRef.reset(request);
       }
 
@@ -243,7 +256,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       LOG.error(this + ": Interrupted close()", e);
     }
 
-    proxies.close();
+    for (int i = 0; i < proxies.size(); i ++) {
+      proxies.get(i).close();
+    }
   }
 
   @Override
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9d5da7d..9b5d86b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -445,6 +445,19 @@ public interface RaftServerConfigKeys {
     static void setAsyncWriteThreadPoolSize(RaftProperties properties, int port) {
       setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port);
     }
+
+    String CLIENT_POOL_SIZE_KEY = PREFIX + ".client.pool.size";
+    int CLIENT_POOL_SIZE_DEFAULT = 10;
+
+    static int clientPoolSize(RaftProperties properties) {
+      return getInt(properties::getInt, CLIENT_POOL_SIZE_KEY,
+          CLIENT_POOL_SIZE_DEFAULT, getDefaultLog(),
+          requireMin(0), requireMax(65536));
+    }
+
+    static void setClientPoolSize(RaftProperties properties, int num) {
+      setInt(properties::setInt, CLIENT_POOL_SIZE_KEY, num);
+    }
   }
 
   /** server rpc timeout related */

Reply via email to