This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch check-shit in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ccf5de5e27b7331f0622a2b458f49a84b9ac0de3 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jul 10 20:13:17 2025 +0800 fix shit --- .../async/IoTDBDataRegionAsyncConnector.java | 38 +++++----------------- .../handler/PipeTransferTrackableHandler.java | 1 + .../async/handler/PipeTransferTsFileHandler.java | 4 +++ 3 files changed, 14 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 176bcd94cfb..7d88103f815 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -74,7 +74,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -432,34 +431,15 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) throws Exception { transferTsFileCounter.incrementAndGet(); - CompletableFuture<Void> completableFuture = - CompletableFuture.supplyAsync( - () -> { - AsyncPipeDataTransferServiceClient client = null; - try { - client = transferTsFileClientManager.borrowClient(); - pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client); - } catch (final Exception ex) { - logOnClientException(client, ex); - pipeTransferTsFileHandler.onError(ex); - } finally { - transferTsFileCounter.decrementAndGet(); - } - return null; - }, - executor); - - if (PipeConfig.getInstance().isTransferTsFileSync()) { - try { - completableFuture.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e); - throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e); - } catch (Exception e) { - LOGGER.error("Failed to transfer tsfile event asynchronously.", e); - throw e; - } + AsyncPipeDataTransferServiceClient client = null; + try { + client = transferTsFileClientManager.borrowClient(); + pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client); + } catch (final Exception ex) { + logOnClientException(client, ex); + pipeTransferTsFileHandler.onError(ex); + } finally { + transferTsFileCounter.decrementAndGet(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index 138c0a43564..e78cd9adbf3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -82,6 +82,7 @@ public abstract class PipeTransferTrackableHandler if (connector.isClosed()) { clearEventsReferenceCount(); connector.eliminateHandler(this); + client.setShouldReturnSelf(true); client.returnSelf(); return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 9e32a1788f2..8dc74dbf4c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -206,6 +206,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { try { Thread.sleep(20000); + LOGGER.error("sleep out 22", new Throwable()); } catch (InterruptedException e) { } @@ -229,6 +230,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq); try { Thread.sleep(20000); + LOGGER.error("sleep out 33", new Throwable()); } catch (InterruptedException e) { } pipeName2WeightMap.forEach( @@ -250,6 +252,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { public void onComplete(final TPipeTransferResp response) { try { Thread.sleep(20000); + LOGGER.error("sleep out 44", new Throwable()); } catch (InterruptedException e) { } try { @@ -363,6 +366,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { public void onError(final Exception exception) { try { Thread.sleep(20000); + LOGGER.error("onError sleep out 1", new Throwable()); } catch (InterruptedException e) { } try {
