This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-report-in-extractor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0642386a47b2020df54e254511ad9e975cef8d90 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jul 17 11:03:13 2025 +0800 Pipe: Report progress index for filter-outed tsfile events in PipeHistoricalDataRegionTsFileExtractor --- .../PipeHistoricalDataRegionTsFileExtractor.java | 41 ++++++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 25759654a5b..4b6b11efed0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; +import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter; @@ -57,6 +58,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -124,6 +126,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private volatile boolean hasBeenStarted = false; private Queue<TsFileResource> pendingQueue; + private final Set<TsFileResource> filteredTsFileResources = new HashSet<>(); @Override public void validate(final PipeParameterValidator validator) { @@ -364,7 +367,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa try { final int originalSequenceTsFileCount = tsFileManager.size(true); final int originalUnSequenceTsFileCount = tsFileManager.size(false); - final List<TsFileResource> resourceList = + final List<TsFileResource> originalResourceList = new ArrayList<>(originalSequenceTsFileCount + originalUnSequenceTsFileCount); LOGGER.info( "Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, " @@ -377,6 +380,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa final Collection<TsFileResource> sequenceTsFileResources = tsFileManager.getTsFileList(true).stream() + .peek(originalResourceList::add) .filter( resource -> // Some resource is marked as deleted but not removed from the list. @@ -397,10 +401,11 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) && mayTsFileResourceOverlappedWithPattern(resource))) .collect(Collectors.toList()); - resourceList.addAll(sequenceTsFileResources); + filteredTsFileResources.addAll(sequenceTsFileResources); final Collection<TsFileResource> unSequenceTsFileResources = tsFileManager.getTsFileList(false).stream() + .peek(originalResourceList::add) .filter( resource -> // Some resource is marked as deleted but not removed from the list. @@ -421,9 +426,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) && mayTsFileResourceOverlappedWithPattern(resource))) .collect(Collectors.toList()); - resourceList.addAll(unSequenceTsFileResources); + filteredTsFileResources.addAll(unSequenceTsFileResources); - resourceList.removeIf( + filteredTsFileResources.removeIf( resource -> { // Pin the resource, in case the file is removed by compaction or anything. // Will unpin it after the PipeTsFileInsertionEvent is created and pinned. @@ -437,12 +442,12 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa } }); - resourceList.sort( + originalResourceList.sort( (o1, o2) -> startIndex instanceof TimeWindowStateProgressIndex ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) : o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex())); - pendingQueue = new ArrayDeque<>(resourceList); + pendingQueue = new ArrayDeque<>(originalResourceList); LOGGER.info( "Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count {}/{}, " @@ -453,7 +458,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa originalSequenceTsFileCount, unSequenceTsFileResources.size(), originalUnSequenceTsFileCount, - resourceList.size(), + filteredTsFileResources.size(), originalSequenceTsFileCount + originalUnSequenceTsFileCount, System.currentTimeMillis() - startHistoricalExtractionTime); } finally { @@ -567,6 +572,28 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa return terminateEvent; } + if (!filteredTsFileResources.contains(resource)) { + final ProgressReportEvent progressReportEvent = + new ProgressReportEvent( + pipeName, + creationTime, + pipeTaskMeta, + pipePattern, + historicalDataExtractionStartTime, + historicalDataExtractionEndTime); + progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex()); + final boolean isReferenceCountIncreased = + progressReportEvent.increaseReferenceCount( + PipeHistoricalDataRegionTsFileExtractor.class.getName()); + if (!isReferenceCountIncreased) { + LOGGER.warn( + "The reference count of the event {} cannot be increased, skipping it.", + progressReportEvent); + } + return isReferenceCountIncreased ? progressReportEvent : null; + } + + filteredTsFileResources.remove(resource); final PipeTsFileInsertionEvent event = new PipeTsFileInsertionEvent( resource,
