This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b894648ec22 [To dev/1.3] Pipe: Report progress index for filter-outed
insertion events in realtime extractors (#15975) (#16003)
b894648ec22 is described below
commit b894648ec2284fa992b5f797d1e2289466751f15
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jul 23 14:40:37 2025 +0800
[To dev/1.3] Pipe: Report progress index for filter-outed insertion events
in realtime extractors (#15975) (#16003)
* Pipe: Report progress index for filter-outed insertion events in realtime
extractors (#15975)
* fix
* fix
(cherry picked from commit 7214e1f3677b760f0a932343ee38ab460b292b7f)
* Update PipeDataRegionAssigner.java
---
.../realtime/assigner/PipeDataRegionAssigner.java | 41 ++++++++++++++++++++--
.../matcher/CachedSchemaPatternMatcher.java | 28 +++++++++++----
.../realtime/matcher/PipeDataRegionMatcher.java | 9 +++--
.../pattern/CachedSchemaPatternMatcherTest.java | 11 +++---
4 files changed, 72 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 0b83876b186..73f90805e55 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -31,11 +31,15 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSche
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.Set;
public class PipeDataRegionAssigner implements Closeable {
@@ -107,8 +111,11 @@ public class PipeDataRegionAssigner implements Closeable {
return;
}
- matcher
- .match(event)
+ final Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>>
+ matchedAndUnmatched = matcher.match(event);
+
+ matchedAndUnmatched
+ .getLeft()
.forEach(
extractor -> {
if (disruptor.isClosed()) {
@@ -159,6 +166,36 @@ public class PipeDataRegionAssigner implements Closeable {
}
extractor.extract(copiedEvent);
});
+
+ matchedAndUnmatched
+ .getRight()
+ .forEach(
+ extractor -> {
+ if (disruptor.isClosed()) {
+ return;
+ }
+
+ final EnrichedEvent innerEvent = event.getEvent();
+ if (innerEvent instanceof TabletInsertionEvent
+ || innerEvent instanceof TsFileInsertionEvent) {
+ final ProgressReportEvent reportEvent =
+ new ProgressReportEvent(
+ extractor.getPipeName(),
+ extractor.getCreationTime(),
+ extractor.getPipeTaskMeta(),
+ extractor.getPipePattern(),
+ extractor.getRealtimeDataExtractionStartTime(),
+ extractor.getRealtimeDataExtractionEndTime());
+ reportEvent.bindProgressIndex(event.getProgressIndex());
+ if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+ LOGGER.warn(
+ "The reference count of the event {} cannot be increased, skipping it.",
+ reportEvent);
+ return;
+ }
+ extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+ }
+ });
}
public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 015ece38f8b..11326b90eff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -28,16 +28,17 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
@@ -93,25 +94,27 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
@Override
- public Set<PipeRealtimeDataRegionExtractor> match(final PipeRealtimeEvent event) {
+ public Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match(
+ final PipeRealtimeEvent event) {
final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new
HashSet<>();
lock.readLock().lock();
try {
if (extractors.isEmpty()) {
- return matchedExtractors;
+ return new Pair<>(matchedExtractors, extractors);
}
// HeartbeatEvent will be assigned to all extractors
if (event.getEvent() instanceof PipeHeartbeatEvent) {
- return extractors;
+ return new Pair<>(extractors, Collections.EMPTY_SET);
}
// Deletion event will be assigned to extractors listened to it
if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
- return extractors.stream()
+ extractors.stream()
.filter(PipeRealtimeDataRegionExtractor::shouldExtractDeletion)
- .collect(Collectors.toSet());
+ .forEach(matchedExtractors::add);
+ return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors));
}
for (final Map.Entry<String, String[]> entry :
event.getSchemaInfo().entrySet()) {
@@ -169,11 +172,22 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
break;
}
}
+
+ return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors));
} finally {
lock.readLock().unlock();
}
+ }
- return matchedExtractors;
+ private Set<PipeRealtimeDataRegionExtractor> findUnmatchedExtractors(
+ final Set<PipeRealtimeDataRegionExtractor> matchedExtractors) {
+ final Set<PipeRealtimeDataRegionExtractor> unmatchedExtractors = new HashSet<>();
+ for (final PipeRealtimeDataRegionExtractor extractor : extractors) {
+ if (!matchedExtractors.contains(extractor)) {
+ unmatchedExtractors.add(extractor);
+ }
+ }
+ return unmatchedExtractors;
}
protected Set<PipeRealtimeDataRegionExtractor>
filterExtractorsByDevice(final String device) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
index 4e102a1f7cf..862ff680ec4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
@@ -22,6 +22,8 @@ package
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
+import org.apache.tsfile.utils.Pair;
+
import java.util.Set;
public interface PipeDataRegionMatcher {
@@ -40,12 +42,13 @@ public interface PipeDataRegionMatcher {
/**
* Match the event's schema info with the registered extractors' patterns.
If the event's schema
- * info matches the pattern of a extractor, the extractor will be returned.
+ * info matches the pattern of an extractor, the extractor will be returned.
*
* @param event the event to be matched
- * @return the matched extractors
+ * @return pair of matched extractors and unmatched extractors.
*/
- Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event);
+ Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match(
+ PipeRealtimeEvent event);
/** Clear all the registered extractors and internal data structures. */
void clear();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 2e3e57cd49e..2b5babcaa73 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -153,13 +153,14 @@ public class CachedSchemaPatternMatcherTest {
MockedPipeRealtimeEvent event =
new MockedPipeRealtimeEvent(
null, null, Collections.singletonMap("root." + i,
measurements), null);
- long startTime = System.currentTimeMillis();
- matcher.match(event).forEach(extractor -> extractor.extract(event));
+ final long startTime = System.currentTimeMillis();
+ matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
}
- MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null, deviceMap, null);
- long startTime = System.currentTimeMillis();
- matcher.match(event).forEach(extractor -> extractor.extract(event));
+ final MockedPipeRealtimeEvent event =
+ new MockedPipeRealtimeEvent(null, null, deviceMap, null);
+ final long startTime = System.currentTimeMillis();
+ matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
}
System.out.println("matcher.getRegisterCount() = " +
matcher.getRegisterCount());