This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-3.1.1_review in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 94692f3cb4e1a8e89b6e1a186835c5a12a9f5588 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Aug 14 11:09:29 2024 -0700 RATIS-2139. Fix checkstyle:ParameterNumber in GrpcService. (#1134) --- .../org/apache/ratis/grpc/server/GrpcService.java | 256 ++++++++++++--------- 1 file changed, 147 insertions(+), 109 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 5997c5429..33f23315d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -22,13 +22,16 @@ import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor; +import org.apache.ratis.protocol.AdminAsynchronousProtocol; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpcWithProxy; import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; +import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor; import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; @@ -96,19 +99,139 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol public static final class Builder { private RaftServer server; + + private String adminHost; + private int adminPort; private GrpcTlsConfig adminTlsConfig; + private String clientHost; + private int clientPort; private GrpcTlsConfig clientTlsConfig; + private String serverHost; + private int serverPort; private GrpcTlsConfig serverTlsConfig; + private SizeInBytes messageSizeMax; + private SizeInBytes flowControlWindow; + private TimeDuration requestTimeoutDuration; + private boolean separateHeartbeatChannel; + private Builder() {} public Builder setServer(RaftServer raftServer) { this.server = raftServer; + + final RaftProperties properties = server.getProperties(); + this.adminHost = GrpcConfigKeys.Admin.host(properties); + this.adminPort = GrpcConfigKeys.Admin.port(properties); + this.clientHost = GrpcConfigKeys.Client.host(properties); + this.clientPort = GrpcConfigKeys.Client.port(properties); + this.serverHost = GrpcConfigKeys.Server.host(properties); + this.serverPort = GrpcConfigKeys.Server.port(properties); + this.messageSizeMax = GrpcConfigKeys.messageSizeMax(properties, LOG::info); + this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::info); + this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties); + this.separateHeartbeatChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); + + final SizeInBytes appenderBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties); + final SizeInBytes gap = SizeInBytes.ONE_MB; + final long diff = messageSizeMax.getSize() - appenderBufferSize.getSize(); + if (diff < gap.getSize()) { + throw new IllegalArgumentException("Illegal configuration: " + + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + "(= " + messageSizeMax + + ") must be " + gap + " larger than " + + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + "(= " + appenderBufferSize + ")."); + } + return this; } + private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) { + return new GrpcServerProtocolClient(target, flowControlWindow.getSizeInt(), + requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel); + } + + private ExecutorService newExecutor() { + final RaftProperties properties = server.getProperties(); + return ConcurrentUtils.newThreadPoolWithMax( + GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties), + GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties), + server.getId() + "-request-"); + } + + private GrpcClientProtocolService newGrpcClientProtocolService( + ExecutorService executor) { + return new GrpcClientProtocolService(server::getId, server, executor); + } + + private GrpcServerProtocolService newGrpcServerProtocolService() { + return new GrpcServerProtocolService(server::getId, server); + } + + private MetricServerInterceptor newMetricServerInterceptor() { + return new MetricServerInterceptor(server::getId, + JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort); + } + + private NettyServerBuilder newNettyServerBuilderForServer() { + return newNettyServerBuilder(serverHost, serverPort, serverTlsConfig); + } + + private NettyServerBuilder newNettyServerBuilderForAdmin() { + return newNettyServerBuilder(adminHost, adminPort, adminTlsConfig); + } + + private NettyServerBuilder newNettyServerBuilderForClient() { + return newNettyServerBuilder(clientHost, clientPort, clientTlsConfig); + } + + private NettyServerBuilder newNettyServerBuilder(String hostname, int port, GrpcTlsConfig tlsConfig) { + final InetSocketAddress address = hostname == null || hostname.isEmpty() ? + new InetSocketAddress(port) : new InetSocketAddress(hostname, port); + final NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address) + .withChildOption(ChannelOption.SO_REUSEADDR, true) + .maxInboundMessageSize(messageSizeMax.getSizeInt()) + .flowControlWindow(flowControlWindow.getSizeInt()); + + if (tlsConfig != null) { + SslContextBuilder sslContextBuilder = GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager()); + if (tlsConfig.getMtlsEnabled()) { + sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + GrpcUtil.setTrustManager(sslContextBuilder, tlsConfig.getTrustManager()); + } + sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, OPENSSL); + try { + nettyServerBuilder.sslContext(sslContextBuilder.build()); + } catch (Exception ex) { + throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig, ex); + } + } + return nettyServerBuilder; + } + + private boolean separateAdminServer() { + return adminPort > 0 && adminPort != serverPort; + } + + private boolean separateClientServer() { + return clientPort > 0 && clientPort != serverPort; + } + + Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor) { + final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(); + final GrpcServerProtocolService service = newGrpcServerProtocolService(); + serverBuilder.addService(ServerInterceptors.intercept(service, interceptor)); + + if (!separateAdminServer()) { + addAdminService(serverBuilder, server, interceptor); + } + if (!separateClientServer()) { + addClientService(serverBuilder, client, interceptor); + } + return serverBuilder.build(); + } + public GrpcService build() { - return new GrpcService(server, adminTlsConfig, clientTlsConfig, serverTlsConfig); + return new GrpcService(this); } public Builder setAdminTlsConfig(GrpcTlsConfig config) { @@ -147,136 +270,51 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol return serverInterceptor; } - private GrpcService(RaftServer server, - GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) { - this(server, server::getId, - GrpcConfigKeys.Admin.host(server.getProperties()), - GrpcConfigKeys.Admin.port(server.getProperties()), - adminTlsConfig, - GrpcConfigKeys.Client.host(server.getProperties()), - GrpcConfigKeys.Client.port(server.getProperties()), - clientTlsConfig, - GrpcConfigKeys.Server.host(server.getProperties()), - GrpcConfigKeys.Server.port(server.getProperties()), - serverTlsConfig, - GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info), - RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()), - GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info), - RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()), - GrpcConfigKeys.Server.heartbeatChannel(server.getProperties())); - } - - @SuppressWarnings("checkstyle:ParameterNumber") // private constructor - private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, - String adminHost, int adminPort, GrpcTlsConfig adminTlsConfig, - String clientHost, int clientPort, GrpcTlsConfig clientTlsConfig, - String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig, - SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, - SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration, - boolean useSeparateHBChannel) { - super(idSupplier, id -> new PeerProxyMap<>(id.toString(), - p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), - requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel))); - - final SizeInBytes gap = SizeInBytes.ONE_MB; - final long diff = grpcMessageSizeMax.getSize() - appenderBufferSize.getSize(); - if (diff < gap.getSize()) { - throw new IllegalArgumentException("Illegal configuration: " - + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + "(= " + grpcMessageSizeMax - + ") must be " + gap + " larger than " - + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + "(= " + appenderBufferSize + ")."); - } + private GrpcService(Builder b) { + super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient)); - final RaftProperties properties = raftServer.getProperties(); - this.executor = ConcurrentUtils.newThreadPoolWithMax( - GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties), - GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties), - getId() + "-request-"); - this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor); - - this.serverInterceptor = new MetricServerInterceptor( - idSupplier, - JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort - ); - - final boolean separateAdminServer = adminPort != serverPort && adminPort > 0; - final boolean separateClientServer = clientPort != serverPort && clientPort > 0; - - final NettyServerBuilder serverBuilder = - startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow); - serverBuilder.addService(ServerInterceptors.intercept( - new GrpcServerProtocolService(idSupplier, raftServer), serverInterceptor)); - if (!separateAdminServer) { - addAdminService(raftServer, serverBuilder); - } - if (!separateClientServer) { - addClientService(serverBuilder); - } + this.executor = b.newExecutor(); + this.clientProtocolService = b.newGrpcClientProtocolService(executor); + this.serverInterceptor = b.newMetricServerInterceptor(); + final Server server = b.newServer(clientProtocolService, serverInterceptor); - final Server server = serverBuilder.build(); servers.put(GrpcServerProtocolService.class.getSimpleName(), server); - addressSupplier = newAddressSupplier(serverPort, server); + addressSupplier = newAddressSupplier(b.serverPort, server); - if (separateAdminServer) { - final NettyServerBuilder builder = - startBuildingNettyServer(adminHost, adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow); - addAdminService(raftServer, builder); + if (b.separateAdminServer()) { + final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin(); + addAdminService(builder, b.server, serverInterceptor); final Server adminServer = builder.build(); servers.put(GrpcAdminProtocolService.class.getName(), adminServer); - adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer); + adminServerAddressSupplier = newAddressSupplier(b.adminPort, adminServer); } else { adminServerAddressSupplier = addressSupplier; } - if (separateClientServer) { - final NettyServerBuilder builder = - startBuildingNettyServer(clientHost, clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow); - addClientService(builder); + if (b.separateClientServer()) { + final NettyServerBuilder builder = b.newNettyServerBuilderForClient(); + addClientService(builder, clientProtocolService, serverInterceptor); final Server clientServer = builder.build(); servers.put(GrpcClientProtocolService.class.getName(), clientServer); - clientServerAddressSupplier = newAddressSupplier(clientPort, clientServer); + clientServerAddressSupplier = newAddressSupplier(b.clientPort, clientServer); } else { clientServerAddressSupplier = addressSupplier; } } - private MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, Server server) { + static MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port, Server server) { return JavaUtils.memoize(() -> new InetSocketAddress(port != 0 ? port : server.getPort())); } - private void addClientService(NettyServerBuilder builder) { - builder.addService(ServerInterceptors.intercept(clientProtocolService, serverInterceptor)); + static void addClientService(NettyServerBuilder builder, GrpcClientProtocolService client, + ServerInterceptor interceptor) { + builder.addService(ServerInterceptors.intercept(client, interceptor)); } - private void addAdminService(RaftServer raftServer, NettyServerBuilder nettyServerBuilder) { - nettyServerBuilder.addService(ServerInterceptors.intercept( - new GrpcAdminProtocolService(raftServer), - serverInterceptor)); - } - - private static NettyServerBuilder startBuildingNettyServer(String hostname, int port, GrpcTlsConfig tlsConfig, - SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) { - InetSocketAddress address = hostname == null || hostname.isEmpty() ? - new InetSocketAddress(port) : new InetSocketAddress(hostname, port); - NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address) - .withChildOption(ChannelOption.SO_REUSEADDR, true) - .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt()) - .flowControlWindow(flowControlWindow.getSizeInt()); - - if (tlsConfig != null) { - SslContextBuilder sslContextBuilder = GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager()); - if (tlsConfig.getMtlsEnabled()) { - sslContextBuilder.clientAuth(ClientAuth.REQUIRE); - GrpcUtil.setTrustManager(sslContextBuilder, tlsConfig.getTrustManager()); - } - sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, OPENSSL); - try { - nettyServerBuilder.sslContext(sslContextBuilder.build()); - } catch (Exception ex) { - throw new IllegalArgumentException("Failed to build SslContext, tlsConfig=" + tlsConfig, ex); - } - } - return nettyServerBuilder; + static void addAdminService(NettyServerBuilder builder, AdminAsynchronousProtocol admin, + ServerInterceptor interceptor) { + final GrpcAdminProtocolService service = new GrpcAdminProtocolService(admin); + builder.addService(ServerInterceptors.intercept(service, interceptor)); } @Override
