This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d673154d0 RATIS-1921. Shared worker group in WorkerGroupGetter should be closed. (#955)
d673154d0 is described below

commit d673154d0fef6b1bcf3d47308fbffb5ab40d2522
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Nov 5 01:27:09 2023 +0800

    RATIS-1921. Shared worker group in WorkerGroupGetter should be closed. (#955)
---
 .../apache/ratis/util/ReferenceCountedObject.java  | 10 +++-
 .../org/apache/ratis/netty/NettyConfigKeys.java    |  2 +-
 .../ratis/netty/client/NettyClientStreamRpc.java   | 53 +++++++++++++++-------
 .../ratis/netty/server/DataStreamManagement.java   |  3 ++
 ...ettyDataStreamChainTopologyWithGrpcCluster.java | 18 ++++++++
 ...NettyDataStreamStarTopologyWithGrpcCluster.java |  3 ++
 .../ratis/util/TestReferenceCountedObject.java     |  7 ++-
 7 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 8b0c85903..fec82f099 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -62,6 +62,11 @@ public interface ReferenceCountedObject<T> {
    */
   boolean release();
 
+  /** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
+  static <V> ReferenceCountedObject<V> wrap(V value) {
+    return wrap(value, () -> {}, () -> {});
+  }
+
   /**
    * Wrap the given value as a {@link ReferenceCountedObject}.
    *
@@ -81,8 +86,11 @@ public interface ReferenceCountedObject<T> {
 
       @Override
       public V get() {
-        if (count.get() < 0) {
+        final int previous = count.get();
+        if (previous < 0) {
           throw new IllegalStateException("Failed to get: object has already been completely released.");
+        } else if (previous == 0) {
+          throw new IllegalStateException("Failed to get: object has not yet been retained.");
         }
         return value;
       }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 98b1a6d74..be3ad8ee6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -158,7 +158,7 @@ public interface NettyConfigKeys {
       }
 
       String WORKER_GROUP_SHARE_KEY = PREFIX + ".worker-group.share";
-      boolean WORKER_GROUP_SHARE_DEFAULT = false;
+      boolean WORKER_GROUP_SHARE_DEFAULT = true;
       static boolean workerGroupShare(RaftProperties properties) {
         return getBoolean(properties::getBoolean, WORKER_GROUP_SHARE_KEY,
             WORKER_GROUP_SHARE_DEFAULT, getDefaultLog());
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index e6ce29a46..d842346e2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -56,6 +56,8 @@ import org.apache.ratis.thirdparty.io.netty.util.concurrent.ScheduledFuture;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -78,7 +80,36 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
 
   private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
-    private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
+
+    private static final AtomicReference<CompletableFuture<ReferenceCountedObject<EventLoopGroup>>> SHARED_WORKER_GROUP
+        = new AtomicReference<>();
+
+    static WorkerGroupGetter newInstance(RaftProperties properties) {
+      final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties);
+      if (shared) {
+        final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> created = new CompletableFuture<>();
+        final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> current
+            = SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created);
+        if (current == created) {
+          created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties)));
+        }
+        return new WorkerGroupGetter(current.join().retain()) {
+          @Override
+          void shutdownGracefully() {
+            final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> returned
+                = SHARED_WORKER_GROUP.updateAndGet(previous -> {
+              Preconditions.assertSame(current, previous, "SHARED_WORKER_GROUP");
+              return previous.join().release() ? null : previous;
+            });
+            if (returned == null) {
+              get().shutdownGracefully();
+            }
+          }
+        };
+      } else {
+        return new WorkerGroupGetter(newWorkerGroup(properties));
+      }
+    }
 
     static EventLoopGroup newWorkerGroup(RaftProperties properties) {
       return NettyUtils.newEventLoopGroup(
@@ -88,27 +119,18 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     }
 
     private final EventLoopGroup workerGroup;
-    private final boolean ignoreShutdown;
 
-    WorkerGroupGetter(RaftProperties properties) {
-      if (NettyConfigKeys.DataStream.Client.workerGroupShare(properties)) {
-        workerGroup = SHARED_WORKER_GROUP.updateAndGet(g -> g != null? g: newWorkerGroup(properties));
-        ignoreShutdown = true;
-      } else {
-        workerGroup = newWorkerGroup(properties);
-        ignoreShutdown = false;
-      }
+    private WorkerGroupGetter(EventLoopGroup workerGroup) {
+      this.workerGroup = workerGroup;
     }
 
     @Override
-    public EventLoopGroup get() {
+    public final EventLoopGroup get() {
       return workerGroup;
     }
 
     void shutdownGracefully() {
-      if (!ignoreShutdown) {
-        workerGroup.shutdownGracefully();
-      }
+      workerGroup.shutdownGracefully();
     }
   }
 
@@ -267,8 +289,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
     final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
     final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
-    this.connection = new Connection(address,
-        new WorkerGroupGetter(properties),
+    this.connection = new Connection(address, WorkerGroupGetter.newInstance(properties),
         () -> newChannelInitializer(address, sslContext, getClientHandler()));
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 958a1ad45..dd9a49929 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -328,10 +328,13 @@ public class DataStreamManagement {
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
       final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
+      wrapped.retain();
       try {
         byteWritten += channel.write(wrapped);
       } catch (Throwable t) {
         throw new CompletionException(t);
+      } finally {
+        wrapped.release();
       }
     }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
index e4e9fef57..31b28b4c2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
@@ -17,7 +17,25 @@
  */
 package org.apache.ratis.datastream;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Before;
+
 public class TestNettyDataStreamChainTopologyWithGrpcCluster
     extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
     implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
+
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    RaftClientConfigKeys.DataStream.setRequestTimeout(p, TimeDuration.ONE_MINUTE);
+    RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
+    RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB"));
+    RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);
+
+    NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100);
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
index 14c62b74f..45247d489 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
@@ -19,6 +19,7 @@ package org.apache.ratis.datastream;
 
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RoutingTable;
@@ -41,6 +42,8 @@ public class TestNettyDataStreamStarTopologyWithGrpcCluster
     RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
     RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB"));
     RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);
+
+    NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100);
   }
 
   @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
index 448212154..5a855857a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -47,7 +47,12 @@ public class TestReferenceCountedObject {
         value, retained::getAndIncrement, released::getAndIncrement);
 
     assertValues(retained, 0, released, 0);
-    Assert.assertEquals(value, ref.get());
+    try {
+      ref.get();
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      e.printStackTrace(System.out);
+    }
     assertValues(retained, 0, released, 0);
 
     Assert.assertEquals(value, ref.retain());

Reply via email to