This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch check-shit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bea0585f93d8d3284e696a864f4c4e9ae5d92246 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jul 10 19:13:48 2025 +0800 try fix shit --- .../thrift/async/handler/PipeTransferTrackableHandler.java | 1 + .../java/org/apache/iotdb/commons/client/ClientManager.java | 10 +++++++++- .../client/async/AsyncPipeDataTransferServiceClient.java | 4 ++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index 4908adba8c6..138c0a43564 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -82,6 +82,7 @@ public abstract class PipeTransferTrackableHandler if (connector.isClosed()) { clearEventsReferenceCount(); connector.eliminateHandler(this); + client.returnSelf(); return false; } doTransfer(client, req); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index af274f481ec..a890723e956 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; public class ClientManager<K, V> implements IClientManager<K, V> { @@ -36,10 +37,16 @@ public class ClientManager<K, V> implements IClientManager<K, V> { private final GenericKeyedObjectPool<K, V> pool; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + ClientManager(IClientPoolFactory<K, V> factory) { pool = factory.createClientPool(this); } + public boolean isClosed() { + return isClosed.get(); + } + @TestOnly public GenericKeyedObjectPool<K, V> getPool() { return pool; @@ -47,7 +54,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> { @Override public V borrowClient(K node) throws ClientManagerException { - if (node == null) { + if (node == null || isClosed.get()) { throw new BorrowNullClientManagerException(); } try { @@ -99,6 +106,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> { @Override public void close() { + isClosed.set(true); pool.close(); // we need to release tManagers for AsyncThriftClientFactory if (pool.getFactory() instanceof AsyncThriftClientFactory) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 7580b31ff15..eadbb4435ab 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -116,6 +116,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC */ public void returnSelf() { if (shouldReturnSelf.get()) { + if (clientManager.isClosed()) { + this.close(); + this.invalidateAll(); + } clientManager.returnClient(endpoint, this); LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 1", id); }
