This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d673154d0 RATIS-1921. Shared worker group in WorkerGroupGetter should be closed. (#955)
d673154d0 is described below
commit d673154d0fef6b1bcf3d47308fbffb5ab40d2522
Author: Ivan Andika <[email protected]>
AuthorDate: Sun Nov 5 01:27:09 2023 +0800
RATIS-1921. Shared worker group in WorkerGroupGetter should be closed. (#955)
---
.../apache/ratis/util/ReferenceCountedObject.java | 10 +++-
.../org/apache/ratis/netty/NettyConfigKeys.java | 2 +-
.../ratis/netty/client/NettyClientStreamRpc.java | 53 +++++++++++++++-------
.../ratis/netty/server/DataStreamManagement.java | 3 ++
...ettyDataStreamChainTopologyWithGrpcCluster.java | 18 ++++++++
...NettyDataStreamStarTopologyWithGrpcCluster.java | 3 ++
.../ratis/util/TestReferenceCountedObject.java | 7 ++-
7 files changed, 77 insertions(+), 19 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 8b0c85903..fec82f099 100644
---
a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++
b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -62,6 +62,11 @@ public interface ReferenceCountedObject<T> {
*/
boolean release();
+ /** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
+ static <V> ReferenceCountedObject<V> wrap(V value) {
+ return wrap(value, () -> {}, () -> {});
+ }
+
/**
* Wrap the given value as a {@link ReferenceCountedObject}.
*
@@ -81,8 +86,11 @@ public interface ReferenceCountedObject<T> {
@Override
public V get() {
- if (count.get() < 0) {
+ final int previous = count.get();
+ if (previous < 0) {
throw new IllegalStateException("Failed to get: object has already
been completely released.");
+ } else if (previous == 0) {
+ throw new IllegalStateException("Failed to get: object has not yet
been retained.");
}
return value;
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 98b1a6d74..be3ad8ee6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -158,7 +158,7 @@ public interface NettyConfigKeys {
}
String WORKER_GROUP_SHARE_KEY = PREFIX + ".worker-group.share";
- boolean WORKER_GROUP_SHARE_DEFAULT = false;
+ boolean WORKER_GROUP_SHARE_DEFAULT = true;
static boolean workerGroupShare(RaftProperties properties) {
return getBoolean(properties::getBoolean, WORKER_GROUP_SHARE_KEY,
WORKER_GROUP_SHARE_DEFAULT, getDefaultLog());
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index e6ce29a46..d842346e2 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -56,6 +56,8 @@ import
org.apache.ratis.thirdparty.io.netty.util.concurrent.ScheduledFuture;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -78,7 +80,36 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyClientStreamRpc.class);
private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
- private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP =
new AtomicReference<>();
+
+ private static final
AtomicReference<CompletableFuture<ReferenceCountedObject<EventLoopGroup>>>
SHARED_WORKER_GROUP
+ = new AtomicReference<>();
+
+ static WorkerGroupGetter newInstance(RaftProperties properties) {
+ final boolean shared =
NettyConfigKeys.DataStream.Client.workerGroupShare(properties);
+ if (shared) {
+ final CompletableFuture<ReferenceCountedObject<EventLoopGroup>>
created = new CompletableFuture<>();
+ final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> current
+ = SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created);
+ if (current == created) {
+
created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties)));
+ }
+ return new WorkerGroupGetter(current.join().retain()) {
+ @Override
+ void shutdownGracefully() {
+ final CompletableFuture<ReferenceCountedObject<EventLoopGroup>>
returned
+ = SHARED_WORKER_GROUP.updateAndGet(previous -> {
+ Preconditions.assertSame(current, previous,
"SHARED_WORKER_GROUP");
+ return previous.join().release() ? null : previous;
+ });
+ if (returned == null) {
+ get().shutdownGracefully();
+ }
+ }
+ };
+ } else {
+ return new WorkerGroupGetter(newWorkerGroup(properties));
+ }
+ }
static EventLoopGroup newWorkerGroup(RaftProperties properties) {
return NettyUtils.newEventLoopGroup(
@@ -88,27 +119,18 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
}
private final EventLoopGroup workerGroup;
- private final boolean ignoreShutdown;
- WorkerGroupGetter(RaftProperties properties) {
- if (NettyConfigKeys.DataStream.Client.workerGroupShare(properties)) {
- workerGroup = SHARED_WORKER_GROUP.updateAndGet(g -> g != null? g:
newWorkerGroup(properties));
- ignoreShutdown = true;
- } else {
- workerGroup = newWorkerGroup(properties);
- ignoreShutdown = false;
- }
+ private WorkerGroupGetter(EventLoopGroup workerGroup) {
+ this.workerGroup = workerGroup;
}
@Override
- public EventLoopGroup get() {
+ public final EventLoopGroup get() {
return workerGroup;
}
void shutdownGracefully() {
- if (!ignoreShutdown) {
- workerGroup.shutdownGracefully();
- }
+ workerGroup.shutdownGracefully();
}
}
@@ -267,8 +289,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
final InetSocketAddress address =
NetUtils.createSocketAddr(server.getDataStreamAddress());
final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
- this.connection = new Connection(address,
- new WorkerGroupGetter(properties),
+ this.connection = new Connection(address,
WorkerGroupGetter.newInstance(properties),
() -> newChannelInitializer(address, sslContext, getClientHandler()));
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 958a1ad45..dd9a49929 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -328,10 +328,13 @@ public class DataStreamManagement {
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
final ReferenceCountedObject<ByteBuffer> wrapped =
ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
+ wrapped.retain();
try {
byteWritten += channel.write(wrapped);
} catch (Throwable t) {
throw new CompletionException(t);
+ } finally {
+ wrapped.release();
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
index e4e9fef57..31b28b4c2 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java
@@ -17,7 +17,25 @@
*/
package org.apache.ratis.datastream;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Before;
+
public class TestNettyDataStreamChainTopologyWithGrpcCluster
extends
DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet
{
+
+ @Before
+ public void setup() {
+ final RaftProperties p = getProperties();
+ RaftClientConfigKeys.DataStream.setRequestTimeout(p,
TimeDuration.ONE_MINUTE);
+ RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
+ RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p,
SizeInBytes.valueOf("10MB"));
+ RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);
+
+ NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100);
+ }
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
index 14c62b74f..45247d489 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
@@ -19,6 +19,7 @@ package org.apache.ratis.datastream;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
@@ -41,6 +42,8 @@ public class TestNettyDataStreamStarTopologyWithGrpcCluster
RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p,
SizeInBytes.valueOf("10MB"));
RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);
+
+ NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100);
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
index 448212154..5a855857a 100644
---
a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
+++
b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -47,7 +47,12 @@ public class TestReferenceCountedObject {
value, retained::getAndIncrement, released::getAndIncrement);
assertValues(retained, 0, released, 0);
- Assert.assertEquals(value, ref.get());
+ try {
+ ref.get();
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ e.printStackTrace(System.out);
+ }
assertValues(retained, 0, released, 0);
Assert.assertEquals(value, ref.retain());