This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1b3b1a14d664d99cf9e7e6991e2e1a4e51ce80aa
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Dec 2 11:29:47 2025 +0800

    Pipe: Ignore logging when `returnSelf` is called in the event of an 
exception in `AsyncClient`. (#16827)
    
    (cherry picked from commit e3127e6c61f13f91c95c851d49253c4d668dff8e)
---
 .../handler/PipeTransferTrackableHandler.java      | 15 +++++++------
 .../async/handler/PipeTransferTsFileHandler.java   | 15 +++++++------
 .../apache/iotdb/commons/client/ClientManager.java | 25 ++++++++++++++++++++++
 .../async/AsyncPipeDataTransferServiceClient.java  | 14 +++++++++++-
 4 files changed, 56 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index d0d6d054299..8a552f0cbee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -97,12 +97,15 @@ public abstract class PipeTransferTrackableHandler
       clearEventsReferenceCount();
       connector.eliminateHandler(this, true);
       client.setShouldReturnSelf(true);
-      try {
-        client.returnSelf();
-      } catch (final IllegalStateException e) {
-        LOGGER.info(
-            "Illegal state when return the client to object pool, maybe the 
pool is already cleared. Will ignore.");
-      }
+      client.returnSelf(
+          (e) -> {
+            if (e instanceof IllegalStateException) {
+              LOGGER.info(
+                  "Illegal state when return the client to object pool, maybe 
the pool is already cleared. Will ignore.");
+              return true;
+            }
+            return false;
+          });
       this.client = null;
       return false;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3cdb2abda15..3ef46034554 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -431,12 +431,15 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     }
 
     client.setShouldReturnSelf(true);
-    try {
-      client.returnSelf();
-    } catch (final IllegalStateException e) {
-      LOGGER.info(
-          "Illegal state when return the client to object pool, maybe the pool 
is already cleared. Will ignore.");
-    }
+    client.returnSelf(
+        (e) -> {
+          if (e instanceof IllegalStateException) {
+            LOGGER.info(
+                "Illegal state when return the client to object pool, maybe 
the pool is already cleared. Will ignore.");
+            return true;
+          }
+          return false;
+        });
     client = null;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 0915e69802e..41d779c4398 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
+import java.util.function.Function;
 
 public class ClientManager<K, V> implements IClientManager<K, V> {
 
@@ -79,6 +80,30 @@ public class ClientManager<K, V> implements 
IClientManager<K, V> {
     }
   }
 
+  /**
+   * return a client V for node K to the {@link ClientManager}, and ignore 
some exception
+   *
+   * <p>Note: We do not define this interface in {@link IClientManager} to 
make you aware that the
+   * return of a client is automatic whenever a particular client is used.
+   */
+  public void returnClient(K node, V client, Function<Exception, Boolean> 
ignoreError) {
+    if (node != null) {
+      try {
+        pool.returnObject(node, client);
+      } catch (Exception e) {
+        if (!Boolean.TRUE.equals(ignoreError.apply(e))) {
+          LOGGER.warn("Return client {} for node {} to pool failed.", client, 
node, e);
+        }
+      }
+    } else if (client instanceof ThriftClient) {
+      ((ThriftClient) client).invalidateAll();
+      LOGGER.warn(
+          "Return client {} to pool failed because the node is null. "
+              + "This may cause resource leak, please check your code.",
+          client);
+    }
+  }
+
   @Override
   public void clear(K node) {
     Optional.ofNullable(node)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index f662b697ebf..e5a89215817 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncClient
     implements ThriftClient {
@@ -84,7 +85,8 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
   public void onError(final Exception e) {
     super.onError(e);
     ThriftClient.resolveException(e, this);
-    returnSelf();
+    returnSelf(
+        (i) -> i instanceof IllegalStateException && "Client has an 
error!".equals(i.getMessage()));
   }
 
   @Override
@@ -114,6 +116,16 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     }
   }
 
+  /**
+   * return self, the method doesn't need to be called by the user and will be 
triggered after the
+   * RPC is finished.
+   */
+  public void returnSelf(Function<Exception, Boolean> ignoreError) {
+    if (shouldReturnSelf.get()) {
+      clientManager.returnClient(endpoint, this, ignoreError);
+    }
+  }
+
   public void setShouldReturnSelf(final boolean shouldReturnSelf) {
     this.shouldReturnSelf.set(shouldReturnSelf);
   }

Reply via email to