This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8d4c4ae30 RATIS-2139. Fix checkstyle:ParameterNumber in GrpcService.
(#1134)
8d4c4ae30 is described below
commit 8d4c4ae3082f24e22aab454ec144f384ec6e1b1a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Aug 14 11:09:29 2024 -0700
RATIS-2139. Fix checkstyle:ParameterNumber in GrpcService. (#1134)
---
.../org/apache/ratis/grpc/server/GrpcService.java | 268 ++++++++++++---------
1 file changed, 152 insertions(+), 116 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 0802226f7..ce434c5e8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -23,7 +23,9 @@ import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
@@ -31,7 +33,9 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
+import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -98,19 +102,141 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
public static final class Builder {
private RaftServer server;
+
+ private String adminHost;
+ private int adminPort;
private GrpcTlsConfig adminTlsConfig;
+ private String clientHost;
+ private int clientPort;
private GrpcTlsConfig clientTlsConfig;
+ private String serverHost;
+ private int serverPort;
private GrpcTlsConfig serverTlsConfig;
+ private SizeInBytes messageSizeMax;
+ private SizeInBytes flowControlWindow;
+ private TimeDuration requestTimeoutDuration;
+ private boolean separateHeartbeatChannel;
+ private boolean zeroCopyEnabled;
+
private Builder() {}
public Builder setServer(RaftServer raftServer) {
this.server = raftServer;
+
+ final RaftProperties properties = server.getProperties();
+ this.adminHost = GrpcConfigKeys.Admin.host(properties);
+ this.adminPort = GrpcConfigKeys.Admin.port(properties);
+ this.clientHost = GrpcConfigKeys.Client.host(properties);
+ this.clientPort = GrpcConfigKeys.Client.port(properties);
+ this.serverHost = GrpcConfigKeys.Server.host(properties);
+ this.serverPort = GrpcConfigKeys.Server.port(properties);
+ this.messageSizeMax = GrpcConfigKeys.messageSizeMax(properties,
LOG::info);
+ this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties,
LOG::info);
+ this.requestTimeoutDuration =
RaftServerConfigKeys.Rpc.requestTimeout(properties);
+ this.separateHeartbeatChannel =
GrpcConfigKeys.Server.heartbeatChannel(properties);
+ this.zeroCopyEnabled = GrpcConfigKeys.Server.zeroCopyEnabled(properties);
+
+ final SizeInBytes appenderBufferSize =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+ final SizeInBytes gap = SizeInBytes.ONE_MB;
+ final long diff = messageSizeMax.getSize() -
appenderBufferSize.getSize();
+ if (diff < gap.getSize()) {
+ throw new IllegalArgumentException("Illegal configuration: "
+ + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + "(= " + messageSizeMax
+ + ") must be " + gap + " larger than "
+ + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + "(= "
+ appenderBufferSize + ").");
+ }
+
return this;
}
+ private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer
target) {
+ return new GrpcServerProtocolClient(target,
flowControlWindow.getSizeInt(),
+ requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
+ }
+
+ private ExecutorService newExecutor() {
+ final RaftProperties properties = server.getProperties();
+ return ConcurrentUtils.newThreadPoolWithMax(
+ GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
+ GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
+ server.getId() + "-request-");
+ }
+
+ private GrpcClientProtocolService newGrpcClientProtocolService(
+ ExecutorService executor, ZeroCopyMetrics zeroCopyMetrics) {
+ return new GrpcClientProtocolService(server::getId, server, executor,
zeroCopyEnabled, zeroCopyMetrics);
+ }
+
+ private GrpcServerProtocolService
newGrpcServerProtocolService(ZeroCopyMetrics zeroCopyMetrics) {
+ return new GrpcServerProtocolService(server::getId, server,
zeroCopyEnabled, zeroCopyMetrics);
+ }
+
+ private MetricServerInterceptor newMetricServerInterceptor() {
+ return new MetricServerInterceptor(server::getId,
+ JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort);
+ }
+
+ private NettyServerBuilder newNettyServerBuilderForServer() {
+ return newNettyServerBuilder(serverHost, serverPort, serverTlsConfig);
+ }
+
+ private NettyServerBuilder newNettyServerBuilderForAdmin() {
+ return newNettyServerBuilder(adminHost, adminPort, adminTlsConfig);
+ }
+
+ private NettyServerBuilder newNettyServerBuilderForClient() {
+ return newNettyServerBuilder(clientHost, clientPort, clientTlsConfig);
+ }
+
+ private NettyServerBuilder newNettyServerBuilder(String hostname, int
port, GrpcTlsConfig tlsConfig) {
+ final InetSocketAddress address = hostname == null || hostname.isEmpty()
?
+ new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
+ final NettyServerBuilder nettyServerBuilder =
NettyServerBuilder.forAddress(address)
+ .withChildOption(ChannelOption.SO_REUSEADDR, true)
+ .maxInboundMessageSize(messageSizeMax.getSizeInt())
+ .flowControlWindow(flowControlWindow.getSizeInt());
+
+ if (tlsConfig != null) {
+ SslContextBuilder sslContextBuilder =
GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
+ if (tlsConfig.getMtlsEnabled()) {
+ sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+ GrpcUtil.setTrustManager(sslContextBuilder,
tlsConfig.getTrustManager());
+ }
+ sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder,
OPENSSL);
+ try {
+ nettyServerBuilder.sslContext(sslContextBuilder.build());
+ } catch (Exception ex) {
+ throw new IllegalArgumentException("Failed to build SslContext,
tlsConfig=" + tlsConfig, ex);
+ }
+ }
+ return nettyServerBuilder;
+ }
+
+ private boolean separateAdminServer() {
+ return adminPort > 0 && adminPort != serverPort;
+ }
+
+ private boolean separateClientServer() {
+ return clientPort > 0 && clientPort != serverPort;
+ }
+
+ Server newServer(GrpcClientProtocolService client, ZeroCopyMetrics
zeroCopyMetrics, ServerInterceptor interceptor) {
+ final NettyServerBuilder serverBuilder =
newNettyServerBuilderForServer();
+ final ServerServiceDefinition service =
newGrpcServerProtocolService(zeroCopyMetrics).bindServiceWithZeroCopy();
+ serverBuilder.addService(ServerInterceptors.intercept(service,
interceptor));
+
+ if (!separateAdminServer()) {
+ addAdminService(serverBuilder, server, interceptor);
+ }
+ if (!separateClientServer()) {
+ addClientService(serverBuilder, client, interceptor);
+ }
+ return serverBuilder.build();
+ }
+
public GrpcService build() {
- return new GrpcService(server, adminTlsConfig, clientTlsConfig,
serverTlsConfig);
+ return new GrpcService(this);
}
public Builder setAdminTlsConfig(GrpcTlsConfig config) {
@@ -144,148 +270,58 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
private final GrpcClientProtocolService clientProtocolService;
private final MetricServerInterceptor serverInterceptor;
- private final ZeroCopyMetrics zeroCopyMetrics;
+ private final ZeroCopyMetrics zeroCopyMetrics = new ZeroCopyMetrics();
public MetricServerInterceptor getServerInterceptor() {
return serverInterceptor;
}
- private GrpcService(RaftServer server,
- GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig,
GrpcTlsConfig serverTlsConfig) {
- this(server, server::getId,
- GrpcConfigKeys.Admin.host(server.getProperties()),
- GrpcConfigKeys.Admin.port(server.getProperties()),
- adminTlsConfig,
- GrpcConfigKeys.Client.host(server.getProperties()),
- GrpcConfigKeys.Client.port(server.getProperties()),
- clientTlsConfig,
- GrpcConfigKeys.Server.host(server.getProperties()),
- GrpcConfigKeys.Server.port(server.getProperties()),
- serverTlsConfig,
- GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
-
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
- GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
- RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
- GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()),
- GrpcConfigKeys.Server.zeroCopyEnabled(server.getProperties()));
- }
-
- @SuppressWarnings("checkstyle:ParameterNumber") // private constructor
- private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
- String adminHost, int adminPort, GrpcTlsConfig adminTlsConfig,
- String clientHost, int clientPort, GrpcTlsConfig clientTlsConfig,
- String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
- SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
- SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
- boolean useSeparateHBChannel, boolean zeroCopyEnabled) {
- super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
- p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
- requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
-
- final SizeInBytes gap = SizeInBytes.ONE_MB;
- final long diff = grpcMessageSizeMax.getSize() -
appenderBufferSize.getSize();
- if (diff < gap.getSize()) {
- throw new IllegalArgumentException("Illegal configuration: "
- + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + "(= " + grpcMessageSizeMax
- + ") must be " + gap + " larger than "
- + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + "(= " +
appenderBufferSize + ").");
- }
+ private GrpcService(Builder b) {
+ super(b.server::getId, id -> new PeerProxyMap<>(id.toString(),
b::newGrpcServerProtocolClient));
- final RaftProperties properties = raftServer.getProperties();
- this.executor = ConcurrentUtils.newThreadPoolWithMax(
- GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
- GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
- getId() + "-request-");
- this.zeroCopyMetrics = new ZeroCopyMetrics();
- this.clientProtocolService = new GrpcClientProtocolService(idSupplier,
raftServer, executor,
- zeroCopyEnabled, zeroCopyMetrics);
-
- this.serverInterceptor = new MetricServerInterceptor(
- idSupplier,
- JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort
- );
-
- final boolean separateAdminServer = adminPort != serverPort && adminPort >
0;
- final boolean separateClientServer = clientPort != serverPort &&
clientPort > 0;
-
- final NettyServerBuilder serverBuilder =
- startBuildingNettyServer(serverHost, serverPort, serverTlsConfig,
grpcMessageSizeMax, flowControlWindow);
- GrpcServerProtocolService serverProtocolService = new
GrpcServerProtocolService(idSupplier, raftServer,
- zeroCopyEnabled, zeroCopyMetrics);
- serverBuilder.addService(ServerInterceptors.intercept(
- serverProtocolService.bindServiceWithZeroCopy(), serverInterceptor));
- if (!separateAdminServer) {
- addAdminService(raftServer, serverBuilder);
- }
- if (!separateClientServer) {
- addClientService(serverBuilder);
- }
+ this.executor = b.newExecutor();
+ this.clientProtocolService = b.newGrpcClientProtocolService(executor,
zeroCopyMetrics);
+ this.serverInterceptor = b.newMetricServerInterceptor();
+ final Server server = b.newServer(clientProtocolService, zeroCopyMetrics,
serverInterceptor);
- final Server server = serverBuilder.build();
servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
- addressSupplier = newAddressSupplier(serverPort, server);
+ addressSupplier = newAddressSupplier(b.serverPort, server);
- if (separateAdminServer) {
- final NettyServerBuilder builder =
- startBuildingNettyServer(adminHost, adminPort, adminTlsConfig,
grpcMessageSizeMax, flowControlWindow);
- addAdminService(raftServer, builder);
+ if (b.separateAdminServer()) {
+ final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin();
+ addAdminService(builder, b.server, serverInterceptor);
final Server adminServer = builder.build();
servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
- adminServerAddressSupplier = newAddressSupplier(adminPort, adminServer);
+ adminServerAddressSupplier = newAddressSupplier(b.adminPort,
adminServer);
} else {
adminServerAddressSupplier = addressSupplier;
}
- if (separateClientServer) {
- final NettyServerBuilder builder =
- startBuildingNettyServer(clientHost, clientPort, clientTlsConfig,
grpcMessageSizeMax, flowControlWindow);
- addClientService(builder);
+ if (b.separateClientServer()) {
+ final NettyServerBuilder builder = b.newNettyServerBuilderForClient();
+ addClientService(builder, clientProtocolService, serverInterceptor);
final Server clientServer = builder.build();
servers.put(GrpcClientProtocolService.class.getName(), clientServer);
- clientServerAddressSupplier = newAddressSupplier(clientPort,
clientServer);
+ clientServerAddressSupplier = newAddressSupplier(b.clientPort,
clientServer);
} else {
clientServerAddressSupplier = addressSupplier;
}
}
- private MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port,
Server server) {
+ static MemoizedSupplier<InetSocketAddress> newAddressSupplier(int port,
Server server) {
return JavaUtils.memoize(() -> new InetSocketAddress(port != 0 ? port :
server.getPort()));
}
- private void addClientService(NettyServerBuilder builder) {
-
builder.addService(ServerInterceptors.intercept(clientProtocolService.bindServiceWithZeroCopy(),
- serverInterceptor));
+ static void addClientService(NettyServerBuilder builder,
GrpcClientProtocolService client,
+ ServerInterceptor interceptor) {
+ final ServerServiceDefinition service = client.bindServiceWithZeroCopy();
+ builder.addService(ServerInterceptors.intercept(service, interceptor));
}
- private void addAdminService(RaftServer raftServer, NettyServerBuilder
nettyServerBuilder) {
- nettyServerBuilder.addService(ServerInterceptors.intercept(
- new GrpcAdminProtocolService(raftServer),
- serverInterceptor));
- }
-
- private static NettyServerBuilder startBuildingNettyServer(String hostname,
int port, GrpcTlsConfig tlsConfig,
- SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) {
- InetSocketAddress address = hostname == null || hostname.isEmpty() ?
- new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
- NettyServerBuilder nettyServerBuilder =
NettyServerBuilder.forAddress(address)
- .withChildOption(ChannelOption.SO_REUSEADDR, true)
- .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
- .flowControlWindow(flowControlWindow.getSizeInt());
-
- if (tlsConfig != null) {
- SslContextBuilder sslContextBuilder =
GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
- if (tlsConfig.getMtlsEnabled()) {
- sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
- GrpcUtil.setTrustManager(sslContextBuilder,
tlsConfig.getTrustManager());
- }
- sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder,
OPENSSL);
- try {
- nettyServerBuilder.sslContext(sslContextBuilder.build());
- } catch (Exception ex) {
- throw new IllegalArgumentException("Failed to build SslContext,
tlsConfig=" + tlsConfig, ex);
- }
- }
- return nettyServerBuilder;
+ static void addAdminService(NettyServerBuilder builder,
AdminAsynchronousProtocol admin,
+ ServerInterceptor interceptor) {
+ final GrpcAdminProtocolService service = new
GrpcAdminProtocolService(admin);
+ builder.addService(ServerInterceptors.intercept(service, interceptor));
}
@Override