This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-report-in-extractor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0642386a47b2020df54e254511ad9e975cef8d90
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 17 11:03:13 2025 +0800

    Pipe: Report progress index for filtered-out tsfile events in 
PipeHistoricalDataRegionTsFileExtractor
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 41 ++++++++++++++++++----
 1 file changed, 34 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 25759654a5b..4b6b11efed0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
@@ -57,6 +58,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -124,6 +126,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private volatile boolean hasBeenStarted = false;
 
   private Queue<TsFileResource> pendingQueue;
+  private final Set<TsFileResource> filteredTsFileResources = new HashSet<>();
 
   @Override
   public void validate(final PipeParameterValidator validator) {
@@ -364,7 +367,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       try {
         final int originalSequenceTsFileCount = tsFileManager.size(true);
         final int originalUnSequenceTsFileCount = tsFileManager.size(false);
-        final List<TsFileResource> resourceList =
+        final List<TsFileResource> originalResourceList =
             new ArrayList<>(originalSequenceTsFileCount + 
originalUnSequenceTsFileCount);
         LOGGER.info(
             "Pipe {}@{}: start to extract historical TsFile, original sequence 
file count {}, "
@@ -377,6 +380,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
         final Collection<TsFileResource> sequenceTsFileResources =
             tsFileManager.getTsFileList(true).stream()
+                .peek(originalResourceList::add)
                 .filter(
                     resource ->
                         // Some resource is marked as deleted but not removed 
from the list.
@@ -397,10 +401,11 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                                     && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                                     && 
mayTsFileResourceOverlappedWithPattern(resource)))
                 .collect(Collectors.toList());
-        resourceList.addAll(sequenceTsFileResources);
+        filteredTsFileResources.addAll(sequenceTsFileResources);
 
         final Collection<TsFileResource> unSequenceTsFileResources =
             tsFileManager.getTsFileList(false).stream()
+                .peek(originalResourceList::add)
                 .filter(
                     resource ->
                         // Some resource is marked as deleted but not removed 
from the list.
@@ -421,9 +426,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                                     && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                                     && 
mayTsFileResourceOverlappedWithPattern(resource)))
                 .collect(Collectors.toList());
-        resourceList.addAll(unSequenceTsFileResources);
+        filteredTsFileResources.addAll(unSequenceTsFileResources);
 
-        resourceList.removeIf(
+        filteredTsFileResources.removeIf(
             resource -> {
               // Pin the resource, in case the file is removed by compaction 
or anything.
               // Will unpin it after the PipeTsFileInsertionEvent is created 
and pinned.
@@ -437,12 +442,12 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
               }
             });
 
-        resourceList.sort(
+        originalResourceList.sort(
             (o1, o2) ->
                 startIndex instanceof TimeWindowStateProgressIndex
                     ? Long.compare(o1.getFileStartTime(), 
o2.getFileStartTime())
                     : 
o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
-        pendingQueue = new ArrayDeque<>(resourceList);
+        pendingQueue = new ArrayDeque<>(originalResourceList);
 
         LOGGER.info(
             "Pipe {}@{}: finish to extract historical TsFile, extracted 
sequence file count {}/{}, "
@@ -453,7 +458,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             originalSequenceTsFileCount,
             unSequenceTsFileResources.size(),
             originalUnSequenceTsFileCount,
-            resourceList.size(),
+            filteredTsFileResources.size(),
             originalSequenceTsFileCount + originalUnSequenceTsFileCount,
             System.currentTimeMillis() - startHistoricalExtractionTime);
       } finally {
@@ -567,6 +572,28 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       return terminateEvent;
     }
 
+    if (!filteredTsFileResources.contains(resource)) {
+      final ProgressReportEvent progressReportEvent =
+          new ProgressReportEvent(
+              pipeName,
+              creationTime,
+              pipeTaskMeta,
+              pipePattern,
+              historicalDataExtractionStartTime,
+              historicalDataExtractionEndTime);
+      progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
+      final boolean isReferenceCountIncreased =
+          progressReportEvent.increaseReferenceCount(
+              PipeHistoricalDataRegionTsFileExtractor.class.getName());
+      if (!isReferenceCountIncreased) {
+        LOGGER.warn(
+            "The reference count of the event {} cannot be increased, skipping 
it.",
+            progressReportEvent);
+      }
+      return isReferenceCountIncreased ? progressReportEvent : null;
+    }
+
+    filteredTsFileResources.remove(resource);
     final PipeTsFileInsertionEvent event =
         new PipeTsFileInsertionEvent(
             resource,

Reply via email to