This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check-shit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e2eeabf9a8ea28b04e0099c648e37692ff8898f9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 10 18:20:46 2025 +0800

    Improve client cleanup and error handling in client managers
    
    Adds additional checks and cleanup logic when returning or closing clients,
    especially when nodes are null or connectors are closed. Enhances error logging
    and ensures resources are properly invalidated to prevent leaks. Also makes the
    close() method in AsyncPipeDataTransferServiceClient public for broader access.
---
 .../connector/client/IoTDBDataNodeAsyncClientManager.java   |  8 ++++++++
 .../thrift/async/handler/PipeTransferTsFileHandler.java     |  8 ++++++++
 .../java/org/apache/iotdb/commons/client/ClientManager.java | 13 +++++++++++++
 .../client/async/AsyncPipeDataTransferServiceClient.java    |  2 +-
 4 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 520772a9fdd..d8f886f496e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -300,6 +300,14 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
       throw e;
     } finally {
       client.setShouldReturnSelf(true);
+      if (isClosed) {
+        try {
+          client.close();
+          client.invalidateAll();
+        } catch (final Exception e) {
+          LOGGER.warn("111");
+        }
+      }
       client.returnSelf();
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 348b5df6b29..9e32a1788f2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -429,6 +429,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
   private void returnClientIfNecessary() {
     if (client != null) {
       client.setShouldReturnSelf(true);
+      if (connector.isClosed()) {
+        try {
+          client.close();
+          client.invalidateAll();
+        } catch (final Exception e) {
+          LOGGER.warn("111");
+        }
+      }
       client.returnSelf();
       client = null;
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 79fcc799ae9..0d8719ebeb6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -64,6 +64,9 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
    * return of a client is automatic whenever a particular client is used.
    */
   public void returnClient(K node, V client) {
+    if (node == null) {
+      LOGGER.error("{} CAN NOT BE RETURNED", client, new Exception());
+    }
     Optional.ofNullable(node)
         .ifPresent(
             x -> {
@@ -73,6 +76,16 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
                 LOGGER.warn("Return client {} for node {} to pool failed.", 
client, node, e);
               }
             });
+    if (node != null) {
+      try {
+        pool.returnObject(node, client);
+      } catch (Exception e) {
+        LOGGER.warn("Return client {} for node {} to pool failed.", client, 
node, e);
+      }
+    } else if (client instanceof ThriftClient) {
+      ((ThriftClient) client).invalidateAll();
+      LOGGER.info("Client {} returned thrift client.", client);
+    }
   }
 
   @Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index fa11bd2c27d..7580b31ff15 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -136,7 +136,7 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
     LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, timeout 
dynamically", id);
   }
 
-  private void close() {
+  public void close() {
     ___transport.close();
     ___currentMethod = null;
     LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, closed", id);

Reply via email to