This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 14e114ca RATIS-1602. Add a ProxiesPool inner class in NettyServerStreamRpc. (#660)
14e114ca is described below
commit 14e114ca1ff357f5516f6c7f9f453470498e0ae8
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jul 4 00:04:26 2022 -0700
RATIS-1602. Add a ProxiesPool inner class in NettyServerStreamRpc. (#660)
---
.../java/org/apache/ratis/util/Preconditions.java | 5 ++
.../ratis/netty/server/DataStreamManagement.java | 2 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 66 ++++++++++++++--------
3 files changed, 48 insertions(+), 25 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index ce56a40f..196bd992 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -78,6 +78,11 @@ public interface Preconditions {
() -> name + ": expected == " + expected + " but computed == " +
computed);
}
+ static void assertSame(Object expected, Object computed, String name) {
+ assertTrue(expected == computed,
+ () -> name + ": expected == " + expected + " but computed == " +
computed);
+ }
+
static void assertNull(Object object, Supplier<String> message) {
assertTrue(object == null, message);
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index ddef3ac9..593c2793 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -382,8 +382,8 @@ public class DataStreamManagement {
try {
readImpl(request, ctx, buf, getStreams);
} catch (Throwable t) {
+ replyDataStreamException(t, request, ctx);
buf.release();
- throw t;
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 4f862928..dd79d839 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -27,6 +27,7 @@ import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.security.TlsConf;
@@ -57,6 +58,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,8 +66,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -115,13 +119,39 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
+ static class ProxiesPool {
+ private final List<Proxies> list;
+
+ ProxiesPool(String name, RaftProperties properties) {
+ final int clientPoolSize =
RaftServerConfigKeys.DataStream.clientPoolSize(properties);
+ final List<Proxies> proxies = new ArrayList<>(clientPoolSize);
+ for (int i = 0; i < clientPoolSize; i++) {
+ proxies.add(new Proxies(new PeerProxyMap<>(name, peer ->
newClient(peer, properties))));
+ }
+ this.list = Collections.unmodifiableList(proxies);
+ }
+
+ void addRaftPeers(Collection<RaftPeer> newPeers) {
+ list.forEach(proxy -> proxy.addPeers(newPeers));
+ }
+
+ Proxies get(DataStreamPacket p) {
+ final long hash = Integer.toUnsignedLong(Objects.hash(p.getClientId(),
p.getStreamId()));
+ return list.get(Math.toIntExact(hash % list.size()));
+ }
+
+ void close() {
+ list.forEach(Proxies::close);
+ }
+ }
+
private final String name;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final ChannelFuture channelFuture;
private final DataStreamManagement requests;
- private final List<Proxies> proxies = new ArrayList<>();
+ private final ProxiesPool proxies;
private final NettyServerStreamRpcMetrics metrics;
@@ -131,11 +161,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
this.requests = new DataStreamManagement(server, metrics);
final RaftProperties properties = server.getProperties();
-
- int clientPoolSize =
RaftServerConfigKeys.DataStream.clientPoolSize(properties);
- for (int i = 0; i < clientPoolSize; i ++) {
- this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer ->
newClient(peer, properties))));
- }
+ this.proxies = new ProxiesPool(name, properties);
final boolean useEpoll =
NettyConfigKeys.DataStream.Server.useEpoll(properties);
this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup",
@@ -166,22 +192,17 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
@Override
public void addRaftPeers(Collection<RaftPeer> newPeers) {
- proxies.forEach(proxy -> proxy.addPeers(newPeers));
+ proxies.addRaftPeers(newPeers);
}
static class RequestRef {
private final AtomicReference<DataStreamRequestByteBuf> ref = new
AtomicReference<>();
- DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) {
- Optional.ofNullable(ref.getAndSet(current)).ifPresent(previous -> {
- throw new IllegalStateException("previous = " + previous + " != null,
current=" + current);
- });
- return current;
- }
+ UncheckedAutoCloseable set(DataStreamRequestByteBuf current) {
+ final DataStreamRequestByteBuf previous = ref.getAndUpdate(p -> p ==
null ? current : p);
+ Preconditions.assertNull(previous, () -> "previous = " + previous + " !=
null, current = " + current);
- void reset(DataStreamRequestByteBuf expected) {
- final DataStreamRequestByteBuf stored = ref.getAndSet(null);
- Preconditions.assertTrue(stored == expected, () -> "Expected=" +
expected + " but stored=" + stored);
+ return () -> Preconditions.assertSame(current, getAndSetNull(),
"RequestRef");
}
DataStreamRequestByteBuf getAndSetNull() {
@@ -201,13 +222,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return;
}
- final DataStreamRequestByteBuf request =
requestRef.set((DataStreamRequestByteBuf)msg);
-
- int index = Math.toIntExact(
- ((0xFFFFFFFFL & request.getClientId().hashCode()) +
request.getStreamId()) % proxies.size());
- requests.read(request, ctx, proxies.get(index)::getDataStreamOutput);
-
- requestRef.reset(request);
+ final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
+ try(UncheckedAutoCloseable autoReset = requestRef.set(request)) {
+ requests.read(request, ctx,
proxies.get(request)::getDataStreamOutput);
+ }
}
@Override
@@ -280,7 +298,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
LOG.error(this + ": Interrupted close()", e);
}
- proxies.forEach(Proxies::close);
+ proxies.close();
}
@Override