This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1fbbdb3bece89b523ed0e23211289760a1455637
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 16 14:35:16 2025 +0800

    [To dev/1.3] Pipe: Fix DataNodeShutdownHook waiting report logic and add capture history file log (#15952) (#15951)

    * Pipe:Fix DataNodeShutdownHook waiting report logic and add capture history file log

    * update log
---
 .../PipeHistoricalDataRegionTsFileExtractor.java          | 15 +++++++++++++--
 .../org/apache/iotdb/db/service/DataNodeShutdownHook.java |  2 +-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index c1c4893ae8a..25759654a5b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -473,8 +473,19 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     if (startIndex instanceof StateProgressIndex) {
       startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
     }
-    return !startIndex.isAfter(resource.getMaxProgressIndex())
-        && !startIndex.equals(resource.getMaxProgressIndex());
+
+    if (!startIndex.isAfter(resource.getMaxProgressIndex())
+        && !startIndex.equals(resource.getMaxProgressIndex())) {
+      LOGGER.info(
+          "Pipe {}@{}: file {} meets mayTsFileContainUnprocessedData condition, extractor progressIndex: {}, resource ProgressIndex: {}",
+          pipeName,
+          dataRegionId,
+          resource.getTsFilePath(),
+          startIndex,
+          resource.getMaxProgressIndex());
+      return true;
+    }
+    return false;
   }
 
   private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 5b62a0d614c..1a6af1cf9b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -97,7 +97,7 @@ public class DataNodeShutdownHook extends Thread {
           PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet()) {
         boolean timeout = false;
         while (true) {
-          if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
+          if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) {
             logger.info(
                 "Successfully waited for pipe {} to finish.", entry.getValue().getPipeName());
             break;
