This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch check-shit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e2eeabf9a8ea28b04e0099c648e37692ff8898f9 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jul 10 18:20:46 2025 +0800 Improve client cleanup and error handling in client managers Adds additional checks and cleanup logic when returning or closing clients, especially when nodes are null or connectors are closed. Enhances error logging and ensures resources are properly invalidated to prevent leaks. Also makes the close() method in AsyncPipeDataTransferServiceClient public for broader access. --- .../connector/client/IoTDBDataNodeAsyncClientManager.java | 8 ++++++++ .../thrift/async/handler/PipeTransferTsFileHandler.java | 8 ++++++++ .../java/org/apache/iotdb/commons/client/ClientManager.java | 13 +++++++++++++ .../client/async/AsyncPipeDataTransferServiceClient.java | 2 +- 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 520772a9fdd..d8f886f496e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -300,6 +300,14 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager throw e; } finally { client.setShouldReturnSelf(true); + if (isClosed) { + try { + client.close(); + client.invalidateAll(); + } catch (final Exception e) { + LOGGER.warn("111"); + } + } client.returnSelf(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 348b5df6b29..9e32a1788f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -429,6 +429,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { private void returnClientIfNecessary() { if (client != null) { client.setShouldReturnSelf(true); + if (connector.isClosed()) { + try { + client.close(); + client.invalidateAll(); + } catch (final Exception e) { + LOGGER.warn("111"); + } + } client.returnSelf(); client = null; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 79fcc799ae9..0d8719ebeb6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -64,6 +64,9 @@ public class ClientManager<K, V> implements IClientManager<K, V> { * return of a client is automatic whenever a particular client is used. */ public void returnClient(K node, V client) { + if (node == null) { + LOGGER.error("{} CAN NOT BE RETURNED", client, new Exception()); + } Optional.ofNullable(node) .ifPresent( x -> { @@ -73,6 +76,16 @@ public class ClientManager<K, V> implements IClientManager<K, V> { LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e); } }); + if (node != null) { + try { + pool.returnObject(node, client); + } catch (Exception e) { + LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e); + } + } else if (client instanceof ThriftClient) { + ((ThriftClient) client).invalidateAll(); + LOGGER.info("Client {} returned thrift client.", client); + } } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index fa11bd2c27d..7580b31ff15 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -136,7 +136,7 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, timeout dynamically", id); } - private void close() { + public void close() { ___transport.close(); ___currentMethod = null; LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, closed", id);
