This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf5c1922f0c4bef1dc7056f99cde3757b9dac839 Author: Caideyipi <[email protected]> AuthorDate: Fri Aug 8 12:06:02 2025 +0800 [To dev/1.3] Pipe: Fix and improve async tsfile transfer error handling and logging (avoid client connection leak) (#16008) (#16125) * Pipe: Fix and improve async tsfile transfer error handling and logging Refactored IoTDBDataRegionAsyncConnector to handle exceptions during asynchronous tsfile transfer more gracefully. Now logs warnings instead of errors, invokes onError on the handler, and provides more context in log messages. Added getTsFile() method to PipeTransferTsFileHandler for better logging, and ensured memoryBlock is set to null after closing to prevent potential resource leaks. * fix * fix * bald-logger --------- Co-authored-by: Steve Yurong Su <[email protected]> --- .../async/IoTDBDataRegionAsyncConnector.java | 24 ++++++++++------- .../handler/PipeTransferTrackableHandler.java | 31 +++++++++++++++++++++- .../async/handler/PipeTransferTsFileHandler.java | 24 ++++++++++++++--- 3 files changed, 65 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 0dacf0cb912..ecdc3cfde39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -396,8 +396,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { } } - private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) - throws Exception { + private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) { transferTsFileCounter.incrementAndGet(); CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync( @@ -419,13 +418,20 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) { try { completableFuture.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e); - throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e); - } catch (Exception e) { - LOGGER.error("Failed to transfer tsfile event asynchronously.", e); - throw e; + } catch (final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + LOGGER.warn( + "Transfer tsfile event {} asynchronously was interrupted.", + pipeTransferTsFileHandler.getTsFile(), + e); + } + + pipeTransferTsFileHandler.onError(e); + LOGGER.warn( + "Failed to transfer tsfile event {} asynchronously.", + pipeTransferTsFileHandler.getTsFile(), + e); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index 4908adba8c6..c2c42641e7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -26,11 +26,17 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; public abstract class PipeTransferTrackableHandler implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class); protected final IoTDBDataRegionAsyncConnector connector; + protected volatile AsyncPipeDataTransferServiceClient client; public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncConnector connector) { this.connector = connector; @@ -77,11 +83,22 @@ public abstract class PipeTransferTrackableHandler protected boolean tryTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { + if (Objects.isNull(this.client)) { + this.client = client; + } // track handler before checking if connector is closed connector.trackHandler(this); if (connector.isClosed()) { clearEventsReferenceCount(); connector.eliminateHandler(this); + client.setShouldReturnSelf(true); + try { + client.returnSelf(); + } catch (final IllegalStateException e) { + LOGGER.info( + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); + } + this.client = null; return false; } doTransfer(client, req); @@ -104,6 +121,18 @@ public abstract class PipeTransferTrackableHandler @Override public void close() { - // do nothing + if (Objects.isNull(client)) { + return; + } + try { + client.close(); + client.invalidateAll(); + } catch (final Exception e) { + LOGGER.warn( + "Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}", + client, + e.getMessage(), + e); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 7353ea91e91..d5388a24d45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -86,7 +86,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { private final AtomicBoolean isSealSignalSent; private IoTDBDataNodeAsyncClientManager clientManager; - private volatile AsyncPipeDataTransferServiceClient client; public PipeTransferTsFileHandler( final IoTDBDataRegionAsyncConnector connector, @@ -128,6 +127,10 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { isSealSignalSent = new AtomicBoolean(false); } + public File getTsFile() { + return tsFile; + } + public void transfer( final IoTDBDataNodeAsyncClientManager clientManager, final AsyncPipeDataTransferServiceClient client) @@ -402,11 +405,22 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { } private void returnClientIfNecessary() { - if (client != null) { - client.setShouldReturnSelf(true); + if (client == null) { + return; + } + + if (connector.isClosed()) { + close(); + } + + client.setShouldReturnSelf(true); + try { client.returnSelf(); - client = null; + } catch (final IllegalStateException e) { + LOGGER.info( + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); } + client = null; } @Override @@ -432,8 +446,10 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { @Override public void close() { super.close(); + if (memoryBlock != null) { memoryBlock.close(); + memoryBlock = null; } }
