This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 14e114ca RATIS-1602. Add a ProxiesPool inner class in NettyServerStreamRpc. (#660)
14e114ca is described below

commit 14e114ca1ff357f5516f6c7f9f453470498e0ae8
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jul 4 00:04:26 2022 -0700

    RATIS-1602. Add a ProxiesPool inner class in NettyServerStreamRpc. (#660)
---
 .../java/org/apache/ratis/util/Preconditions.java  |  5 ++
 .../ratis/netty/server/DataStreamManagement.java   |  2 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 66 ++++++++++++++--------
 3 files changed, 48 insertions(+), 25 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index ce56a40f..196bd992 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -78,6 +78,11 @@ public interface Preconditions {
         () -> name + ": expected == " + expected + " but computed == " + computed);
   }
 
+  static void assertSame(Object expected, Object computed, String name) {
+    assertTrue(expected == computed,
+        () -> name + ": expected == " + expected + " but computed == " + computed);
+  }
+
   static void assertNull(Object object, Supplier<String> message) {
     assertTrue(object == null, message);
   }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index ddef3ac9..593c2793 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -382,8 +382,8 @@ public class DataStreamManagement {
     try {
       readImpl(request, ctx, buf, getStreams);
     } catch (Throwable t) {
+      replyDataStreamException(t, request, ctx);
       buf.release();
-      throw t;
     }
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 4f862928..dd79d839 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -27,6 +27,7 @@ import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
 import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.security.TlsConf;
@@ -57,6 +58,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.PeerProxyMap;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,8 +66,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -115,13 +119,39 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     }
   }
 
+  static class ProxiesPool {
+    private final List<Proxies> list;
+
+    ProxiesPool(String name, RaftProperties properties) {
+      final int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
+      final List<Proxies> proxies = new ArrayList<>(clientPoolSize);
+      for (int i = 0; i < clientPoolSize; i++) {
+        proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
+      }
+      this.list = Collections.unmodifiableList(proxies);
+    }
+
+    void addRaftPeers(Collection<RaftPeer> newPeers) {
+      list.forEach(proxy -> proxy.addPeers(newPeers));
+    }
+
+    Proxies get(DataStreamPacket p) {
+      final long hash = Integer.toUnsignedLong(Objects.hash(p.getClientId(), p.getStreamId()));
+      return list.get(Math.toIntExact(hash % list.size()));
+    }
+
+    void close() {
+      list.forEach(Proxies::close);
+    }
+  }
+
   private final String name;
   private final EventLoopGroup bossGroup;
   private final EventLoopGroup workerGroup;
   private final ChannelFuture channelFuture;
 
   private final DataStreamManagement requests;
-  private final List<Proxies> proxies = new ArrayList<>();
+  private final ProxiesPool proxies;
 
   private final NettyServerStreamRpcMetrics metrics;
 
@@ -131,11 +161,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     this.requests = new DataStreamManagement(server, metrics);
 
     final RaftProperties properties = server.getProperties();
-
-    int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
-    for (int i = 0; i < clientPoolSize; i ++) {
-      this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
-    }
+    this.proxies = new ProxiesPool(name, properties);
 
     final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
     this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup",
@@ -166,22 +192,17 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   @Override
   public void addRaftPeers(Collection<RaftPeer> newPeers) {
-    proxies.forEach(proxy -> proxy.addPeers(newPeers));
+    proxies.addRaftPeers(newPeers);
   }
 
   static class RequestRef {
     private final AtomicReference<DataStreamRequestByteBuf> ref = new AtomicReference<>();
 
-    DataStreamRequestByteBuf set(DataStreamRequestByteBuf current) {
-      Optional.ofNullable(ref.getAndSet(current)).ifPresent(previous -> {
-        throw new IllegalStateException("previous = " + previous + " != null, current=" + current);
-      });
-      return current;
-    }
+    UncheckedAutoCloseable set(DataStreamRequestByteBuf current) {
+      final DataStreamRequestByteBuf previous = ref.getAndUpdate(p -> p == null ? current : p);
+      Preconditions.assertNull(previous, () -> "previous = " + previous + " != null, current = " + current);
 
-    void reset(DataStreamRequestByteBuf expected) {
-      final DataStreamRequestByteBuf stored = ref.getAndSet(null);
-      Preconditions.assertTrue(stored == expected, () -> "Expected=" + expected + " but stored=" + stored);
+      return () -> Preconditions.assertSame(current, getAndSetNull(), "RequestRef");
     }
 
     DataStreamRequestByteBuf getAndSetNull() {
@@ -201,13 +222,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
           return;
         }
 
-        final DataStreamRequestByteBuf request = requestRef.set((DataStreamRequestByteBuf)msg);
-
-        int index = Math.toIntExact(
-            ((0xFFFFFFFFL & request.getClientId().hashCode()) + request.getStreamId()) % proxies.size());
-        requests.read(request, ctx, proxies.get(index)::getDataStreamOutput);
-
-        requestRef.reset(request);
+        final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
+        try(UncheckedAutoCloseable autoReset = requestRef.set(request)) {
+          requests.read(request, ctx, proxies.get(request)::getDataStreamOutput);
+        }
       }
 
       @Override
@@ -280,7 +298,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       LOG.error(this + ": Interrupted close()", e);
     }
 
-    proxies.forEach(Proxies::close);
+    proxies.close();
   }
 
   @Override

Reply via email to