This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4e78988 RATIS-1371. Support multi-client when transfer data between
servers (#474)
4e78988 is described below
commit 4e7898812dea82a61fdf178b68f177521684f517
Author: runzhiwang <[email protected]>
AuthorDate: Wed Apr 28 19:12:51 2021 +0800
RATIS-1371. Support multi-client when transfer data between servers (#474)
---
.../ratis/netty/server/NettyServerStreamRpc.java | 25 +++++++++++++++++-----
.../apache/ratis/server/RaftServerConfigKeys.java | 13 +++++++++++
2 files changed, 33 insertions(+), 5 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index a653d9b..5c46576 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
@@ -54,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -111,15 +113,20 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private final ChannelFuture channelFuture;
private final DataStreamManagement requests;
- private final Proxies proxies;
+ private final List<Proxies> proxies = new ArrayList<>();
public NettyServerStreamRpc(RaftServer server) {
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
this.requests = new DataStreamManagement(server);
final RaftProperties properties = server.getProperties();
+
+ int clientPoolSize =
RaftServerConfigKeys.DataStream.clientPoolSize(properties);
+ for (int i = 0; i < clientPoolSize; i ++) {
+ this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer ->
newClient(peer, properties))));
+ }
+
final int port = NettyConfigKeys.DataStream.port(properties);
- this.proxies = new Proxies(new PeerProxyMap<>(name, peer ->
newClient(peer, properties)));
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@@ -139,7 +146,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
@Override
public void addRaftPeers(Collection<RaftPeer> newPeers) {
- proxies.addPeers(newPeers);
+ for (int i = 0; i < proxies.size(); i ++) {
+ proxies.get(i).addPeers(newPeers);
+ }
}
static class RequestRef {
@@ -174,7 +183,11 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
final DataStreamRequestByteBuf request =
requestRef.set((DataStreamRequestByteBuf)msg);
- requests.read(request, ctx, proxies::getDataStreamOutput);
+
+ int index = Math.toIntExact(
+ ((0xFFFFFFFFL & request.getClientId().hashCode()) +
request.getStreamId()) % proxies.size());
+ requests.read(request, ctx, proxies.get(index)::getDataStreamOutput);
+
requestRef.reset(request);
}
@@ -243,7 +256,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
LOG.error(this + ": Interrupted close()", e);
}
- proxies.close();
+ for (int i = 0; i < proxies.size(); i ++) {
+ proxies.get(i).close();
+ }
}
@Override
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9d5da7d..9b5d86b 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -445,6 +445,19 @@ public interface RaftServerConfigKeys {
static void setAsyncWriteThreadPoolSize(RaftProperties properties, int
port) {
setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port);
}
+
+ String CLIENT_POOL_SIZE_KEY = PREFIX + ".client.pool.size";
+ int CLIENT_POOL_SIZE_DEFAULT = 10;
+
+ static int clientPoolSize(RaftProperties properties) {
+ return getInt(properties::getInt, CLIENT_POOL_SIZE_KEY,
+ CLIENT_POOL_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+
+ static void setClientPoolSize(RaftProperties properties, int num) {
+ setInt(properties::setInt, CLIENT_POOL_SIZE_KEY, num);
+ }
}
/** server rpc timeout related */