This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch cp-15975 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55ca02b89ae18cb3a227887398438be24168946f Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Jul 22 18:08:54 2025 +0800 Pipe: Report progress index for filter-outed insertion events in realtime extractors (#15975) * fix * fix (cherry picked from commit 7214e1f3677b760f0a932343ee38ab460b292b7f) --- .../realtime/assigner/PipeDataRegionAssigner.java | 44 +++++++++++++++++++++- .../matcher/CachedSchemaPatternMatcher.java | 28 ++++++++++---- .../realtime/matcher/PipeDataRegionMatcher.java | 9 +++-- .../pattern/CachedSchemaPatternMatcherTest.java | 11 +++--- 4 files changed, 75 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 0b83876b186..166b3610602 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -31,11 +31,15 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSche import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher; import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.Set; public class PipeDataRegionAssigner implements Closeable { @@ -107,8 +111,11 @@ public class PipeDataRegionAssigner implements Closeable { return; } - matcher - .match(event) + final Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> + matchedAndUnmatched = matcher.match(event); + + matchedAndUnmatched + .getLeft() .forEach( extractor -> { if (disruptor.isClosed()) { @@ -159,6 +166,39 @@ public class PipeDataRegionAssigner implements Closeable { } extractor.extract(copiedEvent); }); + + matchedAndUnmatched + .getRight() + .forEach( + extractor -> { + if (disruptor.isClosed()) { + return; + } + + final EnrichedEvent innerEvent = event.getEvent(); + if (innerEvent instanceof TabletInsertionEvent + || innerEvent instanceof TsFileInsertionEvent) { + final ProgressReportEvent reportEvent = + new ProgressReportEvent( + extractor.getPipeName(), + extractor.getCreationTime(), + extractor.getPipeTaskMeta(), + extractor.getTreePattern(), + extractor.getTablePattern(), + extractor.getUserName(), + extractor.isSkipIfNoPrivileges(), + extractor.getRealtimeDataExtractionStartTime(), + extractor.getRealtimeDataExtractionEndTime()); + reportEvent.bindProgressIndex(event.getProgressIndex()); + if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { + LOGGER.warn( + "The reference count of the event {} cannot be increased, skipping it.", + reportEvent); + return; + } + extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); + } + }); } public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java index 015ece38f8b..11326b90eff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java @@ -28,16 +28,17 @@ import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { @@ -93,25 +94,27 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } @Override - public Set<PipeRealtimeDataRegionExtractor> match(final PipeRealtimeEvent event) { + public Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match( + final PipeRealtimeEvent event) { final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new HashSet<>(); lock.readLock().lock(); try { if (extractors.isEmpty()) { - return matchedExtractors; + return new Pair<>(matchedExtractors, extractors); } // HeartbeatEvent will be assigned to all extractors if (event.getEvent() instanceof PipeHeartbeatEvent) { - return extractors; + return new Pair<>(extractors, Collections.EMPTY_SET); } // Deletion event will be assigned to extractors listened to it if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) { - return extractors.stream() + extractors.stream() .filter(PipeRealtimeDataRegionExtractor::shouldExtractDeletion) - .collect(Collectors.toSet()); + .forEach(matchedExtractors::add); + return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors)); } for (final Map.Entry<String, String[]> entry : event.getSchemaInfo().entrySet()) { @@ -169,11 +172,22 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { break; } } + + return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors)); } finally { lock.readLock().unlock(); } + } - return matchedExtractors; + private Set<PipeRealtimeDataRegionExtractor> findUnmatchedExtractors( + final Set<PipeRealtimeDataRegionExtractor> matchedExtractors) { + final Set<PipeRealtimeDataRegionExtractor> unmatchedExtractors = new HashSet<>(); + for (final PipeRealtimeDataRegionExtractor extractor : extractors) { + if (!matchedExtractors.contains(extractor)) { + unmatchedExtractors.add(extractor); + } + } + return unmatchedExtractors; } protected Set<PipeRealtimeDataRegionExtractor> filterExtractorsByDevice(final String device) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java index 4e102a1f7cf..862ff680ec4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java @@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.tsfile.utils.Pair; + import java.util.Set; public interface PipeDataRegionMatcher { @@ -40,12 +42,13 @@ public interface PipeDataRegionMatcher { /** * Match the event's schema info with the registered extractors' patterns. If the event's schema - * info matches the pattern of a extractor, the extractor will be returned. + * info matches the pattern of an extractor, the extractor will be returned. * * @param event the event to be matched - * @return the matched extractors + * @return pair of matched extractors and unmatched extractors. */ - Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event); + Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match( + PipeRealtimeEvent event); /** Clear all the registered extractors and internal data structures. */ void clear(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 2e3e57cd49e..2b5babcaa73 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -153,13 +153,14 @@ public class CachedSchemaPatternMatcherTest { MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent( null, null, Collections.singletonMap("root." + i, measurements), null); - long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(extractor -> extractor.extract(event)); + final long startTime = System.currentTimeMillis(); + matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } - MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null, deviceMap, null); - long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(extractor -> extractor.extract(event)); + final MockedPipeRealtimeEvent event = + new MockedPipeRealtimeEvent(null, null, deviceMap, null); + final long startTime = System.currentTimeMillis(); + matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
