This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check-shit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bea0585f93d8d3284e696a864f4c4e9ae5d92246
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 10 19:13:48 2025 +0800

    try fix shit
---
 .../thrift/async/handler/PipeTransferTrackableHandler.java     |  1 +
 .../java/org/apache/iotdb/commons/client/ClientManager.java    | 10 +++++++++-
 .../client/async/AsyncPipeDataTransferServiceClient.java       |  4 ++++
 3 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 4908adba8c6..138c0a43564 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -82,6 +82,7 @@ public abstract class PipeTransferTrackableHandler
     if (connector.isClosed()) {
       clearEventsReferenceCount();
       connector.eliminateHandler(this);
+      client.returnSelf();
       return false;
     }
     doTransfer(client, req);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index af274f481ec..a890723e956 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class ClientManager<K, V> implements IClientManager<K, V> {
 
@@ -36,10 +37,16 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   private final GenericKeyedObjectPool<K, V> pool;
 
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
   ClientManager(IClientPoolFactory<K, V> factory) {
     pool = factory.createClientPool(this);
   }
 
+  public boolean isClosed() {
+    return isClosed.get();
+  }
+
   @TestOnly
   public GenericKeyedObjectPool<K, V> getPool() {
     return pool;
@@ -47,7 +54,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public V borrowClient(K node) throws ClientManagerException {
-    if (node == null) {
+    if (node == null || isClosed.get()) {
       throw new BorrowNullClientManagerException();
     }
     try {
@@ -99,6 +106,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public void close() {
+    isClosed.set(true);
     pool.close();
     // we need to release tManagers for AsyncThriftClientFactory
     if (pool.getFactory() instanceof AsyncThriftClientFactory) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 7580b31ff15..eadbb4435ab 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -116,6 +116,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
    */
   public void returnSelf() {
     if (shouldReturnSelf.get()) {
+      if (clientManager.isClosed()) {
+        this.close();
+        this.invalidateAll();
+      }
       clientManager.returnClient(endpoint, this);
       LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 1", id);
     }

Reply via email to