This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5bd1503be71 Pipe: Report progress index for filter-outed tsfile events in PipeHistoricalDataRegionTsFileExtractor (#15963)
5bd1503be71 is described below

commit 5bd1503be715f0f7945641666d612b0cc25a38db
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 17 12:30:41 2025 +0800

    Pipe: Report progress index for filter-outed tsfile events in PipeHistoricalDataRegionTsFileExtractor (#15963)
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 41 ++++++++++++++++++----
 1 file changed, 34 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 25759654a5b..4b6b11efed0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
@@ -57,6 +58,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -124,6 +126,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
   private volatile boolean hasBeenStarted = false;
 
   private Queue<TsFileResource> pendingQueue;
+  private final Set<TsFileResource> filteredTsFileResources = new HashSet<>();
 
   @Override
   public void validate(final PipeParameterValidator validator) {
@@ -364,7 +367,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
       try {
         final int originalSequenceTsFileCount = tsFileManager.size(true);
         final int originalUnSequenceTsFileCount = tsFileManager.size(false);
-        final List<TsFileResource> resourceList =
+        final List<TsFileResource> originalResourceList =
             new ArrayList<>(originalSequenceTsFileCount + originalUnSequenceTsFileCount);
         LOGGER.info(
             "Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, "
@@ -377,6 +380,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
 
         final Collection<TsFileResource> sequenceTsFileResources =
             tsFileManager.getTsFileList(true).stream()
+                .peek(originalResourceList::add)
                 .filter(
                     resource ->
                         // Some resource is marked as deleted but not removed from the list.
@@ -397,10 +401,11 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                                     && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                                     && mayTsFileResourceOverlappedWithPattern(resource)))
                 .collect(Collectors.toList());
-        resourceList.addAll(sequenceTsFileResources);
+        filteredTsFileResources.addAll(sequenceTsFileResources);
 
         final Collection<TsFileResource> unSequenceTsFileResources =
             tsFileManager.getTsFileList(false).stream()
+                .peek(originalResourceList::add)
                 .filter(
                     resource ->
                         // Some resource is marked as deleted but not removed from the list.
@@ -421,9 +426,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                                     && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
                                     && mayTsFileResourceOverlappedWithPattern(resource)))
                 .collect(Collectors.toList());
-        resourceList.addAll(unSequenceTsFileResources);
+        filteredTsFileResources.addAll(unSequenceTsFileResources);
 
-        resourceList.removeIf(
+        filteredTsFileResources.removeIf(
             resource -> {
               // Pin the resource, in case the file is removed by compaction or anything.
               // Will unpin it after the PipeTsFileInsertionEvent is created and pinned.
@@ -437,12 +442,12 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
               }
             });
 
-        resourceList.sort(
+        originalResourceList.sort(
             (o1, o2) ->
                 startIndex instanceof TimeWindowStateProgressIndex
                     ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
                     : o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
-        pendingQueue = new ArrayDeque<>(resourceList);
+        pendingQueue = new ArrayDeque<>(originalResourceList);
 
         LOGGER.info(
             "Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, "
@@ -453,7 +458,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
             originalSequenceTsFileCount,
             unSequenceTsFileResources.size(),
             originalUnSequenceTsFileCount,
-            resourceList.size(),
+            filteredTsFileResources.size(),
             originalSequenceTsFileCount + originalUnSequenceTsFileCount,
             System.currentTimeMillis() - startHistoricalExtractionTime);
       } finally {
@@ -567,6 +572,28 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
       return terminateEvent;
     }
 
+    if (!filteredTsFileResources.contains(resource)) {
+      final ProgressReportEvent progressReportEvent =
+          new ProgressReportEvent(
+              pipeName,
+              creationTime,
+              pipeTaskMeta,
+              pipePattern,
+              historicalDataExtractionStartTime,
+              historicalDataExtractionEndTime);
+      progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
+      final boolean isReferenceCountIncreased =
+          progressReportEvent.increaseReferenceCount(
+              PipeHistoricalDataRegionTsFileExtractor.class.getName());
+      if (!isReferenceCountIncreased) {
+        LOGGER.warn(
+            "The reference count of the event {} cannot be increased, skipping it.",
+            progressReportEvent);
+      }
+      return isReferenceCountIncreased ? progressReportEvent : null;
+    }
+
+    filteredTsFileResources.remove(resource);
     final PipeTsFileInsertionEvent event =
         new PipeTsFileInsertionEvent(
             resource,

Reply via email to