This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bac1afc63556595369177e8e5e8f5e42000bdc99 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Jul 22 18:08:54 2025 +0800 Pipe: Report progress index for filter-outed insertion events in realtime extractors (#15975) * fix * fix (cherry picked from commit 7214e1f3677b760f0a932343ee38ab460b292b7f) --- .../realtime/assigner/PipeDataRegionAssigner.java | 44 +++++++++++++++++++++- .../matcher/CachedSchemaPatternMatcher.java | 27 +++++++++---- .../realtime/matcher/PipeDataRegionMatcher.java | 9 +++-- .../pattern/CachedSchemaPatternMatcherTest.java | 4 +- 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 366a17bdd79..c0b063708a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -42,12 +42,16 @@ import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.Objects; +import java.util.Set; public class PipeDataRegionAssigner implements Closeable { @@ -130,8 +134,11 @@ public class PipeDataRegionAssigner implements Closeable { return; } - matcher - .match(event) + final Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> + matchedAndUnmatched = matcher.match(event); + + matchedAndUnmatched + .getLeft() .forEach( extractor -> { if (disruptor.isClosed()) { @@ -216,6 +223,39 @@ public class PipeDataRegionAssigner implements Closeable { } extractor.extract(copiedEvent); }); + + matchedAndUnmatched + .getRight() + .forEach( + extractor -> { + if (disruptor.isClosed()) { + return; + } + + final EnrichedEvent innerEvent = event.getEvent(); + if (innerEvent instanceof TabletInsertionEvent + || innerEvent instanceof TsFileInsertionEvent) { + final ProgressReportEvent reportEvent = + new ProgressReportEvent( + extractor.getPipeName(), + extractor.getCreationTime(), + extractor.getPipeTaskMeta(), + extractor.getTreePattern(), + extractor.getTablePattern(), + extractor.getUserName(), + extractor.isSkipIfNoPrivileges(), + extractor.getRealtimeDataExtractionStartTime(), + extractor.getRealtimeDataExtractionEndTime()); + reportEvent.bindProgressIndex(event.getProgressIndex()); + if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { + LOGGER.warn( + "The reference count of the event {} cannot be increased, skipping it.", + reportEvent); + return; + } + extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); + } + }); } public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java index 5699850c5bf..e22cf7045b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java @@ -39,13 +39,13 @@ import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @@ -126,26 +126,28 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } @Override - public Set<PipeRealtimeDataRegionExtractor> match(final PipeRealtimeEvent event) { + public Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match( + final PipeRealtimeEvent event) { final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new HashSet<>(); lock.readLock().lock(); try { if (extractors.isEmpty()) { - return matchedExtractors; + return new Pair<>(matchedExtractors, extractors); } // HeartbeatEvent will be assigned to all extractors if (event.getEvent() instanceof PipeHeartbeatEvent) { - return extractors; + return new Pair<>(extractors, Collections.EMPTY_SET); } // TODO: consider table pattern? // Deletion event will be assigned to extractors listened to it if (event.getEvent() instanceof PipeDeleteDataNodeEvent) { - return extractors.stream() + extractors.stream() .filter(PipeRealtimeDataRegionExtractor::shouldExtractDeletion) - .collect(Collectors.toSet()); + .forEach(matchedExtractors::add); + return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors)); } for (final Map.Entry<IDeviceID, String[]> entry : event.getSchemaInfo().entrySet()) { @@ -171,11 +173,22 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { break; } } + + return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors)); } finally { lock.readLock().unlock(); } + } - return matchedExtractors; + private Set<PipeRealtimeDataRegionExtractor> findUnmatchedExtractors( + final Set<PipeRealtimeDataRegionExtractor> matchedExtractors) { + final Set<PipeRealtimeDataRegionExtractor> unmatchedExtractors = new HashSet<>(); + for (final PipeRealtimeDataRegionExtractor extractor : extractors) { + if (!matchedExtractors.contains(extractor)) { + unmatchedExtractors.add(extractor); + } + } + return unmatchedExtractors; } protected void matchTreeModelEvent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java index 08df1b9f432..b6c9545c729 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java @@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.tsfile.utils.Pair; + import java.util.Set; public interface PipeDataRegionMatcher { @@ -43,12 +45,13 @@ public interface PipeDataRegionMatcher { /** * Match the event's schema info with the registered extractors' patterns. If the event's schema - * info matches the pattern of a extractor, the extractor will be returned. + * info matches the pattern of an extractor, the extractor will be returned. * * @param event the event to be matched - * @return the matched extractors + * @return pair of matched extractors and unmatched extractors. */ - Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event); + Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match( + PipeRealtimeEvent event); /** Clear all the registered extractors and internal data structures. */ void clear(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 61f91fea8a3..37c57a3e8ef 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -165,12 +165,12 @@ public class CachedSchemaPatternMatcherTest { null, Collections.singletonMap(new StringArrayDeviceID("root.db" + i), measurements)); final long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(extractor -> extractor.extract(event)); + matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } final MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null, deviceMap); final long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(extractor -> extractor.extract(event)); + matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
