This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit fe0991a9eadf54ed6e792044a1fe3739b57dd699 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Mon Jul 4 00:04:26 2022 -0700 RATIS-1602. Add a ProxiesPool inner class in NettyServerStreamRpc. (#660) --- .../java/org/apache/ratis/util/Preconditions.java | 5 ++ .../ratis/netty/server/DataStreamManagement.java | 2 +- .../ratis/netty/server/NettyServerStreamRpc.java | 66 ++++++++++++++-------- 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java index ce56a40f..196bd992 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java @@ -78,6 +78,11 @@ public interface Preconditions { () -> name + ": expected == " + expected + " but computed == " + computed); } + static void assertSame(Object expected, Object computed, String name) { + assertTrue(expected == computed, + () -> name + ": expected == " + expected + " but computed == " + computed); + } + static void assertNull(Object object, Supplier<String> message) { assertTrue(object == null, message); } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 81c3aa1b..65e25f4d 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -407,8 +407,8 @@ public class DataStreamManagement { try { readImpl(request, ctx, buf, getStreams); } catch (Throwable t) { + replyDataStreamException(t, request, ctx); buf.release(); - throw t; } } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java index 4f862928..dd79d839 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java @@ -27,6 +27,7 @@ import org.apache.ratis.netty.NettyDataStreamUtils; import org.apache.ratis.netty.NettyUtils; import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.DataStreamPacket; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.security.TlsConf; @@ -57,6 +58,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,8 +66,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -115,13 +119,39 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { } } + static class ProxiesPool { + private final List<Proxies> list; + + ProxiesPool(String name, RaftProperties properties) { + final int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties); + final List<Proxies> proxies = new ArrayList<>(clientPoolSize); + for (int i = 0; i < clientPoolSize; i++) { + proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)))); + } + this.list = Collections.unmodifiableList(proxies); + } + + void addRaftPeers(Collection<RaftPeer> newPeers) { + list.forEach(proxy -> proxy.addPeers(newPeers)); + } + + Proxies get(DataStreamPacket p) { + final long hash = Integer.toUnsignedLong(Objects.hash(p.getClientId(), p.getStreamId())); + return list.get(Math.toIntExact(hash % list.size())); + } + + void close() { + list.forEach(Proxies::close); + } + } + private final String name; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private final ChannelFuture channelFuture; private final DataStreamManagement requests; - private final List<Proxies> proxies = new ArrayList<>(); + private final ProxiesPool proxies; private final NettyServerStreamRpcMetrics metrics; @@ -131,11 +161,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { this.requests = new DataStreamManagement(server, metrics); final RaftProperties properties = server.getProperties(); - - int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties); - for (int i = 0; i < clientPoolSize; i ++) { - this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)))); - } + this.proxies = new ProxiesPool(name, properties); final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties); this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup", @@ -166,22 +192,17 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { @Override public void addRaftPeers(Collection<RaftPeer> newPeers) { - proxies.forEach(proxy -> proxy.addPeers(newPeers)); + proxies.addRaftPeers(newPeers); } static class RequestRef { private final AtomicReference<DataStreamRequestByteBuf> ref = new AtomicReference<>(); - DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) { - Optional.ofNullable(ref.getAndSet(current)).ifPresent(previous -> { - throw new IllegalStateException("previous = " + previous + " != null, current=" + current); - }); - return current; - } + UncheckedAutoCloseable set(DataStreamRequestByteBuf current) { + final DataStreamRequestByteBuf previous = ref.getAndUpdate(p -> p == null ? current : p); + Preconditions.assertNull(previous, () -> "previous = " + previous + " != null, current = " + current); - void reset(DataStreamRequestByteBuf expected) { - final DataStreamRequestByteBuf stored = ref.getAndSet(null); - Preconditions.assertTrue(stored == expected, () -> "Expected=" + expected + " but stored=" + stored); + return () -> Preconditions.assertSame(current, getAndSetNull(), "RequestRef"); } DataStreamRequestByteBuf getAndSetNull() { @@ -201,13 +222,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { return; } - final DataStreamRequestByteBuf request = requestRef.set((DataStreamRequestByteBuf)msg); - - int index = Math.toIntExact( - ((0xFFFFFFFFL & request.getClientId().hashCode()) + request.getStreamId()) % proxies.size()); - requests.read(request, ctx, proxies.get(index)::getDataStreamOutput); - - requestRef.reset(request); + final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg; + try(UncheckedAutoCloseable autoReset = requestRef.set(request)) { + requests.read(request, ctx, proxies.get(request)::getDataStreamOutput); + } } @Override @@ -280,7 +298,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { LOG.error(this + ": Interrupted close()", e); } - proxies.forEach(Proxies::close); + proxies.close(); } @Override
