This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 061d2de0285 Pipe:Fix DataNodeShutdownHook waiting report logic and add 
capture history file log (#15952)
061d2de0285 is described below

commit 061d2de02851ca9332325dbdf59d10883b6c3f6e
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 16 14:35:22 2025 +0800

    Pipe:Fix DataNodeShutdownHook waiting report logic and add capture history 
file log (#15952)
    
    * Pipe:Fix DataNodeShutdownHook waiting report logic and add capture 
history file log
    
    * fix
---
 ...oricalDataRegionTsFileAndDeletionExtractor.java | 22 ++++++++++++++++------
 .../iotdb/db/service/DataNodeShutdownHook.java     |  2 +-
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index e18c2907f52..f752b2c50c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -703,13 +703,23 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
       // instead of replication or something else.
       ProgressIndex dedicatedProgressIndex =
           
tryToExtractLocalProgressIndexForIoTV2(resource.getMaxProgressIndexAfterClose());
-      return greaterThanStartIndex(dedicatedProgressIndex);
+      return greaterThanStartIndex(resource, dedicatedProgressIndex);
     }
-    return greaterThanStartIndex(resource.getMaxProgressIndexAfterClose());
+    return greaterThanStartIndex(resource, 
resource.getMaxProgressIndexAfterClose());
   }
 
-  private boolean greaterThanStartIndex(ProgressIndex progressIndex) {
-    return !startIndex.isAfter(progressIndex) && 
!startIndex.equals(progressIndex);
+  private boolean greaterThanStartIndex(PersistentResource resource, 
ProgressIndex progressIndex) {
+    if (!startIndex.isAfter(progressIndex) && 
!startIndex.equals(progressIndex)) {
+      LOGGER.info(
+          "Pipe {}@{}: resource {} meets mayTsFileContainUnprocessedData 
condition, extractor progressIndex: {}, resource ProgressIndex: {}",
+          pipeName,
+          dataRegionId,
+          resource,
+          startIndex,
+          progressIndex);
+      return true;
+    }
+    return false;
   }
 
   private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource 
resource) {
@@ -809,7 +819,7 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
               if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
                 toBeCompared = 
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
               }
-              return !greaterThanStartIndex(toBeCompared);
+              return !greaterThanStartIndex(resource, toBeCompared);
             })
         .forEach(DeletionResource::decreaseReference);
     // Get deletions that should be sent.
@@ -821,7 +831,7 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
                   if 
(pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
                     toBeCompared = 
tryToExtractLocalProgressIndexForIoTV2(toBeCompared);
                   }
-                  return greaterThanStartIndex(toBeCompared);
+                  return greaterThanStartIndex(resource, toBeCompared);
                 })
             .collect(Collectors.toList());
     resourceList.addAll(allDeletionResources);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 9c31923b7bc..e9b125818fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -128,7 +128,7 @@ public class DataNodeShutdownHook extends Thread {
           
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet())
 {
         boolean timeout = false;
         while (true) {
-          if (entry.getValue().getRemainingNonHeartbeatEvents() > 0) {
+          if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) {
             logger.info(
                 "Successfully waited for pipe {} to finish.", 
entry.getValue().getPipeName());
             break;

Reply via email to