This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1fbbdb3bece89b523ed0e23211289760a1455637
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 16 14:35:16 2025 +0800

    [To dev/1.3] Pipe: Fix DataNodeShutdownHook waiting report logic and add capture history file log (#15952) (#15951)
    
    * Pipe: Fix DataNodeShutdownHook waiting report logic and add capture history file log
    
    * update log
---
 .../PipeHistoricalDataRegionTsFileExtractor.java          | 15 +++++++++++++--
 .../org/apache/iotdb/db/service/DataNodeShutdownHook.java |  2 +-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index c1c4893ae8a..25759654a5b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -473,8 +473,19 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     if (startIndex instanceof StateProgressIndex) {
       startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
     }
-    return !startIndex.isAfter(resource.getMaxProgressIndex())
-        && !startIndex.equals(resource.getMaxProgressIndex());
+
+    if (!startIndex.isAfter(resource.getMaxProgressIndex())
+        && !startIndex.equals(resource.getMaxProgressIndex())) {
+      LOGGER.info(
+          "Pipe {}@{}: file {} meets mayTsFileContainUnprocessedData 
condition, extractor progressIndex: {}, resource ProgressIndex: {}",
+          pipeName,
+          dataRegionId,
+          resource.getTsFilePath(),
+          startIndex,
+          resource.getMaxProgressIndex());
+      return true;
+    }
+    return false;
   }
 
   private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource 
resource) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 5b62a0d614c..1a6af1cf9b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -97,7 +97,7 @@ public class DataNodeShutdownHook extends Thread {
           
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet())
 {
         boolean timeout = false;
         while (true) {
-          if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
+          if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) {
             logger.info(
                 "Successfully waited for pipe {} to finish.", 
entry.getValue().getPipeName());
             break;

Reply via email to