This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 797dfa5d5cf Pipe: isolate async sink client resources by endpoint (#18015)
797dfa5d5cf is described below

commit 797dfa5d5cfd6dce83b65856d9570dd88e51726e
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 10:44:56 2026 +0800

    Pipe: isolate async sink client resources by endpoint (#18015)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    | 48 +++++++++++-------
 .../sink/IoTDBDataNodeAsyncClientManagerTest.java  | 57 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5c38c3a8540..4456550bb52 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
@@ -71,11 +72,11 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
   private final Set<TEndPoint> endPointSet;
 
-  private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
-      new ConcurrentHashMap<>();
+  private static final Map<String, Integer> CLIENT_RESOURCE_REF_COUNT = new 
ConcurrentHashMap<>();
   private final String receiverAttributes;
+  private final String clientResourceKey;
 
-  // receiverAttributes -> IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
+  // clientResourceKey -> IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
   private static final Map<String, IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>>
       ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new 
ConcurrentHashMap<>();
   private static final Map<String, ExecutorService> 
TS_FILE_ASYNC_EXECUTOR_HOLDER =
@@ -129,10 +130,11 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
             shouldMarkAsPipeRequest,
             isTSFileUsed,
             skipIfNoPrivileges);
+    clientResourceKey = generateClientResourceKey(receiverAttributes, 
endPoints);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
-      if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
+      if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(clientResourceKey))
 {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
-            receiverAttributes,
+            clientResourceKey,
             new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
                 .createClientManager(
                     isTSFileUsed
@@ -140,21 +142,21 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
                             
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
                         : new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
       }
-      endPoint2Client = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
+      endPoint2Client = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(clientResourceKey);
 
       if (isTSFileUsed) {
-        if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(receiverAttributes)) {
+        if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(clientResourceKey)) {
           TS_FILE_ASYNC_EXECUTOR_HOLDER.putIfAbsent(
-              receiverAttributes,
+              clientResourceKey,
               IoTDBThreadPoolFactory.newFixedThreadPool(
                   
PipeConfig.getInstance().getPipeRealTimeQueueMaxWaitingTsFileSize(),
                   ThreadName.PIPE_TSFILE_ASYNC_SEND_POOL.getName() + "-" + 
id.getAndIncrement()));
         }
-        executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(receiverAttributes);
+        executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(clientResourceKey);
       }
 
-      RECEIVER_ATTRIBUTES_REF_COUNT.compute(
-          receiverAttributes, (attributes, refCount) -> refCount == null ? 1 : 
refCount + 1);
+      CLIENT_RESOURCE_REF_COUNT.compute(
+          clientResourceKey, (attributes, refCount) -> refCount == null ? 1 : 
refCount + 1);
     }
 
     switch (loadBalanceStrategy) {
@@ -421,30 +423,30 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
   public void close() {
     isClosed = true;
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
-      RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
-          receiverAttributes,
+      CLIENT_RESOURCE_REF_COUNT.computeIfPresent(
+          clientResourceKey,
           (attributes, refCount) -> {
             if (refCount <= 1) {
               final IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient> clientManager =
-                  
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
+                  
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(clientResourceKey);
               if (clientManager != null) {
                 try {
                   clientManager.close();
                   LOGGER.info(
                       DataNodePipeMessages
                           
.CLOSED_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTES,
-                      receiverAttributes);
+                      clientResourceKey);
                 } catch (final Exception e) {
                   LOGGER.warn(
                       DataNodePipeMessages
                           
.FAILED_TO_CLOSE_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTE,
-                      receiverAttributes,
+                      clientResourceKey,
                       e);
                 }
               }
 
               final ExecutorService executor =
-                  TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(receiverAttributes);
+                  TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(clientResourceKey);
               if (executor != null) {
                 try {
                   executor.shutdown();
@@ -552,4 +554,16 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
   private void markHealthy(TEndPoint endPoint) {
     unhealthyEndPointMap.remove(endPoint);
   }
+
+  private static String generateClientResourceKey(
+      final String receiverAttributes, final List<TEndPoint> endPoints) {
+    return String.format(
+        "%s-%s",
+        receiverAttributes,
+        endPoints.stream()
+            .map(endPoint -> String.format("%s:%s", endPoint.getIp(), 
endPoint.getPort()))
+            .distinct()
+            .sorted()
+            .collect(Collectors.joining(",", "[", "]")));
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
index fb13e438dec..f564dbbd70d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
 
 public class IoTDBDataNodeAsyncClientManagerTest {
 
@@ -71,6 +72,48 @@ public class IoTDBDataNodeAsyncClientManagerTest {
     }
   }
 
+  @Test
+  public void testClientResourcesShouldDifferentiateEndPoints() throws 
Exception {
+    final IoTDBDataNodeAsyncClientManager firstManager =
+        new IoTDBDataNodeAsyncClientManager(
+            Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+            false,
+            "round-robin",
+            new UserEntity(1L, "user", "cli-host"),
+            "password",
+            true,
+            "sync",
+            true,
+            true,
+            true,
+            true);
+    final IoTDBDataNodeAsyncClientManager secondManager =
+        new IoTDBDataNodeAsyncClientManager(
+            Collections.singletonList(new TEndPoint("127.0.0.2", 6667)),
+            false,
+            "round-robin",
+            new UserEntity(1L, "user", "cli-host"),
+            "password",
+            true,
+            "sync",
+            true,
+            true,
+            true,
+            true);
+
+    try {
+      Assert.assertEquals(
+          getReceiverAttributes(firstManager), 
getReceiverAttributes(secondManager));
+      Assert.assertNotEquals(
+          getClientResourceKey(firstManager), 
getClientResourceKey(secondManager));
+      Assert.assertNotSame(getEndPoint2Client(firstManager), 
getEndPoint2Client(secondManager));
+      Assert.assertNotSame(getExecutor(firstManager), 
getExecutor(secondManager));
+    } finally {
+      firstManager.close();
+      secondManager.close();
+    }
+  }
+
   private static String getReceiverAttributes(final 
IoTDBDataNodeAsyncClientManager manager)
       throws Exception {
     final Field field =
@@ -79,10 +122,24 @@ public class IoTDBDataNodeAsyncClientManagerTest {
     return (String) field.get(manager);
   }
 
+  private static String getClientResourceKey(final 
IoTDBDataNodeAsyncClientManager manager)
+      throws Exception {
+    final Field field = 
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("clientResourceKey");
+    field.setAccessible(true);
+    return (String) field.get(manager);
+  }
+
   private static Object getEndPoint2Client(final 
IoTDBDataNodeAsyncClientManager manager)
       throws Exception {
     final Field field = 
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client");
     field.setAccessible(true);
     return field.get(manager);
   }
+
+  private static ExecutorService getExecutor(final 
IoTDBDataNodeAsyncClientManager manager)
+      throws Exception {
+    final Field field = 
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("executor");
+    field.setAccessible(true);
+    return (ExecutorService) field.get(manager);
+  }
 }

Reply via email to