szetszwo commented on code in PR #955:
URL: https://github.com/apache/ratis/pull/955#discussion_r1375324078
##########
ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java:
##########
@@ -158,7 +158,7 @@ static void setWorkerGroupSize(RaftProperties properties, int clientWorkerGroupS
}
String WORKER_GROUP_SHARE_KEY = PREFIX + ".worker-group.share";
- boolean WORKER_GROUP_SHARE_DEFAULT = false;
+ boolean WORKER_GROUP_SHARE_DEFAULT = true;
Review Comment:
It is good to change the default to true.
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -78,7 +80,53 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
- private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
+
+ private static class RefCountedWorkerGroup implements ReferenceCountedObject<EventLoopGroup> {
+ private final AtomicInteger count = new AtomicInteger();
+ private final AtomicReference<EventLoopGroup> value = new AtomicReference<>();
+
+ @Override
+ public synchronized EventLoopGroup get() {
+ if (count.get() < 0) {
+ throw new IllegalStateException("Failed to get: object has already been completely released.");
+ }
+ return value.get();
+ }
+
+ @Override
+ public synchronized EventLoopGroup retain() {
+ // n < 0: exception
+ // n >= 0: n++
+ final int previous = count.getAndUpdate(n -> n < 0? n : n + 1);
+ if (previous < 0) {
+ throw new IllegalStateException("Failed to retain: object has already been completely released.");
+ } else if (previous == 0) {
+ // TODO: Find a way to pass RaftProperties
+ // New shared worker group will be created when previously there is
+ // no active connections
+ return value.updateAndGet(g -> g != null ? g: newWorkerGroup(new RaftProperties()));
+ }
+ return value.get();
+ }
+
+ @Override
+ public synchronized boolean release() {
+ // n <= 0: exception
+ // n >= 1: n--
+ final int previous = count.getAndUpdate(n -> n <= 0? -1: n - 1);
+ if (previous <= 0) {
+ throw new IllegalStateException("Failed to release: object has already been completely released.");
+ } else if (previous == 1) {
+ // Shutdown the event loop group when there are no active connection,
+ // subsequent retain will create a new shared worker group.
+ EventLoopGroup previousEventLoopGroup = value.getAndSet(null);
+ previousEventLoopGroup.shutdownGracefully();
+ }
+ return previous == 1;
+ }
+ }
+
+ private static final RefCountedWorkerGroup SHARED_WORKER_GROUP = new RefCountedWorkerGroup();
Review Comment:
We need an `AtomicReference` so that a new `SHARED_WORKER_GROUP` can be
created after the old one is completely released.
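
A minimal sketch of the idea, not the actual Ratis code: the holder keeps the
current reference-counted object in an `AtomicReference` and installs a fresh
one once the old one is completely released. `OneShotRefCounted` and
`SharedHolder` are hypothetical names, and the generic `T` stands in for
`EventLoopGroup` so the sketch needs only the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical one-shot reference count: after it drops to zero it cannot be retained again. */
final class OneShotRefCounted<T> {
  private final T value;
  private final Consumer<T> onFullyReleased;
  // 0: created but not yet retained; n > 0: n holders; -1: completely released
  private final AtomicInteger count = new AtomicInteger();

  OneShotRefCounted(T value, Consumer<T> onFullyReleased) {
    this.value = value;
    this.onFullyReleased = onFullyReleased;
  }

  T get() {
    return value;
  }

  /** @return false if the object has already been completely released. */
  boolean tryRetain() {
    return count.getAndUpdate(n -> n < 0 ? n : n + 1) >= 0;
  }

  /** @return true if this call released the last reference. */
  boolean release() {
    final int previous = count.getAndUpdate(n -> n > 1 ? n - 1 : n == 1 ? -1 : n);
    if (previous <= 0) {
      throw new IllegalStateException("Failed to release: object is not retained.");
    }
    if (previous == 1) {
      onFullyReleased.accept(value);
      return true;
    }
    return false;
  }
}

/** Hypothetical holder: replaces the shared object after the old one is completely released. */
final class SharedHolder<T> {
  private final AtomicReference<OneShotRefCounted<T>> current = new AtomicReference<>();
  private final Supplier<T> factory;
  private final Consumer<T> closer;

  SharedHolder(Supplier<T> factory, Consumer<T> closer) {
    this.factory = factory;
    this.closer = closer;
  }

  OneShotRefCounted<T> retain() {
    for (;;) {
      final OneShotRefCounted<T> existing = current.get();
      if (existing == null) {
        final OneShotRefCounted<T> fresh = new OneShotRefCounted<>(factory.get(), closer);
        fresh.tryRetain(); // count 0 -> 1 before the object becomes visible to others
        if (current.compareAndSet(null, fresh)) {
          return fresh;
        }
        fresh.release(); // lost the race: close the unused value and retry
      } else if (existing.tryRetain()) {
        return existing;
      } else {
        current.compareAndSet(existing, null); // stale entry: clear it and retry
      }
    }
  }

  void release(OneShotRefCounted<T> ref) {
    if (ref.release()) {
      current.compareAndSet(ref, null); // the next retain() creates a new shared object
    }
  }
}
```

In `WorkerGroupGetter`, the factory would presumably call `newWorkerGroup(..)`
and the closer would call `shutdownGracefully()` on the group.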
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java:
##########
@@ -78,7 +80,53 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
- private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
+
+ private static class RefCountedWorkerGroup implements ReferenceCountedObject<EventLoopGroup> {
Review Comment:
We should just use `ReferenceCountedObject.wrap(..)` instead of implementing `ReferenceCountedObject` by hand.