This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-borrow-timeout-opt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8251bde754a65b48ae355ca43d92bf30f1969086
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jul 23 15:06:58 2025 +0800

    Pipe: Fix and improve async tsfile transfer error handling and logging

    Refactored IoTDBDataRegionAsyncConnector to handle exceptions during
    asynchronous tsfile transfer more gracefully. Now logs warnings instead of
    errors, invokes onError on the handler, and provides more context in log
    messages. Added getTsFile() method to PipeTransferTsFileHandler for better
    logging, and ensured memoryBlock is set to null after closing to prevent
    potential resource leaks.
---
 .../async/IoTDBDataRegionAsyncConnector.java       | 24 ++++++++++++++--------
 .../async/handler/PipeTransferTsFileHandler.java   |  6 ++++++
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 1b743530421..6195b5b5a50 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -425,8 +425,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
     }
   }
 
-  private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
-      throws Exception {
+  private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) {
     transferTsFileCounter.incrementAndGet();
     CompletableFuture<Void> completableFuture =
         CompletableFuture.supplyAsync(
@@ -448,13 +447,20 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
     if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) {
       try {
         completableFuture.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
-        throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
-      } catch (Exception e) {
-        LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
-        throw e;
+      } catch (final Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+          LOGGER.warn(
+              "Transfer tsfile event {} asynchronously was interrupted.",
+              pipeTransferTsFileHandler.getTsFile(),
+              e);
+        }
+
+        pipeTransferTsFileHandler.onError(e);
+        LOGGER.warn(
+            "Failed to transfer tsfile event {} asynchronously.",
+            pipeTransferTsFileHandler.getTsFile(),
+            e);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index f3b48a6ff8a..2c8097f72c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -133,6 +133,10 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
     isSealSignalSent = new AtomicBoolean(false);
   }
 
+  public File getTsFile() {
+    return tsFile;
+  }
+
   public void transfer(
       final IoTDBDataNodeAsyncClientManager clientManager,
       final AsyncPipeDataTransferServiceClient client)
@@ -460,8 +464,10 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
   @Override
   public void close() {
     super.close();
+
     if (memoryBlock != null) {
       memoryBlock.close();
+      memoryBlock = null;
     }
   }
 
