This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 69fb8a78d84 [To dev/1.3] Deleted the useless device flush map & Pipe: 
Made the assigner cache to full cache & Use precise match in assigner device & 
Print periodical log when assigner has exceeded half of the capacity (#17313)
69fb8a78d84 is described below

commit 69fb8a78d84f42b330048b8bcecd8fd1cdd45755
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 19 11:50:17 2026 +0800

    [To dev/1.3] Deleted the useless device flush map & Pipe: Made the assigner 
cache to full cache & Use precise match in assigner device & Print periodical 
log when assigner has exceeded half of the capacity (#17313)
    
    * [To dev/1.3] Deleted the useless device flush map & Pipe: Made the 
assigner cache to full cache & Use precise match in assigner device & Print 
periodical log when assigner has exceeded half of the capacity
    
    * fix
    
    * merge-problem
    
    * false-po
    
    * compile
    
    * key-fix
---
 .../realtime/assigner/DisruptorQueue.java          | 26 ++++++++++++-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  6 ++-
 .../matcher/CachedSchemaPatternMatcher.java        | 22 ++++-------
 .../db/storageengine/dataregion/DataRegion.java    |  7 ----
 .../dataregion/HashLastFlushTimeMap.java           | 44 ----------------------
 .../dataregion/ILastFlushTimeMap.java              |  6 ---
 .../db/pipe/pattern/IoTDBPipePatternTest.java      |  5 +++
 .../datastructure/pattern/IoTDBPipePattern.java    | 12 +++++-
 .../pipe/datastructure/pattern/PipePattern.java    | 15 ++++++--
 .../datastructure/pattern/PrefixPipePattern.java   |  7 +++-
 .../pattern/UnionIoTDBPipePattern.java             |  5 +++
 .../datastructure/pattern/UnionPipePattern.java    |  5 +++
 .../pattern/WithExclusionIoTDBPipePattern.java     |  5 +++
 .../pattern/WithExclusionPipePattern.java          |  6 +++
 14 files changed, 92 insertions(+), 79 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index b2b4ea9b83d..c7add196e69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -32,6 +32,8 @@ import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.function.Consumer;
 
@@ -39,18 +41,23 @@ import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISR
 
 public class DisruptorQueue {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorQueue.class);
   private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
       new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   private final Disruptor<EventContainer> disruptor;
   private final RingBuffer<EventContainer> ringBuffer;
-
   private volatile boolean isClosed = false;
 
+  private final int dataRegionId;
+  private volatile long lastLogTime = Long.MIN_VALUE;
+
   public DisruptorQueue(
+      final int dataRegionId,
       final EventHandler<PipeRealtimeEvent> eventHandler,
       final Consumer<PipeRealtimeEvent> onAssignedHook) {
+    this.dataRegionId = dataRegionId;
     final PipeConfig config = PipeConfig.getInstance();
     final int ringBufferSize = 
config.getPipeSourceAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
@@ -88,6 +95,7 @@ public class DisruptorQueue {
       ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
     }
     ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
+    mayPrintExceedingLog();
   }
 
   public void shutdown() {
@@ -101,6 +109,22 @@ public class DisruptorQueue {
     return isClosed;
   }
 
+  /**
+   * Logs a warning when more than half of the ring buffer is occupied, at most once per
+   * configured periodical-log interval.
+   *
+   * <p>{@code ringBuffer.remainingCapacity()} is the number of free slots, so the queue content
+   * has exceeded half exactly when fewer than half of the slots remain free.
+   */
+  private void mayPrintExceedingLog() {
+    final long remainingCapacity = ringBuffer.remainingCapacity();
+    final long bufferSize = ringBuffer.getBufferSize();
+    // The interval getter is named in seconds while the clock is in milliseconds; convert first.
+    final long minIntervalMs =
+        PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() * 1000L;
+    final long now = System.currentTimeMillis();
+    // Keep the "now - interval >= last" form: lastLogTime starts at Long.MIN_VALUE, so
+    // "now - lastLogTime" would overflow.
+    if (remainingCapacity * 2 < bufferSize && now - minIntervalMs >= lastLogTime) {
+      LOGGER.warn(
+          "The assigner queue content has exceeded half, it may be stuck and may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
+          dataRegionId,
+          remainingCapacity,
+          bufferSize);
+      lastLogTime = now;
+    }
+  }
+
   private static class EventContainer {
 
     private PipeRealtimeEvent event;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6a4f877bf47..e3067a13126 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -64,7 +64,9 @@ public class PipeDataRegionAssigner implements Closeable {
 
   public PipeDataRegionAssigner(final String dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor = new DisruptorQueue(this::assignToExtractor, 
this::onAssignedHook);
+    this.disruptor =
+        new DisruptorQueue(
+            Integer.parseInt(dataRegionId), this::assignToSource, 
this::onAssignedHook);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
   }
@@ -105,7 +107,7 @@ public class PipeDataRegionAssigner implements Closeable {
     eventCounter.decreaseEventCount(innerEvent);
   }
 
-  private void assignToExtractor(
+  private void assignToSource(
       final PipeRealtimeEvent event, final long sequence, final boolean 
endOfBatch) {
     if (disruptor.isClosed()) {
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index a01b50e7e68..eae60bdbc32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -19,15 +19,12 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +34,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -47,7 +45,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   protected final ReentrantReadWriteLock lock;
 
   protected final Set<PipeRealtimeDataRegionSource> sources;
-  protected final Cache<String, Set<PipeRealtimeDataRegionSource>> 
deviceToSourcesCache;
+  protected final Map<String, Set<PipeRealtimeDataRegionSource>> 
deviceToSourcesCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
@@ -55,10 +53,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     // iterated by {@link #assignToSource}, at the same time the sources may 
be added or
     // removed by {@link #register} and {@link #deregister}.
     this.sources = new CopyOnWriteArraySet<>();
-    this.deviceToSourcesCache =
-        Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
-            .build();
+    this.deviceToSourcesCache = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -66,7 +61,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.add(source);
-      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -77,7 +72,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.remove(source);
-      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -123,7 +118,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
 
         // 1. try to get matched sources from cache, if not success, match 
them by device
         final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
-            deviceToSourcesCache.get(device, this::filterSourcesByDevice);
+            deviceToSourcesCache.computeIfAbsent(device, 
this::filterSourcesByDevice);
         // this would not happen
         if (sourcesFilteredByDevice == null) {
           LOGGER.warn("Match result NPE when handle device {}", device);
@@ -200,7 +195,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
       }
 
       final PipePattern pipePattern = source.getPipePattern();
-      if (Objects.isNull(pipePattern) || 
pipePattern.mayOverlapWithDevice(device)) {
+      if (Objects.isNull(pipePattern) || 
pipePattern.overlapWithDevice(device)) {
         filteredSources.add(source);
       }
     }
@@ -213,8 +208,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.clear();
-      deviceToSourcesCache.invalidateAll();
-      deviceToSourcesCache.cleanUp();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index c9aff982c4b..3cad57962e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -699,9 +699,6 @@ public class DataRegion implements IDataRegionForQuery {
     if (config.isEnableSeparateData()) {
       lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
     }
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
-    }
   }
 
   protected void upgradeAndUpdateDeviceLastFlushTime(
@@ -718,9 +715,6 @@ public class DataRegion implements IDataRegionForQuery {
     if (config.isEnableSeparateData()) {
       lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
     }
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
-    }
   }
 
   public void initCompactionSchedule() {
@@ -1678,7 +1672,6 @@ public class DataRegion implements IDataRegionForQuery {
       this.workUnsequenceTsFileProcessors.clear();
       this.tsFileManager.clear();
       lastFlushTimeMap.clearFlushedTime();
-      lastFlushTimeMap.clearGlobalFlushedTime();
       TimePartitionManager.getInstance()
           .removeTimePartitionInfo(new 
DataRegionId(Integer.parseInt(dataRegionIdString)));
     } catch (InterruptedException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 3f0abfd3e24..f29c158cc0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,19 +19,13 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import org.apache.iotdb.db.storageengine.StorageEngine;
-
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class HashLastFlushTimeMap implements ILastFlushTimeMap {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(HashLastFlushTimeMap.class);
-
   long LONG_SIZE = 24;
 
   long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE;
@@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
    */
   private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new 
ConcurrentHashMap<>();
 
-  /**
-   * global mapping of device -> largest timestamp of the latest memtable to * 
be submitted to
-   * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to 
maintain global
-   * latestFlushedTime of devices and will be updated along with
-   * partitionLatestFlushedTimeForEachDevice
-   *
-   * <p>It is used to update last cache.
-   */
-  private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice =
-      new ConcurrentHashMap<>();
-
   /** record memory cost of map for each partitionId */
   private final Map<Long, Long> memCostForEachPartition = new 
ConcurrentHashMap<>();
 
@@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
     }
   }
 
-  @Override
-  public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap) {
-    for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
-      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), 
entry.getValue(), Math::max);
-    }
-  }
-
   @Override
   public boolean checkAndCreateFlushedTimePartition(
       long timePartitionId, boolean usingDeviceFlushTime) {
@@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
       partitionLatestFlushedTime
           .computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
           .updateLastFlushTime(entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), 
Long.MIN_VALUE)
-          < entry.getValue()) {
-        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), 
entry.getValue());
-      }
     }
   }
 
@@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
     return 
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
   }
 
-  // This method is for creating last cache entry when insert
-  @Override
-  public long getGlobalFlushedTime(IDeviceID path) {
-    // If TsFileResource is not fully recovered, we should return 
Long.MAX_VALUE
-    // to avoid create Last cache entry
-    if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
-      return Long.MAX_VALUE;
-    }
-    return globalLatestFlushedTimeForEachDevice.getOrDefault(path, 
Long.MIN_VALUE);
-  }
-
   @Override
   public void clearFlushedTime() {
     partitionLatestFlushedTime.clear();
   }
 
-  @Override
-  public void clearGlobalFlushedTime() {
-    globalLatestFlushedTimeForEachDevice.clear();
-  }
-
   @Override
   public void degradeLastFlushTime(long partitionId) {
     partitionLatestFlushedTime.computeIfPresent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 7bdd141bf6b..b5947afbe89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -31,8 +31,6 @@ public interface ILastFlushTimeMap {
 
   void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
 
-  void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap);
-
   void upgradeAndUpdateMultiDeviceFlushedTime(
       long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
 
@@ -49,15 +47,11 @@ public interface ILastFlushTimeMap {
   // region read
   long getFlushedTime(long timePartitionId, IDeviceID deviceId);
 
-  long getGlobalFlushedTime(IDeviceID path);
-
   // endregion
 
   // region clear
   void clearFlushedTime();
 
-  void clearGlobalFlushedTime();
-
   // endregion
 
   void degradeLastFlushTime(long partitionId);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
index 78d96a920aa..cb5c7612fbe 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
@@ -85,12 +85,17 @@ public class IoTDBPipePatternTest {
     final String[] patternsNotOverlapWithDevice = {
       "root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**",
     };
+    final String[] patternsFalsePositiveOverLap = {"root.**.d2.**"};
     for (final String s : patternsOverlapWithDevice) {
       Assert.assertTrue(new IoTDBPipePattern(s).mayOverlapWithDevice(device));
     }
     for (final String t : patternsNotOverlapWithDevice) {
       Assert.assertFalse(new IoTDBPipePattern(t).mayOverlapWithDevice(device));
     }
+    for (final String t : patternsFalsePositiveOverLap) {
+      Assert.assertTrue(new IoTDBPipePattern(t).mayOverlapWithDevice(device));
+      Assert.assertFalse(new IoTDBPipePattern(t).overlapWithDevice(device));
+    }
 
     // Test pattern match measurement
     final String measurement = "s1";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index ff88f9521f0..279715c2e6a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -99,7 +99,7 @@ public class IoTDBPipePattern extends 
IoTDBPipePatternOperations {
   public boolean coversDevice(final String device) {
     try {
       return patternPartialPath.include(
-          new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+          measurementPathGetter.apply(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -116,6 +116,16 @@ public class IoTDBPipePattern extends 
IoTDBPipePatternOperations {
     }
   }
 
+  /**
+   * Precise device-level overlap check: asks {@code patternPartialPath.overlapWith} about the path
+   * produced by {@code measurementPathGetter} for the device and the one-level wildcard
+   * (presumably {@code device.*} - confirm against the getter).
+   */
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    try {
+      return patternPartialPath.overlapWith(
+          measurementPathGetter.apply(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+    } catch (final IllegalPathException e) {
+      // A device that cannot be turned into a legal path is treated as not overlapping.
+      return false;
+    }
+  }
+
   @Override
   public boolean mayOverlapWithDb(final String db) {
     try {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index a76306aab81..8c2b4ca0409 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -82,13 +82,22 @@ public abstract class PipePattern {
   /**
    * Check if a device may have some measurements matched by the pattern.
    *
-   * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is 
false.
-   *
-   * <p>NOTE2: this is just a loose check and may have false positives. To 
further check if a
+   * <p>NOTE: this is just a loose check and may have false positives. To 
further check if a
    * measurement matches the pattern, please use {@link 
PipePattern#matchesMeasurement} after this.
    */
   public abstract boolean mayOverlapWithDevice(final String device);
 
+  /**
+   * Check if a device has some measurements matched by the pattern.
+   *
+   * <p>NOTE: this is meant to be a precise check without false positives, i.e. when it returns
+   * true there is always some measurement (existing or not) under the device that matches the
+   * pattern. However, it is currently not precise when exclusions are involved (e.g. inclusion:
+   * root.**.d*.*s, exclusion: root.**.device.*s, device: root.a.b.device). This may be supported
+   * in future versions.
+   */
+  public abstract boolean overlapWithDevice(final String device);
+
   /**
    * Check if a full path with device and measurement can be matched by 
pattern.
    *
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
index 85af46656a1..aa735722dc7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
@@ -118,6 +118,11 @@ public class PrefixPipePattern extends PipePattern {
         || (pattern.length() > device.length() && pattern.startsWith(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // Reuses the loose check unchanged.
+    // NOTE(review): presumably prefix matching has no device-level false positives - confirm.
+    return mayOverlapWithDevice(device);
+  }
+
   @Override
   public boolean mayOverlapWithDb(final String db) {
     return
@@ -128,7 +133,7 @@ public class PrefixPipePattern extends PipePattern {
   }
 
   @Override
-  public boolean matchesMeasurement(final String device, final String 
measurement) {
+  public boolean matchesMeasurement(final String device, String measurement) {
     // We assume that the device is already matched.
     if (pattern.length() <= device.length()) {
       return true;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
index 5000ff22657..b2e2d9bca24 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
@@ -93,6 +93,11 @@ public class UnionIoTDBPipePattern extends 
IoTDBPipePatternOperations {
     return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // The union overlaps the device as soon as any member pattern does.
+    return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, 
measurement));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
index 3e497804551..ac6d7263a77 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
@@ -77,6 +77,11 @@ public class UnionPipePattern extends PipePattern {
     return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // The union overlaps the device as soon as any member pattern does.
+    return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, 
measurement));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
index b68f10b54dc..b0ccb198e39 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
@@ -92,6 +92,11 @@ public class WithExclusionIoTDBPipePattern extends 
IoTDBPipePatternOperations {
     return inclusionPattern.mayOverlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // Overlaps if the inclusion pattern overlaps the device AND the exclusion pattern does not
+    // cover the device.
+    return inclusionPattern.overlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     return inclusionPattern.matchesMeasurement(device, measurement)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
index 5a10613c8f2..3c3b3189393 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
@@ -91,6 +91,12 @@ public class WithExclusionPipePattern extends PipePattern {
     return inclusionPattern.mayOverlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // Overlaps if inclusion overlaps the device AND exclusion doesn't cover the device.
+    return inclusionPattern.overlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     // The core logic: Must match inclusion AND NOT match exclusion.

Reply via email to