This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8618d26d74a2464ff9791158a82291d83fe69dad Author: Caideyipi <[email protected]> AuthorDate: Sat Aug 9 14:05:10 2025 +0800 Pipe: Avoid unnecessary close-client in async client (Follow up fix for #16008) --- .../thrift/async/IoTDBDataRegionAsyncConnector.java | 8 ++++++-- .../async/handler/PipeTransferTrackableHandler.java | 18 +++++++++++------- .../async/handler/PipeTransferTsFileHandler.java | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index ecdc3cfde39..25d76a65897 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -739,7 +739,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { .forEach( handler -> { handler.clearEventsReferenceCount(); - eliminateHandler(handler); + eliminateHandler(handler, true); }); } @@ -796,7 +796,11 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { pendingHandlers.put(handler, handler); } - public void eliminateHandler(final PipeTransferTrackableHandler handler) { + public void eliminateHandler( + final PipeTransferTrackableHandler handler, final boolean closeClient) { + if (closeClient) { + handler.closeClient(); + } handler.close(); pendingHandlers.remove(handler); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index c2c42641e7c..dc5dce57826 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -46,7 +46,7 @@ public abstract class PipeTransferTrackableHandler public void onComplete(final TPipeTransferResp response) { if (connector.isClosed()) { clearEventsReferenceCount(); - connector.eliminateHandler(this); + connector.eliminateHandler(this, true); return; } @@ -55,7 +55,7 @@ public abstract class PipeTransferTrackableHandler // completed // NOTE: We should not clear the reference count of events, as this would cause the // `org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3` test to fail. - connector.eliminateHandler(this); + connector.eliminateHandler(this, false); } } @@ -63,12 +63,12 @@ public abstract class PipeTransferTrackableHandler public void onError(final Exception exception) { if (connector.isClosed()) { clearEventsReferenceCount(); - connector.eliminateHandler(this); + connector.eliminateHandler(this, true); return; } onErrorInternal(exception); - connector.eliminateHandler(this); + connector.eliminateHandler(this, false); } /** @@ -90,7 +90,7 @@ public abstract class PipeTransferTrackableHandler connector.trackHandler(this); if (connector.isClosed()) { clearEventsReferenceCount(); - connector.eliminateHandler(this); + connector.eliminateHandler(this, true); client.setShouldReturnSelf(true); try { client.returnSelf(); @@ -119,8 +119,7 @@ public abstract class PipeTransferTrackableHandler public abstract void clearEventsReferenceCount(); - @Override - public void close() { + public void closeClient() { if (Objects.isNull(client)) { return; } @@ -135,4 +134,9 @@ public abstract class PipeTransferTrackableHandler e); } } + + @Override + public void close() { + // Do nothing + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index d5388a24d45..fad3a61ce7f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -410,7 +410,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { } if (connector.isClosed()) { - close(); + closeClient(); } client.setShouldReturnSelf(true);
