This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8618d26d74a2464ff9791158a82291d83fe69dad
Author: Caideyipi <[email protected]>
AuthorDate: Sat Aug 9 14:05:10 2025 +0800

    Pipe: Avoid unnecessary close-client in async client (Follow up fix for #16008)
---
 .../thrift/async/IoTDBDataRegionAsyncConnector.java    |  8 ++++++--
 .../async/handler/PipeTransferTrackableHandler.java    | 18 +++++++++++-------
 .../async/handler/PipeTransferTsFileHandler.java       |  2 +-
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index ecdc3cfde39..25d76a65897 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -739,7 +739,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
           .forEach(
               handler -> {
                 handler.clearEventsReferenceCount();
-                eliminateHandler(handler);
+                eliminateHandler(handler, true);
               });
     }
 
@@ -796,7 +796,11 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
     pendingHandlers.put(handler, handler);
   }
 
-  public void eliminateHandler(final PipeTransferTrackableHandler handler) {
+  public void eliminateHandler(
+      final PipeTransferTrackableHandler handler, final boolean closeClient) {
+    if (closeClient) {
+      handler.closeClient();
+    }
     handler.close();
     pendingHandlers.remove(handler);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index c2c42641e7c..dc5dce57826 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -46,7 +46,7 @@ public abstract class PipeTransferTrackableHandler
   public void onComplete(final TPipeTransferResp response) {
     if (connector.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, true);
       return;
     }
 
@@ -55,7 +55,7 @@ public abstract class PipeTransferTrackableHandler
       // completed
       // NOTE: We should not clear the reference count of events, as this would cause the
       // `org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3` test to fail.
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, false);
     }
   }
 
@@ -63,12 +63,12 @@ public abstract class PipeTransferTrackableHandler
   public void onError(final Exception exception) {
     if (connector.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, true);
       return;
     }
 
     onErrorInternal(exception);
-    connector.eliminateHandler(this);
+    connector.eliminateHandler(this, false);
   }
 
   /**
@@ -90,7 +90,7 @@ public abstract class PipeTransferTrackableHandler
     connector.trackHandler(this);
     if (connector.isClosed()) {
       clearEventsReferenceCount();
-      connector.eliminateHandler(this);
+      connector.eliminateHandler(this, true);
       client.setShouldReturnSelf(true);
       try {
         client.returnSelf();
@@ -119,8 +119,7 @@ public abstract class PipeTransferTrackableHandler
 
   public abstract void clearEventsReferenceCount();
 
-  @Override
-  public void close() {
+  public void closeClient() {
     if (Objects.isNull(client)) {
       return;
     }
@@ -135,4 +134,9 @@ public abstract class PipeTransferTrackableHandler
           e);
     }
   }
+
+  @Override
+  public void close() {
+    // Do nothing
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index d5388a24d45..fad3a61ce7f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -410,7 +410,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     }
 
     if (connector.isClosed()) {
-      close();
+      closeClient();
     }
 
     client.setShouldReturnSelf(true);

Reply via email to