This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 8b8bddab8d8aff8566b7ae84c6190344e49f74c7 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Nov 30 09:56:09 2022 -0800 RATIS-1753. NettyServerStreamRpc should create client with TlsConf. (#791) (cherry picked from commit 2e9012f1954855fd5544b7d37cd16aab9660c184) --- .../java/org/apache/ratis/netty/NettyDataStreamFactory.java | 2 +- .../java/org/apache/ratis/netty/NettyDataStreamUtils.java | 1 + .../org/apache/ratis/netty/server/NettyServerStreamRpc.java | 13 ++++++++----- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java index 11aa0514a..44da8fcde 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java @@ -50,6 +50,6 @@ public class NettyDataStreamFactory implements DataStreamServerFactory, DataStre @Override public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) { - return new NettyServerStreamRpc(server, NettyConfigKeys.DataStream.Server.tlsConf(parameters)); + return new NettyServerStreamRpc(server, parameters); } } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java index 10f35157c..f51451142 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java @@ -110,6 +110,7 @@ public interface NettyDataStreamUtils { static void encodeByteBuffer(ByteBuffer buffer, Consumer<Object> out) { if (buffer.remaining() == 0) { + out.accept(Unpooled.EMPTY_BUFFER); // to avoid EncoderException: must produce at least one message return; } out.accept(Unpooled.wrappedBuffer(buffer)); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java index 311c4c5c7..70cb47026 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java @@ -20,6 +20,7 @@ package org.apache.ratis.netty.server; import org.apache.ratis.client.DataStreamClient; import org.apache.ratis.client.DataStreamOutputRpc; +import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer; import org.apache.ratis.netty.NettyConfigKeys; @@ -122,11 +123,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { static class ProxiesPool { private final List<Proxies> list; - ProxiesPool(String name, RaftProperties properties) { + ProxiesPool(String name, RaftProperties properties, Parameters parameters) { final int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties); final List<Proxies> proxies = new ArrayList<>(clientPoolSize); for (int i = 0; i < clientPoolSize; i++) { - proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)))); + proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties, parameters)))); } this.list = Collections.unmodifiableList(proxies); } @@ -155,13 +156,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { private final NettyServerStreamRpcMetrics metrics; - public NettyServerStreamRpc(RaftServer server, TlsConf tlsConf) { + public NettyServerStreamRpc(RaftServer server, Parameters parameters) { this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass()); this.metrics = new NettyServerStreamRpcMetrics(this.name); this.requests = new DataStreamManagement(server, metrics); final RaftProperties properties = server.getProperties(); - this.proxies = new ProxiesPool(name, properties); + this.proxies = new ProxiesPool(name, properties, parameters); final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties); this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup", @@ -169,6 +170,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { this.workerGroup = NettyUtils.newEventLoopGroup(name + "-workerGroup", NettyConfigKeys.DataStream.Server.workerGroupSize(properties), useEpoll); + final TlsConf tlsConf = NettyConfigKeys.DataStream.Server.tlsConf(parameters); final SslContext sslContext = NettyUtils.buildSslContextForServer(tlsConf); final String host = NettyConfigKeys.DataStream.host(server.getProperties()); final int port = NettyConfigKeys.DataStream.port(properties); @@ -185,11 +187,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc { .bind(socketAddress); } - static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) { + static DataStreamClient newClient(RaftPeer peer, RaftProperties properties, Parameters parameters) { return DataStreamClient.newBuilder() .setClientId(ClientId.randomId()) .setDataStreamServer(peer) .setProperties(properties) + .setParameters(parameters) .build(); }
