szetszwo commented on code in PR #1466:
URL: https://github.com/apache/ratis/pull/1466#discussion_r3306065120
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java:
##########
@@ -235,7 +243,21 @@ Server newServer(GrpcClientProtocolService client, ServerInterceptor interceptor
     }
     public GrpcServicesImpl build() {
-      return new GrpcServicesImpl(this);
+      final RaftProperties props = server.getProperties();
+      final String id = server.getId() + "";
+      final boolean useEpoll = GrpcConfigKeys.useEpoll(props);
+      try {
+        serverBosses = NettyUtils.newEventLoopGroup(id + "-boss",
+            GrpcConfigKeys.Server.bossGroupSize(props), useEpoll);
+        serverWorkers = NettyUtils.newEventLoopGroup(id + "-server-workers",
+            GrpcConfigKeys.Server.workerGroupSize(props), useEpoll);
+        clientWorkers = NettyUtils.newEventLoopGroup(id + "-client-workers",
+            GrpcConfigKeys.Client.workerGroupSize(props), useEpoll);
+        return new GrpcServicesImpl(this);
+      } catch (RuntimeException | Error e) {
Review Comment:
Let's catch Throwable instead of RuntimeException | Error?
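
A minimal, self-contained sketch of the cleanup-on-failure pattern behind this suggestion. `FakeGroup` and `BuilderSketch` are hypothetical stand-ins, not Ratis or Netty types, and the cleanup body is an assumption since the actual catch block is not shown in this hunk. With Java's precise rethrow, `catch (Throwable t) { throw t; }` compiles without a `throws` clause as long as the try body throws only unchecked exceptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an event-loop-group-like resource. */
final class FakeGroup {
  /** Records the names of groups that were shut down, for illustration only. */
  static final List<String> SHUTDOWN = new ArrayList<>();
  private final String name;

  FakeGroup(String name, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size = " + size + " <= 0 for " + name);
    }
    this.name = name;
  }

  void shutdown() {
    SHUTDOWN.add(name);
  }
}

/** Hypothetical builder that creates two groups and cleans up on failure. */
final class BuilderSketch {
  private FakeGroup bosses;
  private FakeGroup workers;

  String build(int bossSize, int workerSize) {
    try {
      bosses = new FakeGroup("boss", bossSize);
      workers = new FakeGroup("workers", workerSize);
      return "built";
    } catch (Throwable t) {
      // Release whatever was created before the failure, then rethrow unchanged.
      shutdownIfCreated(bosses);
      shutdownIfCreated(workers);
      throw t;
    }
  }

  private static void shutdownIfCreated(FakeGroup group) {
    if (group != null) {
      group.shutdown();
    }
  }
}
```

In this sketch, a failure while creating the second group still shuts down the first one, and the caller sees the original exception.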
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java:
##########
@@ -213,6 +236,15 @@ private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest reques
     return proto;
   }
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      NettyUtils.shutdownGracefully(clientWorkers);
Review Comment:
With clientWorkers as a MemoizedSupplier, check if it is initialized before shutting it down:
```java
      if (clientWorkers.isInitialized()) {
        NettyUtils.shutdownGracefully(clientWorkers.get());
      }
```
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java:
##########
@@ -90,6 +96,10 @@ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, SslC
       channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
     }
     channelBuilder.disableRetry();
+    if (eventLoopGroup != null) {
Review Comment:
It won't be null. Let's add requireNonNull:
```java
    this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
```
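
A small sketch of why the later null check becomes unnecessary once the constructor validates the argument. `ChannelOwnerSketch` is a hypothetical stand-in, not the actual Ratis class, and the field is typed as `Object` only to keep the example free of Netty dependencies.

```java
import java.util.Objects;

/** Hypothetical stand-in for a class that owns an event loop group. */
final class ChannelOwnerSketch {
  private final Object eventLoopGroup;

  ChannelOwnerSketch(Object eventLoopGroup) {
    // Fail fast at construction so later code can rely on a non-null field.
    this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup, "eventLoopGroup == null");
  }

  String describeChannel() {
    // No "if (eventLoopGroup != null)" guard is needed here.
    return "channel on " + eventLoopGroup;
  }
}
```

A null argument then fails at the construction site with a clear message, rather than being silently skipped when the channel is built.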
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java:
##########
@@ -53,6 +55,10 @@ static ManagedChannel buildManagedChannel(String address, SslContext sslContext)
     } else {
       channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
     }
+    if (eventLoopGroup != null) {
Review Comment:
Similarly, it won't be null here. Let's use requireNonNull instead of the null check.
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java:
##########
@@ -60,15 +62,36 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
   private final int maxMessageSize;
   private final TimeDuration requestTimeoutDuration;
   private final TimeDuration watchRequestTimeoutDuration;
+  private final EventLoopGroup clientWorkers;
   public GrpcClientRpc(ClientId clientId, RaftProperties properties,
       SslContext adminSslContext, SslContext clientSslContext) {
+    this(clientId, properties, adminSslContext, clientSslContext, newClientWorkers(clientId, properties));
+  }
Review Comment:
Let's change it to a static factory method and make the constructor private:
```java
  public static GrpcClientRpc create(ClientId clientId, RaftProperties properties,
      SslContext adminSslContext, SslContext clientSslContext) {
    final MemoizedSupplier<EventLoopGroup> eventLoopGroup =
        MemoizedSupplier.valueOf(() -> NettyUtils.newEventLoopGroup(
            clientId + "-client-workers",
            GrpcConfigKeys.Client.workerGroupSize(properties),
            GrpcConfigKeys.useEpoll(properties)));
    return new GrpcClientRpc(clientId, properties, adminSslContext,
        clientSslContext, eventLoopGroup);
  }
```
```java
  private final MemoizedSupplier<EventLoopGroup> clientWorkers;

  private GrpcClientRpc(ClientId clientId, RaftProperties properties,
      SslContext adminSslContext, SslContext clientSslContext,
      MemoizedSupplier<EventLoopGroup> clientWorkers) {
    super(new PeerProxyMap<>(clientId.toString(),
        p -> new GrpcClientProtocolClient(clientId, p, properties,
            adminSslContext, clientSslContext, clientWorkers)));
    this.clientWorkers = clientWorkers;
    this.clientId = clientId;
    this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
    this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
    this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
  }
```
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java:
##########
@@ -60,15 +62,36 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
   private final int maxMessageSize;
   private final TimeDuration requestTimeoutDuration;
   private final TimeDuration watchRequestTimeoutDuration;
+  private final EventLoopGroup clientWorkers;
Review Comment:
Let's use a MemoizedSupplier for lazy initialization.
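
For illustration, a rough sketch of the lazy-initialization idea using only the JDK. `LazyRef` is a hypothetical minimal holder written for this example and is not Ratis's `MemoizedSupplier`. `LazyClientSketch` is also hypothetical, with a plain `ExecutorService` standing in for the event loop group.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical lazy holder: runs the initializer at most once, on first get(). */
final class LazyRef<T> implements Supplier<T> {
  private final Supplier<T> initializer;
  private volatile T value;

  LazyRef(Supplier<T> initializer) {
    this.initializer = Objects.requireNonNull(initializer, "initializer == null");
  }

  @Override
  public T get() {
    T v = value;
    if (v == null) {
      synchronized (this) {
        v = value;
        if (v == null) {
          v = Objects.requireNonNull(initializer.get(), "initializer returned null");
          value = v;
        }
      }
    }
    return v;
  }

  boolean isInitialized() {
    return value != null;
  }
}

/** Hypothetical client that creates its workers only when first needed. */
final class LazyClientSketch implements AutoCloseable {
  static final AtomicInteger CREATED = new AtomicInteger();
  static final AtomicInteger SHUT_DOWN = new AtomicInteger();

  private final LazyRef<ExecutorService> workers = new LazyRef<>(() -> {
    CREATED.incrementAndGet();
    return Executors.newSingleThreadExecutor();
  });

  /** Simulates the first real use of the workers. */
  void touch() {
    workers.get();
  }

  @Override
  public void close() {
    // Calling get() unconditionally here would create the workers just to shut them down.
    if (workers.isInitialized()) {
      workers.get().shutdown();
      SHUT_DOWN.incrementAndGet();
    }
  }
}
```

In this sketch, a client that is closed without ever being used creates no workers at all, which is the reason for the `isInitialized()` guard in `close()`.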