This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-report-realtime-filtered-evemts
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 687e88d3992f9070cbce7ff9ee28b2f50227e9ad
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jul 18 12:33:33 2025 +0800

    fix
---
 .../realtime/assigner/PipeDataRegionAssigner.java  | 43 +++++++++++++++++++++-
 .../matcher/CachedSchemaPatternMatcher.java        | 27 ++++++++++----
 .../realtime/matcher/PipeDataRegionMatcher.java    |  4 +-
 3 files changed, 63 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6b92f4bccdc..7c5cfc0e37c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -45,12 +45,16 @@ import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeDataRegionAssigner implements Closeable {
@@ -139,8 +143,10 @@ public class PipeDataRegionAssigner implements Closeable {
       return;
     }
 
-    matcher
-        .match(event)
+    final Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>>
+        matchedAndUnmatched = matcher.match(event);
+
+    matchedAndUnmatched
         .getLeft()
         .forEach(
             extractor -> {
@@ -226,6 +232,39 @@ public class PipeDataRegionAssigner implements Closeable {
               }
               extractor.extract(copiedEvent);
             });
+
+    matchedAndUnmatched
+        .getRight()
+        .forEach(
+            extractor -> {
+              if (disruptor.isClosed()) {
+                return;
+              }
+
+              final EnrichedEvent innerEvent = event.getEvent();
+              if (innerEvent instanceof TabletInsertionEvent
+                  || innerEvent instanceof TsFileInsertionEvent) {
+                final ProgressReportEvent reportEvent =
+                    new ProgressReportEvent(
+                        extractor.getPipeName(),
+                        extractor.getCreationTime(),
+                        extractor.getPipeTaskMeta(),
+                        extractor.getTreePattern(),
+                        extractor.getTablePattern(),
+                        extractor.getUserName(),
+                        extractor.isSkipIfNoPrivileges(),
+                        extractor.getRealtimeDataExtractionStartTime(),
+                        extractor.getRealtimeDataExtractionEndTime());
+                reportEvent.bindProgressIndex(event.getProgressIndex());
+                if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+                  LOGGER.warn(
+                      "The reference count of the event {} cannot be increased, skipping it.",
+                      reportEvent);
+                  return;
+                }
+                extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
+              }
+            });
   }
 
   public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 5699850c5bf..e22cf7045b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -39,13 +39,13 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
 import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
@@ -126,26 +126,28 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
   }
 
   @Override
-  public Set<PipeRealtimeDataRegionExtractor> match(final PipeRealtimeEvent event) {
+  public Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match(
+      final PipeRealtimeEvent event) {
     final Set<PipeRealtimeDataRegionExtractor> matchedExtractors = new HashSet<>();
 
     lock.readLock().lock();
     try {
       if (extractors.isEmpty()) {
-        return matchedExtractors;
+        return new Pair<>(matchedExtractors, extractors);
       }
 
       // HeartbeatEvent will be assigned to all extractors
       if (event.getEvent() instanceof PipeHeartbeatEvent) {
-        return extractors;
+        return new Pair<>(extractors, Collections.EMPTY_SET);
       }
 
       // TODO: consider table pattern?
       // Deletion event will be assigned to extractors listened to it
       if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
-        return extractors.stream()
+        extractors.stream()
             .filter(PipeRealtimeDataRegionExtractor::shouldExtractDeletion)
-            .collect(Collectors.toSet());
+            .forEach(matchedExtractors::add);
+        return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors));
       }
 
       for (final Map.Entry<IDeviceID, String[]> entry : event.getSchemaInfo().entrySet()) {
@@ -171,11 +173,22 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
           break;
         }
       }
+
+      return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors));
     } finally {
       lock.readLock().unlock();
     }
+  }
 
-    return matchedExtractors;
+  private Set<PipeRealtimeDataRegionExtractor> findUnmatchedExtractors(
+      final Set<PipeRealtimeDataRegionExtractor> matchedExtractors) {
+    final Set<PipeRealtimeDataRegionExtractor> unmatchedExtractors = new HashSet<>();
+    for (final PipeRealtimeDataRegionExtractor extractor : extractors) {
+      if (!matchedExtractors.contains(extractor)) {
+        unmatchedExtractors.add(extractor);
+      }
+    }
+    return unmatchedExtractors;
   }
 
   protected void matchTreeModelEvent(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
index 6efb3681cc9..b6c9545c729 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
@@ -45,10 +45,10 @@ public interface PipeDataRegionMatcher {
 
   /**
    * Match the event's schema info with the registered extractors' patterns. If the event's schema
-   * info matches the pattern of a extractor, the extractor will be returned.
+   * info matches the pattern of an extractor, the extractor will be returned.
    *
    * @param event the event to be matched
-   * @return the matched extractors
+   * @return pair of matched extractors and unmatched extractors.
    */
   Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match(
       PipeRealtimeEvent event);

Reply via email to