This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new a798425d1aa Pipe: Remove and close asynchronous connector manager when all clients are useless (#13399) (#13476)
a798425d1aa is described below

commit a798425d1aa4b3b88a418137641a95dc1a3eace9
Author: YC27 <[email protected]>
AuthorDate: Wed Sep 11 12:47:12 2024 +0800

    Pipe: Remove and close asynchronous connector manager when all clients are useless (#13399) (#13476)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../client/IoTDBDataNodeAsyncClientManager.java    | 49 +++++++++++++++++-----
 .../async/IoTDBDataRegionAsyncConnector.java       |  8 ++++
 2 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index bfe5360e749..391545b7419 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransfe
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -62,6 +61,11 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
 
   private final Set<TEndPoint> endPointSet;
 
+  private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
+      new ConcurrentHashMap<>();
+  private final String receiverAttributes;
+
+  // receiverAttributes -> IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
   private static final Map<String, IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>>
       ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new 
ConcurrentHashMap<>();
   private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
endPoint2Client;
@@ -82,18 +86,19 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
 
     endPointSet = new HashSet<>(endPoints);
 
-    final String receiverAttributes =
+    receiverAttributes =
         String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, 
loadTsFileStrategy);
-    if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
-      synchronized (IoTDBDataRegionAsyncConnector.class) {
-        ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
-            receiverAttributes,
-            new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
-                .createClientManager(
-                    new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
-      }
+    synchronized (IoTDBDataNodeAsyncClientManager.class) {
+      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
+          receiverAttributes,
+          new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
+              .createClientManager(
+                  new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+      endPoint2Client = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
+
+      RECEIVER_ATTRIBUTES_REF_COUNT.compute(
+          receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : 
refCount + 1);
     }
-    endPoint2Client = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
 
     switch (loadBalanceStrategy) {
       case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
@@ -288,6 +293,28 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     LEADER_CACHE_MANAGER.updateLeaderEndPoint(deviceId, endPoint);
   }
 
+  public void close() {
+    synchronized (IoTDBDataNodeAsyncClientManager.class) {
+      RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
+          receiverAttributes,
+          (attributes, refCount) -> {
+            if (refCount <= 1) {
+              final IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient> clientManager =
+                  
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
+              if (clientManager != null) {
+                try {
+                  clientManager.close();
+                } catch (Exception e) {
+                  LOGGER.warn("Failed to close client manager.", e);
+                }
+              }
+              return null;
+            }
+            return refCount - 1;
+          });
+    }
+  }
+
   /////////////////////// Strategies for load balance 
//////////////////////////
 
   private interface LoadBalancer {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 5e13c4e0243..f68d86dd7e8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -539,6 +539,14 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
       tabletBatchBuilder.close();
     }
 
+    try {
+      if (clientManager != null) {
+        clientManager.close();
+      }
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to close client manager.", e);
+    }
+
     super.close();
   }
 

Reply via email to