This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new a798425d1aa Pipe: Remove and close asynchronous connector manager when all clients are useless (#13399) (#13476)
a798425d1aa is described below
commit a798425d1aa4b3b88a418137641a95dc1a3eace9
Author: YC27 <[email protected]>
AuthorDate: Wed Sep 11 12:47:12 2024 +0800
Pipe: Remove and close asynchronous connector manager when all clients are useless (#13399) (#13476)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../client/IoTDBDataNodeAsyncClientManager.java | 49 +++++++++++++++++-----
.../async/IoTDBDataRegionAsyncConnector.java | 8 ++++
2 files changed, 46 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index bfe5360e749..391545b7419 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransfe
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -62,6 +61,11 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
private final Set<TEndPoint> endPointSet;
+ private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
+ new ConcurrentHashMap<>();
+ private final String receiverAttributes;
+
+ // receiverAttributes -> IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
private static final Map<String, IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new ConcurrentHashMap<>();
private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
endPoint2Client;
@@ -82,18 +86,19 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
endPointSet = new HashSet<>(endPoints);
- final String receiverAttributes =
+ receiverAttributes =
String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy);
- if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
- synchronized (IoTDBDataRegionAsyncConnector.class) {
- ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
- receiverAttributes,
- new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
- .createClientManager(
- new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
- }
+ synchronized (IoTDBDataNodeAsyncClientManager.class) {
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
+ receiverAttributes,
+ new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
+ .createClientManager(
+ new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+ endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
+
+ RECEIVER_ATTRIBUTES_REF_COUNT.compute(
+ receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : refCount + 1);
}
- endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
switch (loadBalanceStrategy) {
case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
@@ -288,6 +293,28 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint);
}
+ public void close() {
+ synchronized (IoTDBDataNodeAsyncClientManager.class) {
+ RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
+ receiverAttributes,
+ (attributes, refCount) -> {
+ if (refCount <= 1) {
+ final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> clientManager =
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
+ if (clientManager != null) {
+ try {
+ clientManager.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close client manager.", e);
+ }
+ }
+ return null;
+ }
+ return refCount - 1;
+ });
+ }
+ }
+
/////////////////////// Strategies for load balance //////////////////////////
private interface LoadBalancer {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 5e13c4e0243..f68d86dd7e8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -539,6 +539,14 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
tabletBatchBuilder.close();
}
+ try {
+ if (clientManager != null) {
+ clientManager.close();
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to close client manager.", e);
+ }
+
super.close();
}