This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-borrow-timeout-opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8251bde754a65b48ae355ca43d92bf30f1969086
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jul 23 15:06:58 2025 +0800

    Pipe: Fix and improve async tsfile transfer error handling and logging
    
    Refactored IoTDBDataRegionAsyncConnector to handle exceptions during 
asynchronous tsfile transfer more gracefully. Now logs warnings instead of 
errors, invokes onError on the handler, and provides more context in log 
messages. Added getTsFile() method to PipeTransferTsFileHandler for better 
logging, and ensured memoryBlock is set to null after closing to prevent 
potential resource leaks.
---
 .../async/IoTDBDataRegionAsyncConnector.java       | 24 ++++++++++++++--------
 .../async/handler/PipeTransferTsFileHandler.java   |  6 ++++++
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1b743530421..6195b5b5a50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -425,8 +425,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler)
-      throws Exception {
+  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler) {
     transferTsFileCounter.incrementAndGet();
     CompletableFuture<Void> completableFuture =
         CompletableFuture.supplyAsync(
@@ -448,13 +447,20 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) {
       try {
         completableFuture.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.error("Transfer tsfile event asynchronously was interrupted.", 
e);
-        throw new PipeException("Transfer tsfile event asynchronously was 
interrupted.", e);
-      } catch (Exception e) {
-        LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
-        throw e;
+      } catch (final Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          LOGGER.warn(
+              "Transfer tsfile event {} asynchronously was interrupted.",
+              pipeTransferTsFileHandler.getTsFile(),
+              e);
+        }
+
+        pipeTransferTsFileHandler.onError(e);
+        LOGGER.warn(
+            "Failed to transfer tsfile event {} asynchronously.",
+            pipeTransferTsFileHandler.getTsFile(),
+            e);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index f3b48a6ff8a..2c8097f72c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -133,6 +133,10 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     isSealSignalSent = new AtomicBoolean(false);
   }
 
+  public File getTsFile() {
+    return tsFile;
+  }
+
   public void transfer(
       final IoTDBDataNodeAsyncClientManager clientManager,
       final AsyncPipeDataTransferServiceClient client)
@@ -460,8 +464,10 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
   @Override
   public void close() {
     super.close();
+
     if (memoryBlock != null) {
       memoryBlock.close();
+      memoryBlock = null;
     }
   }
 

Reply via email to