This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d4c4ae30 RATIS-2139. Fix checkstyle:ParameterNumber in GrpcService. (#1134)
8d4c4ae30 is described below

commit 8d4c4ae3082f24e22aab454ec144f384ec6e1b1a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Aug 14 11:09:29 2024 -0700

    RATIS-2139. Fix checkstyle:ParameterNumber in GrpcService. (#1134)
---
 .../org/apache/ratis/grpc/server/GrpcService.java  | 268 ++++++++++++---------
 1 file changed, 152 insertions(+), 116 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 0802226f7..ce434c5e8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -23,7 +23,9 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
 import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
@@ -31,7 +33,9 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpcWithProxy;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
+import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
 import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -98,19 +102,141 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
 
   public static final class Builder {
     private RaftServer server;
+
+    private String adminHost;
+    private int adminPort;
     private GrpcTlsConfig adminTlsConfig;
+    private String clientHost;
+    private int clientPort;
     private GrpcTlsConfig clientTlsConfig;
+    private String serverHost;
+    private int serverPort;
     private GrpcTlsConfig serverTlsConfig;
 
+    private SizeInBytes messageSizeMax;
+    private SizeInBytes flowControlWindow;
+    private TimeDuration requestTimeoutDuration;
+    private boolean separateHeartbeatChannel;
+    private boolean zeroCopyEnabled;
+
     private Builder() {}
 
     public Builder setServer(RaftServer raftServer) {
       this.server = raftServer;
+
+      final RaftProperties properties = server.getProperties();
+      this.adminHost = GrpcConfigKeys.Admin.host(properties);
+      this.adminPort = GrpcConfigKeys.Admin.port(properties);
+      this.clientHost = GrpcConfigKeys.Client.host(properties);
+      this.clientPort = GrpcConfigKeys.Client.port(properties);
+      this.serverHost = GrpcConfigKeys.Server.host(properties);
+      this.serverPort = GrpcConfigKeys.Server.port(properties);
+      this.messageSizeMax = GrpcConfigKeys.messageSizeMax(properties, 
LOG::info);
+      this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, 
LOG::info);
+      this.requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(properties);
+      this.separateHeartbeatChannel = 
GrpcConfigKeys.Server.heartbeatChannel(properties);
+      this.zeroCopyEnabled = GrpcConfigKeys.Server.zeroCopyEnabled(properties);
+
+      final SizeInBytes appenderBufferSize = 
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+      final SizeInBytes gap = SizeInBytes.ONE_MB;
+      final long diff = messageSizeMax.getSize() - 
appenderBufferSize.getSize();
+      if (diff < gap.getSize()) {
+        throw new IllegalArgumentException("Illegal configuration: "
+            + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + "(= " + messageSizeMax
+            + ") must be " + gap + " larger than "
+            + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + "(= " 
+ appenderBufferSize + ").");
+      }
+
       return this;
     }
 
+    private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer 
target) {
+      return new GrpcServerProtocolClient(target, 
flowControlWindow.getSizeInt(),
+          requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
+    }
+
+    private ExecutorService newExecutor() {
+      final RaftProperties properties = server.getProperties();
+      return ConcurrentUtils.newThreadPoolWithMax(
+          GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
+          GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
+          server.getId() + "-request-");
+    }
+
+    private GrpcClientProtocolService newGrpcClientProtocolService(
+        ExecutorService executor, ZeroCopyMetrics zeroCopyMetrics) {
+      return new GrpcClientProtocolService(server::getId, server, executor, 
zeroCopyEnabled, zeroCopyMetrics);
+    }
+
+    private GrpcServerProtocolService 
newGrpcServerProtocolService(ZeroCopyMetrics zeroCopyMetrics) {
+      return new GrpcServerProtocolService(server::getId, server, 
zeroCopyEnabled, zeroCopyMetrics);
+    }
+
+    private MetricServerInterceptor newMetricServerInterceptor() {
+      return new MetricServerInterceptor(server::getId,
+          JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort);
+    }
+
+    private NettyServerBuilder newNettyServerBuilderForServer() {
+      return newNettyServerBuilder(serverHost, serverPort, serverTlsConfig);
+    }
+
+    private NettyServerBuilder newNettyServerBuilderForAdmin() {
+      return newNettyServerBuilder(adminHost, adminPort, adminTlsConfig);
+    }
+
+    private NettyServerBuilder newNettyServerBuilderForClient() {
+      return newNettyServerBuilder(clientHost, clientPort, clientTlsConfig);
+    }
+
+    private NettyServerBuilder newNettyServerBuilder(String hostname, int 
port, GrpcTlsConfig tlsConfig) {
+      final InetSocketAddress address = hostname == null || hostname.isEmpty() 
?
+          new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
+      final NettyServerBuilder nettyServerBuilder = 
NettyServerBuilder.forAddress(address)
+          .withChildOption(ChannelOption.SO_REUSEADDR, true)
+          .maxInboundMessageSize(messageSizeMax.getSizeInt())
+          .flowControlWindow(flowControlWindow.getSizeInt());
+
+      if (tlsConfig != null) {
+        SslContextBuilder sslContextBuilder = 
GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
+        if (tlsConfig.getMtlsEnabled()) {
+          sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+          GrpcUtil.setTrustManager(sslContextBuilder, 
tlsConfig.getTrustManager());
+        }
+        sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, 
OPENSSL);
+        try {
+          nettyServerBuilder.sslContext(sslContextBuilder.build());
+        } catch (Exception ex) {
+          throw new IllegalArgumentException("Failed to build SslContext, 
tlsConfig=" + tlsConfig, ex);
+        }
+      }
+      return nettyServerBuilder;
+    }
+
+    private boolean separateAdminServer() {
+      return adminPort > 0 && adminPort != serverPort;
+    }
+
+    private boolean separateClientServer() {
+      return clientPort > 0 && clientPort != serverPort;
+    }
+
+    Server newServer(GrpcClientProtocolService client, ZeroCopyMetrics 
zeroCopyMetrics, ServerInterceptor interceptor) {
+      final NettyServerBuilder serverBuilder = 
newNettyServerBuilderForServer();
+      final ServerServiceDefinition service = 
newGrpcServerProtocolService(zeroCopyMetrics).bindServiceWithZeroCopy();
+      serverBuilder.addService(ServerInterceptors.intercept(service, 
interceptor));
+
+      if (!separateAdminServer()) {
+        addAdminService(serverBuilder, server, interceptor);
+      }
+      if (!separateClientServer()) {
+        addClientService(serverBuilder, client, interceptor);
+      }
+      return serverBuilder.build();
+    }
+
     public GrpcService build() {
-      return new GrpcService(server, adminTlsConfig, clientTlsConfig, 
serverTlsConfig);
+      return new GrpcService(this);
     }
 
     public Builder setAdminTlsConfig(GrpcTlsConfig config) {
@@ -144,148 +270,58 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
   private final GrpcClientProtocolService clientProtocolService;
 
   private final MetricServerInterceptor serverInterceptor;
-  private final ZeroCopyMetrics zeroCopyMetrics;
+  private final ZeroCopyMetrics zeroCopyMetrics = new ZeroCopyMetrics();
 
   public MetricServerInterceptor getServerInterceptor() {
     return serverInterceptor;
   }
 
-  private GrpcService(RaftServer server,
-      GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig, 
GrpcTlsConfig serverTlsConfig) {
-    this(server, server::getId,
-        GrpcConfigKeys.Admin.host(server.getProperties()),
-        GrpcConfigKeys.Admin.port(server.getProperties()),
-        adminTlsConfig,
-        GrpcConfigKeys.Client.host(server.getProperties()),
-        GrpcConfigKeys.Client.port(server.getProperties()),
-        clientTlsConfig,
-        GrpcConfigKeys.Server.host(server.getProperties()),
-        GrpcConfigKeys.Server.port(server.getProperties()),
-        serverTlsConfig,
-        GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
-        
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
-        GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
-        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
-        GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()),
-        GrpcConfigKeys.Server.zeroCopyEnabled(server.getProperties()));
-  }
-
-  @SuppressWarnings("checkstyle:ParameterNumber") // private constructor
-  private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
-      String adminHost, int adminPort, GrpcTlsConfig adminTlsConfig,
-      String clientHost, int clientPort, GrpcTlsConfig clientTlsConfig,
-      String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
-      SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
-      SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
-      boolean useSeparateHBChannel, boolean zeroCopyEnabled) {
-    super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
-        p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
-            requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
-
-    final SizeInBytes gap = SizeInBytes.ONE_MB;
-    final long diff = grpcMessageSizeMax.getSize() - 
appenderBufferSize.getSize();
-    if (diff < gap.getSize()) {
-      throw new IllegalArgumentException("Illegal configuration: "
-          + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + "(= " + grpcMessageSizeMax
-          + ") must be " + gap + " larger than "
-          + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + "(= " + 
appenderBufferSize + ").");
-    }
+  private GrpcService(Builder b) {
+    super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), 
b::newGrpcServerProtocolClient));
 
-    final RaftProperties properties = raftServer.getProperties();
-    this.executor = ConcurrentUtils.newThreadPoolWithMax(
-        GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
-        GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
-        getId() + "-request-");
-    this.zeroCopyMetrics = new ZeroCopyMetrics();
-    this.clientProtocolService = new GrpcClientProtocolService(idSupplier, 
raftServer, executor,
-        zeroCopyEnabled, zeroCopyMetrics);
-
-    this.serverInterceptor = new MetricServerInterceptor(
-        idSupplier,
-        JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort
-    );
-
-    final boolean separateAdminServer = adminPort != serverPort && adminPort > 
0;
-    final boolean separateClientServer = clientPort != serverPort && 
clientPort > 0;
-
-    final NettyServerBuilder serverBuilder =
-        startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, 
grpcMessageSizeMax, flowControlWindow);
-    GrpcServerProtocolService serverProtocolService = new 
GrpcServerProtocolService(idSupplier, raftServer,
-        zeroCopyEnabled, zeroCopyMetrics);
-    serverBuilder.addService(ServerInterceptors.intercept(
-        serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor));
-    if (!separateAdminServer) {
-      addAdminService(raftServer, serverBuilder);
-    }
-    if (!separateClientServer) {
-      addClientService(serverBuilder);
-    }
+    this.executor = b.newExecutor();
+    this.clientProtocolService = b.newGrpcClientProtocolService(executor, 
zeroCopyMetrics);
+    this.serverInterceptor = b.newMetricServerInterceptor();
+    final Server server = b.newServer(clientProtocolService, zeroCopyMetrics, 
serverInterceptor);
 
-    final Server server = serverBuilder.build();
     servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
-    addressSupplier = newAddressSupplier(serverPort, server);
+    addressSupplier = newAddressSupplier(b.serverPort, server);
 
-    if (separateAdminServer) {
-      final NettyServerBuilder builder =
-          startBuildingNettyServer(adminHost, adminPort, adminTlsConfig, 
grpcMessageSizeMax, flowControlWindow);
-      addAdminService(raftServer, builder);
+    if (b.separateAdminServer()) {
+      final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin();
+      addAdminService(builder, b.server, serverInterceptor);
       final Server adminServer = builder.build();
       servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
-      adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer);
+      adminServerAddressSupplier = newAddressSupplier(b.adminPort, 
adminServer);
     } else {
       adminServerAddressSupplier = addressSupplier;
     }
 
-    if (separateClientServer) {
-      final NettyServerBuilder builder =
-          startBuildingNettyServer(clientHost, clientPort, clientTlsConfig, 
grpcMessageSizeMax, flowControlWindow);
-      addClientService(builder);
+    if (b.separateClientServer()) {
+      final NettyServerBuilder builder = b.newNettyServerBuilderForClient();
+      addClientService(builder, clientProtocolService, serverInterceptor);
       final Server clientServer = builder.build();
       servers.put(GrpcClientProtocolService.class.getName(), clientServer);
-      clientServerAddressSupplier = newAddressSupplier(clientPort, 
clientServer);
+      clientServerAddressSupplier = newAddressSupplier(b.clientPort, 
clientServer);
     } else {
       clientServerAddressSupplier = addressSupplier;
     }
   }
 
-  private MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, 
Server server) {
+  static MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, 
Server server) {
     return JavaUtils.memoize(() -> new InetSocketAddress(port != 0 ? port : 
server.getPort()));
   }
 
-  private void addClientService(NettyServerBuilder builder) {
-    
builder.addService(ServerInterceptors.intercept(clientProtocolService.bindServiceWithZeroCopy(),
-        serverInterceptor));
+  static void addClientService(NettyServerBuilder builder, 
GrpcClientProtocolService client,
+      ServerInterceptor interceptor) {
+    final ServerServiceDefinition service = client.bindServiceWithZeroCopy();
+    builder.addService(ServerInterceptors.intercept(service, interceptor));
   }
 
-  private void addAdminService(RaftServer raftServer, NettyServerBuilder 
nettyServerBuilder) {
-    nettyServerBuilder.addService(ServerInterceptors.intercept(
-          new GrpcAdminProtocolService(raftServer),
-          serverInterceptor));
-  }
-
-  private static NettyServerBuilder startBuildingNettyServer(String hostname, 
int port, GrpcTlsConfig tlsConfig,
-      SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) {
-    InetSocketAddress address = hostname == null || hostname.isEmpty() ?
-        new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
-    NettyServerBuilder nettyServerBuilder = 
NettyServerBuilder.forAddress(address)
-        .withChildOption(ChannelOption.SO_REUSEADDR, true)
-        .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
-        .flowControlWindow(flowControlWindow.getSizeInt());
-
-    if (tlsConfig != null) {
-      SslContextBuilder sslContextBuilder = 
GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
-      if (tlsConfig.getMtlsEnabled()) {
-        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
-        GrpcUtil.setTrustManager(sslContextBuilder, 
tlsConfig.getTrustManager());
-      }
-      sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, 
OPENSSL);
-      try {
-        nettyServerBuilder.sslContext(sslContextBuilder.build());
-      } catch (Exception ex) {
-        throw new IllegalArgumentException("Failed to build SslContext, 
tlsConfig=" + tlsConfig, ex);
-      }
-    }
-    return nettyServerBuilder;
+  static void addAdminService(NettyServerBuilder builder, 
AdminAsynchronousProtocol admin,
+      ServerInterceptor interceptor) {
+    final GrpcAdminProtocolService service = new 
GrpcAdminProtocolService(admin);
+    builder.addService(ServerInterceptors.intercept(service, interceptor));
   }
 
   @Override

Reply via email to