This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bac1afc63556595369177e8e5e8f5e42000bdc99
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jul 22 18:08:54 2025 +0800

    Pipe: Report progress index for filtered-out insertion events in realtime extractors (#15975)
    
    * fix
    
    * fix
    
    (cherry picked from commit 7214e1f3677b760f0a932343ee38ab460b292b7f)
---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 44 +++++++++++++++++++++-
 .../matcher/CachedSchemaPatternMatcher.java        | 27 +++++++++----
 .../realtime/matcher/PipeDataRegionMatcher.java    |  9 +++--
 .../pattern/CachedSchemaPatternMatcherTest.java    |  4 +-
 4 files changed, 70 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 366a17bdd79..c0b063708a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -42,12 +42,16 @@ import 
org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Objects;
+import java.util.Set;
 
 public class PipeDataRegionAssigner implements Closeable {
 
@@ -130,8 +134,11 @@ public class PipeDataRegionAssigner implements Closeable {
       return;
     }
 
-    matcher
-        .match(event)
+    final Pair<Set<PipeRealtimeDataRegionExtractor>, 
Set<PipeRealtimeDataRegionExtractor>>
+        matchedAndUnmatched = matcher.match(event);
+
+    matchedAndUnmatched
+        .getLeft()
         .forEach(
             extractor -> {
               if (disruptor.isClosed()) {
@@ -216,6 +223,39 @@ public class PipeDataRegionAssigner implements Closeable {
               }
               extractor.extract(copiedEvent);
             });
+
+    matchedAndUnmatched
+        .getRight()
+        .forEach(
+            extractor -> {
+              if (disruptor.isClosed()) {
+                return;
+              }
+
+              final EnrichedEvent innerEvent = event.getEvent();
+              if (innerEvent instanceof TabletInsertionEvent
+                  || innerEvent instanceof TsFileInsertionEvent) {
+                final ProgressReportEvent reportEvent =
+                    new ProgressReportEvent(
+                        extractor.getPipeName(),
+                        extractor.getCreationTime(),
+                        extractor.getPipeTaskMeta(),
+                        extractor.getTreePattern(),
+                        extractor.getTablePattern(),
+                        extractor.getUserName(),
+                        extractor.isSkipIfNoPrivileges(),
+                        extractor.getRealtimeDataExtractionStartTime(),
+                        extractor.getRealtimeDataExtractionEndTime());
+                reportEvent.bindProgressIndex(event.getProgressIndex());
+                if 
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+                  LOGGER.warn(
+                      "The reference count of the event {} cannot be 
increased, skipping it.",
+                      reportEvent);
+                  return;
+                }
+                
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+              }
+            });
   }
 
   public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 5699850c5bf..e22cf7045b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -39,13 +39,13 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
 import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
@@ -126,26 +126,28 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   }
 
   @Override
-  public Set<PipeRealtimeDataRegionExtractor> match(final PipeRealtimeEvent 
event) {
+  public Pair<Set<PipeRealtimeDataRegionExtractor>, 
Set<PipeRealtimeDataRegionExtractor>> match(
+      final PipeRealtimeEvent event) {
     final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new 
HashSet<>();
 
     lock.readLock().lock();
     try {
       if (extractors.isEmpty()) {
-        return matchedExtractors;
+        return new Pair<>(matchedExtractors, extractors);
       }
 
       // HeartbeatEvent will be assigned to all extractors
       if (event.getEvent() instanceof PipeHeartbeatEvent) {
-        return extractors;
+        return new Pair<>(extractors, Collections.EMPTY_SET);
       }
 
       // TODO: consider table pattern?
       // Deletion event will be assigned to extractors listened to it
       if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
-        return extractors.stream()
+        extractors.stream()
             .filter(PipeRealtimeDataRegionExtractor::shouldExtractDeletion)
-            .collect(Collectors.toSet());
+            .forEach(matchedExtractors::add);
+        return new Pair<>(matchedExtractors, 
findUnmatchedExtractors(matchedExtractors));
       }
 
       for (final Map.Entry<IDeviceID, String[]> entry : 
event.getSchemaInfo().entrySet()) {
@@ -171,11 +173,22 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
           break;
         }
       }
+
+      return new Pair<>(matchedExtractors, 
findUnmatchedExtractors(matchedExtractors));
     } finally {
       lock.readLock().unlock();
     }
+  }
 
-    return matchedExtractors;
+  private Set<PipeRealtimeDataRegionExtractor> findUnmatchedExtractors(
+      final Set<PipeRealtimeDataRegionExtractor> matchedExtractors) {
+    final Set<PipeRealtimeDataRegionExtractor> unmatchedExtractors = new 
HashSet<>();
+    for (final PipeRealtimeDataRegionExtractor extractor : extractors) {
+      if (!matchedExtractors.contains(extractor)) {
+        unmatchedExtractors.add(extractor);
+      }
+    }
+    return unmatchedExtractors;
   }
 
   protected void matchTreeModelEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
index 08df1b9f432..b6c9545c729 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
@@ -22,6 +22,8 @@ package 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 
+import org.apache.tsfile.utils.Pair;
+
 import java.util.Set;
 
 public interface PipeDataRegionMatcher {
@@ -43,12 +45,13 @@ public interface PipeDataRegionMatcher {
 
   /**
    * Match the event's schema info with the registered extractors' patterns. 
If the event's schema
-   * info matches the pattern of a extractor, the extractor will be returned.
+   * info matches the pattern of an extractor, the extractor will be returned.
    *
    * @param event the event to be matched
-   * @return the matched extractors
+   * @return pair of matched extractors and unmatched extractors.
    */
-  Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event);
+  Pair<Set<PipeRealtimeDataRegionExtractor>, 
Set<PipeRealtimeDataRegionExtractor>> match(
+      PipeRealtimeEvent event);
 
   /** Clear all the registered extractors and internal data structures. */
   void clear();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 61f91fea8a3..37c57a3e8ef 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -165,12 +165,12 @@ public class CachedSchemaPatternMatcherTest {
                 null,
                 Collections.singletonMap(new StringArrayDeviceID("root.db" + 
i), measurements));
         final long startTime = System.currentTimeMillis();
-        matcher.match(event).forEach(extractor -> extractor.extract(event));
+        matcher.match(event).getLeft().forEach(extractor -> 
extractor.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
       final MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, 
null, deviceMap);
       final long startTime = System.currentTimeMillis();
-      matcher.match(event).forEach(extractor -> extractor.extract(event));
+      matcher.match(event).getLeft().forEach(extractor -> 
extractor.extract(event));
       totalTime += (System.currentTimeMillis() - startTime);
     }
     System.out.println("matcher.getRegisterCount() = " + 
matcher.getRegisterCount());

Reply via email to