This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e9012f19 RATIS-1753. NettyServerStreamRpc should create client with TlsConf. (#791)
2e9012f19 is described below

commit 2e9012f1954855fd5544b7d37cd16aab9660c184
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Nov 30 09:56:09 2022 -0800

    RATIS-1753. NettyServerStreamRpc should create client with TlsConf. (#791)
---
 .../java/org/apache/ratis/netty/NettyDataStreamFactory.java |  2 +-
 .../java/org/apache/ratis/netty/NettyDataStreamUtils.java   |  1 +
 .../org/apache/ratis/netty/server/NettyServerStreamRpc.java | 13 ++++++++-----
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 11aa0514a..44da8fcde 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -50,6 +50,6 @@ public class NettyDataStreamFactory implements DataStreamServerFactory, DataStre
 
   @Override
   public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
-    return new NettyServerStreamRpc(server, NettyConfigKeys.DataStream.Server.tlsConf(parameters));
+    return new NettyServerStreamRpc(server, parameters);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 10f35157c..f51451142 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -110,6 +110,7 @@ public interface NettyDataStreamUtils {
 
   static void encodeByteBuffer(ByteBuffer buffer, Consumer<Object> out) {
     if (buffer.remaining() == 0) {
+      out.accept(Unpooled.EMPTY_BUFFER); // to avoid EncoderException: must produce at least one message
       return;
     }
     out.accept(Unpooled.wrappedBuffer(buffer));
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 311c4c5c7..70cb47026 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,6 +20,7 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.netty.NettyConfigKeys;
@@ -122,11 +123,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   static class ProxiesPool {
     private final List<Proxies> list;
 
-    ProxiesPool(String name, RaftProperties properties) {
+    ProxiesPool(String name, RaftProperties properties, Parameters parameters) {
       final int clientPoolSize = RaftServerConfigKeys.DataStream.clientPoolSize(properties);
       final List<Proxies> proxies = new ArrayList<>(clientPoolSize);
       for (int i = 0; i < clientPoolSize; i++) {
-        proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
+        proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties, parameters))));
       }
       this.list = Collections.unmodifiableList(proxies);
     }
@@ -155,13 +156,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   private final NettyServerStreamRpcMetrics metrics;
 
-  public NettyServerStreamRpc(RaftServer server, TlsConf tlsConf) {
+  public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
     this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.metrics = new NettyServerStreamRpcMetrics(this.name);
     this.requests = new DataStreamManagement(server, metrics);
 
     final RaftProperties properties = server.getProperties();
-    this.proxies = new ProxiesPool(name, properties);
+    this.proxies = new ProxiesPool(name, properties, parameters);
 
     final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
     this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup",
@@ -169,6 +170,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     this.workerGroup = NettyUtils.newEventLoopGroup(name + "-workerGroup",
         NettyConfigKeys.DataStream.Server.workerGroupSize(properties), useEpoll);
 
+    final TlsConf tlsConf = NettyConfigKeys.DataStream.Server.tlsConf(parameters);
     final SslContext sslContext = NettyUtils.buildSslContextForServer(tlsConf);
     final String host = NettyConfigKeys.DataStream.host(server.getProperties());
     final int port = NettyConfigKeys.DataStream.port(properties);
@@ -185,11 +187,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         .bind(socketAddress);
   }
 
-  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties, Parameters parameters) {
     return DataStreamClient.newBuilder()
         .setClientId(ClientId.randomId())
         .setDataStreamServer(peer)
         .setProperties(properties)
+        .setParameters(parameters)
         .build();
   }
 

Reply via email to