This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 5bd1503be71 Pipe: Report progress index for filtered-out TsFile events
in PipeHistoricalDataRegionTsFileExtractor (#15963)
5bd1503be71 is described below
commit 5bd1503be715f0f7945641666d612b0cc25a38db
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jul 17 12:30:41 2025 +0800
Pipe: Report progress index for filtered-out TsFile events in
PipeHistoricalDataRegionTsFileExtractor (#15963)
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 41 ++++++++++++++++++----
1 file changed, 34 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 25759654a5b..4b6b11efed0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
@@ -57,6 +58,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -124,6 +126,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private volatile boolean hasBeenStarted = false;
private Queue<TsFileResource> pendingQueue;
+ private final Set<TsFileResource> filteredTsFileResources = new HashSet<>();
@Override
public void validate(final PipeParameterValidator validator) {
@@ -364,7 +367,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
try {
final int originalSequenceTsFileCount = tsFileManager.size(true);
final int originalUnSequenceTsFileCount = tsFileManager.size(false);
- final List<TsFileResource> resourceList =
+ final List<TsFileResource> originalResourceList =
new ArrayList<>(originalSequenceTsFileCount +
originalUnSequenceTsFileCount);
LOGGER.info(
"Pipe {}@{}: start to extract historical TsFile, original sequence
file count {}, "
@@ -377,6 +380,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final Collection<TsFileResource> sequenceTsFileResources =
tsFileManager.getTsFileList(true).stream()
+ .peek(originalResourceList::add)
.filter(
resource ->
// Some resource is marked as deleted but not removed
from the list.
@@ -397,10 +401,11 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&&
mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
- resourceList.addAll(sequenceTsFileResources);
+ filteredTsFileResources.addAll(sequenceTsFileResources);
final Collection<TsFileResource> unSequenceTsFileResources =
tsFileManager.getTsFileList(false).stream()
+ .peek(originalResourceList::add)
.filter(
resource ->
// Some resource is marked as deleted but not removed
from the list.
@@ -421,9 +426,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&&
mayTsFileResourceOverlappedWithPattern(resource)))
.collect(Collectors.toList());
- resourceList.addAll(unSequenceTsFileResources);
+ filteredTsFileResources.addAll(unSequenceTsFileResources);
- resourceList.removeIf(
+ filteredTsFileResources.removeIf(
resource -> {
// Pin the resource, in case the file is removed by compaction
or anything.
// Will unpin it after the PipeTsFileInsertionEvent is created
and pinned.
@@ -437,12 +442,12 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
});
- resourceList.sort(
+ originalResourceList.sort(
(o1, o2) ->
startIndex instanceof TimeWindowStateProgressIndex
? Long.compare(o1.getFileStartTime(),
o2.getFileStartTime())
:
o1.getMaxProgressIndex().topologicalCompareTo(o2.getMaxProgressIndex()));
- pendingQueue = new ArrayDeque<>(resourceList);
+ pendingQueue = new ArrayDeque<>(originalResourceList);
LOGGER.info(
"Pipe {}@{}: finish to extract historical TsFile, extracted
sequence file count {}/{}, "
@@ -453,7 +458,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
originalSequenceTsFileCount,
unSequenceTsFileResources.size(),
originalUnSequenceTsFileCount,
- resourceList.size(),
+ filteredTsFileResources.size(),
originalSequenceTsFileCount + originalUnSequenceTsFileCount,
System.currentTimeMillis() - startHistoricalExtractionTime);
} finally {
@@ -567,6 +572,28 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
return terminateEvent;
}
+ if (!filteredTsFileResources.contains(resource)) {
+ final ProgressReportEvent progressReportEvent =
+ new ProgressReportEvent(
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ pipePattern,
+ historicalDataExtractionStartTime,
+ historicalDataExtractionEndTime);
+ progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
+ final boolean isReferenceCountIncreased =
+ progressReportEvent.increaseReferenceCount(
+ PipeHistoricalDataRegionTsFileExtractor.class.getName());
+ if (!isReferenceCountIncreased) {
+ LOGGER.warn(
+ "The reference count of the event {} cannot be increased, skipping
it.",
+ progressReportEvent);
+ }
+ return isReferenceCountIncreased ? progressReportEvent : null;
+ }
+
+ filteredTsFileResources.remove(resource);
final PipeTsFileInsertionEvent event =
new PipeTsFileInsertionEvent(
resource,