This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check-shit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ccf5de5e27b7331f0622a2b458f49a84b9ac0de3
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 10 20:13:17 2025 +0800

    fix shit
---
 .../async/IoTDBDataRegionAsyncConnector.java       | 38 +++++-----------------
 .../handler/PipeTransferTrackableHandler.java      |  1 +
 .../async/handler/PipeTransferTsFileHandler.java   |  4 +++
 3 files changed, 14 insertions(+), 29 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 176bcd94cfb..7d88103f815 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -74,7 +74,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -432,34 +431,15 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
   private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
       throws Exception {
     transferTsFileCounter.incrementAndGet();
-    CompletableFuture<Void> completableFuture =
-        CompletableFuture.supplyAsync(
-            () -> {
-              AsyncPipeDataTransferServiceClient client = null;
-              try {
-                client = transferTsFileClientManager.borrowClient();
-                pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
-              } catch (final Exception ex) {
-                logOnClientException(client, ex);
-                pipeTransferTsFileHandler.onError(ex);
-              } finally {
-                transferTsFileCounter.decrementAndGet();
-              }
-              return null;
-            },
-            executor);
-
-    if (PipeConfig.getInstance().isTransferTsFileSync()) {
-      try {
-        completableFuture.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
-        throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
-      } catch (Exception e) {
-        LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
-        throw e;
-      }
+    AsyncPipeDataTransferServiceClient client = null;
+    try {
+      client = transferTsFileClientManager.borrowClient();
+      pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
+    } catch (final Exception ex) {
+      logOnClientException(client, ex);
+      pipeTransferTsFileHandler.onError(ex);
+    } finally {
+      transferTsFileCounter.decrementAndGet();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 138c0a43564..e78cd9adbf3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -82,6 +82,7 @@ public abstract class PipeTransferTrackableHandler
     if (connector.isClosed()) {
       clearEventsReferenceCount();
       connector.eliminateHandler(this);
+      client.setShouldReturnSelf(true);
       client.returnSelf();
       return false;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 9e32a1788f2..8dc74dbf4c3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -206,6 +206,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
 
         try {
           Thread.sleep(20000);
+          LOGGER.error("sleep out 22", new Throwable());
         } catch (InterruptedException e) {
         }
 
@@ -229,6 +230,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
     try {
       Thread.sleep(20000);
+      LOGGER.error("sleep out 33", new Throwable());
     } catch (InterruptedException e) {
     }
     pipeName2WeightMap.forEach(
@@ -250,6 +252,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
   public void onComplete(final TPipeTransferResp response) {
     try {
       Thread.sleep(20000);
+      LOGGER.error("sleep out 44", new Throwable());
     } catch (InterruptedException e) {
     }
     try {
@@ -363,6 +366,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
   public void onError(final Exception exception) {
     try {
       Thread.sleep(20000);
+      LOGGER.error("onError sleep out 1", new Throwable());
     } catch (InterruptedException e) {
     }
     try {

Reply via email to