This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dc0758fcbdb Deleted the useless device flush map & Pipe: Made the 
assigner cache to full cache & Use precise match in assigner device & Print 
periodical log when assigner has exceeded half of the capacity (#17312)
dc0758fcbdb is described below

commit dc0758fcbdb7dcba535fbc1148124aac2ca0ca01
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 19 11:51:41 2026 +0800

    Deleted the useless device flush map & Pipe: Made the assigner cache to 
full cache & Use precise match in assigner device & Print periodical log when 
assigner has exceeded half of the capacity (#17312)
    
    * fix
    
    * regionid
    
    * refactor
    
    * long-min
    
    * false-po
---
 .../realtime/assigner/DisruptorQueue.java          | 27 ++++++++++++-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  2 +-
 .../matcher/CachedSchemaPatternMatcher.java        | 41 ++++++++------------
 .../db/storageengine/dataregion/DataRegion.java    |  7 ----
 .../dataregion/HashLastFlushTimeMap.java           | 44 ----------------------
 .../dataregion/ILastFlushTimeMap.java              |  6 ---
 .../db/pipe/pattern/IoTDBTreePatternTest.java      |  5 +++
 .../datastructure/pattern/IoTDBTreePattern.java    | 12 +++++-
 .../datastructure/pattern/PrefixTreePattern.java   |  5 +++
 .../pipe/datastructure/pattern/TreePattern.java    | 15 ++++++--
 .../pattern/UnionIoTDBTreePattern.java             |  5 +++
 .../datastructure/pattern/UnionTreePattern.java    |  5 +++
 .../pattern/WithExclusionIoTDBTreePattern.java     |  5 +++
 .../pattern/WithExclusionTreePattern.java          |  6 +++
 14 files changed, 97 insertions(+), 88 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index fc30f14be5b..52007765fe3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -30,24 +30,32 @@ import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.function.Consumer;
 
 import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
 
 public class DisruptorQueue {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorQueue.class);
   private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
       new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   private final Disruptor<EventContainer> disruptor;
   private final RingBuffer<EventContainer> ringBuffer;
-
   private volatile boolean isClosed = false;
 
+  private final int dataRegionId;
+  private volatile long lastLogTime = Long.MIN_VALUE;
+
   public DisruptorQueue(
+      final int dataRegionId,
       final EventHandler<PipeRealtimeEvent> eventHandler,
       final Consumer<PipeRealtimeEvent> onAssignedHook) {
+    this.dataRegionId = dataRegionId;
     final PipeConfig config = PipeConfig.getInstance();
     final int ringBufferSize = 
config.getPipeSourceAssignerDisruptorRingBufferSize();
     final long ringBufferEntrySizeInBytes =
@@ -84,6 +92,7 @@ public class DisruptorQueue {
       ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
     }
     ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
+    mayPrintExceedingLog();
   }
 
   public void shutdown() {
@@ -97,6 +106,22 @@ public class DisruptorQueue {
     return isClosed;
   }
 
+  private void mayPrintExceedingLog() {
+    final long remainingCapacity = ringBuffer.remainingCapacity();
+    final long bufferSize = ringBuffer.getBufferSize();
+    if ((double) remainingCapacity / bufferSize >= 0.5
+        && System.currentTimeMillis()
+                - 
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()
+            >= lastLogTime) {
+      LOGGER.warn(
+          "The assigner queue content has exceeded half, it may be stuck and 
may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
+          dataRegionId,
+          remainingCapacity,
+          bufferSize);
+      lastLogTime = System.currentTimeMillis();
+    }
+  }
+
   private static class EventContainer {
 
     private volatile PipeRealtimeEvent event;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index d56bdf04e01..9c7182f051c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -78,7 +78,7 @@ public class PipeDataRegionAssigner implements Closeable {
 
   public PipeDataRegionAssigner(final int dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
-    this.disruptor = new DisruptorQueue(this::assignToSource, 
this::onAssignedHook);
+    this.disruptor = new DisruptorQueue(dataRegionId, this::assignToSource, 
this::onAssignedHook);
     this.dataRegionId = dataRegionId;
     PipeAssignerMetrics.getInstance().register(this);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 284c2d9f396..77317245d80 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher;
 
 import org.apache.iotdb.commons.audit.UserEntity;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -32,8 +31,6 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.utils.Pair;
@@ -45,6 +42,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -60,8 +58,9 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   protected final ReentrantReadWriteLock lock;
   protected final Set<PipeRealtimeDataRegionSource> sources;
 
-  protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> 
deviceToSourcesCache;
-  protected final Cache<Pair<String, IDeviceID>, 
Set<PipeRealtimeDataRegionSource>>
+  // Use full cache to avoid queue stuck and block insertion
+  protected final Map<IDeviceID, Set<PipeRealtimeDataRegionSource>> 
deviceToSourcesCache;
+  protected final Map<Pair<String, IDeviceID>, 
Set<PipeRealtimeDataRegionSource>>
       databaseAndTableToSourcesCache;
 
   public CachedSchemaPatternMatcher() {
@@ -70,14 +69,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     // iterated by {@link #assignToSource}, at the same time the sources may 
be added or
     // removed by {@link #register} and {@link #deregister}.
     this.sources = new CopyOnWriteArraySet<>();
-    this.deviceToSourcesCache =
-        Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
-            .build();
-    this.databaseAndTableToSourcesCache =
-        Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
-            .build();
+    this.deviceToSourcesCache = new ConcurrentHashMap<>();
+    this.databaseAndTableToSourcesCache = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -85,8 +78,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.add(source);
-      deviceToSourcesCache.invalidateAll();
-      databaseAndTableToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
+      databaseAndTableToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -97,8 +90,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.remove(source);
-      deviceToSourcesCache.invalidateAll();
-      databaseAndTableToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
+      databaseAndTableToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -109,7 +102,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       // Will invalidate device cache
-      databaseAndTableToSourcesCache.invalidateAll();
+      databaseAndTableToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -207,7 +200,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
       final Set<PipeRealtimeDataRegionSource> matchedSources) {
     // 1. try to get matched sources from cache, if not success, match them by 
device
     final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
-        deviceToSourcesCache.get(device, this::filterSourcesByDevice);
+        deviceToSourcesCache.computeIfAbsent(device, 
this::filterSourcesByDevice);
     // this would not happen
     if (sourcesFilteredByDevice == null) {
       LOGGER.warn(
@@ -271,7 +264,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
       final TreePattern treePattern = source.getTreePattern();
       if (Objects.isNull(treePattern)
           || (treePattern.isTreeModelDataAllowedToBeCaptured()
-              && treePattern.mayOverlapWithDevice(device))) {
+              && treePattern.overlapWithDevice(device))) {
         filteredSources.add(source);
       }
     }
@@ -291,7 +284,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     }
 
     final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDatabaseAndTable =
-        databaseAndTableToSourcesCache.get(
+        databaseAndTableToSourcesCache.computeIfAbsent(
             new Pair<>(databaseName, tableName), 
this::filterSourcesByDatabaseAndTable);
     // this would not happen
     if (sourcesFilteredByDatabaseAndTable == null) {
@@ -350,10 +343,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.clear();
-      deviceToSourcesCache.invalidateAll();
-      deviceToSourcesCache.cleanUp();
-      databaseAndTableToSourcesCache.invalidateAll();
-      databaseAndTableToSourcesCache.cleanUp();
+      deviceToSourcesCache.clear();
+      databaseAndTableToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index b78bd49e6e9..eb37cb1d5d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -812,9 +812,6 @@ public class DataRegion implements IDataRegionForQuery {
     if (config.isEnableSeparateData()) {
       lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
     }
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
-    }
   }
 
   protected void upgradeAndUpdateDeviceLastFlushTime(
@@ -831,9 +828,6 @@ public class DataRegion implements IDataRegionForQuery {
     if (config.isEnableSeparateData()) {
       lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
     }
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
-    }
   }
 
   public void initCompactionSchedule() {
@@ -2120,7 +2114,6 @@ public class DataRegion implements IDataRegionForQuery {
       this.workUnsequenceTsFileProcessors.clear();
       this.tsFileManager.clear();
       lastFlushTimeMap.clearFlushedTime();
-      lastFlushTimeMap.clearGlobalFlushedTime();
       TimePartitionManager.getInstance()
           .removeTimePartitionInfo(new 
DataRegionId(Integer.parseInt(dataRegionIdString)));
     } catch (InterruptedException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 3f0abfd3e24..f29c158cc0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,19 +19,13 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import org.apache.iotdb.db.storageengine.StorageEngine;
-
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class HashLastFlushTimeMap implements ILastFlushTimeMap {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(HashLastFlushTimeMap.class);
-
   long LONG_SIZE = 24;
 
   long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE;
@@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
    */
   private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new 
ConcurrentHashMap<>();
 
-  /**
-   * global mapping of device -> largest timestamp of the latest memtable to * 
be submitted to
-   * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to 
maintain global
-   * latestFlushedTime of devices and will be updated along with
-   * partitionLatestFlushedTimeForEachDevice
-   *
-   * <p>It is used to update last cache.
-   */
-  private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice =
-      new ConcurrentHashMap<>();
-
   /** record memory cost of map for each partitionId */
   private final Map<Long, Long> memCostForEachPartition = new 
ConcurrentHashMap<>();
 
@@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
     }
   }
 
-  @Override
-  public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap) {
-    for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
-      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), 
entry.getValue(), Math::max);
-    }
-  }
-
   @Override
   public boolean checkAndCreateFlushedTimePartition(
       long timePartitionId, boolean usingDeviceFlushTime) {
@@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
       partitionLatestFlushedTime
           .computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
           .updateLastFlushTime(entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), 
Long.MIN_VALUE)
-          < entry.getValue()) {
-        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), 
entry.getValue());
-      }
     }
   }
 
@@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
     return 
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
   }
 
-  // This method is for creating last cache entry when insert
-  @Override
-  public long getGlobalFlushedTime(IDeviceID path) {
-    // If TsFileResource is not fully recovered, we should return 
Long.MAX_VALUE
-    // to avoid create Last cache entry
-    if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
-      return Long.MAX_VALUE;
-    }
-    return globalLatestFlushedTimeForEachDevice.getOrDefault(path, 
Long.MIN_VALUE);
-  }
-
   @Override
   public void clearFlushedTime() {
     partitionLatestFlushedTime.clear();
   }
 
-  @Override
-  public void clearGlobalFlushedTime() {
-    globalLatestFlushedTimeForEachDevice.clear();
-  }
-
   @Override
   public void degradeLastFlushTime(long partitionId) {
     partitionLatestFlushedTime.computeIfPresent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 7bdd141bf6b..b5947afbe89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -31,8 +31,6 @@ public interface ILastFlushTimeMap {
 
   void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
 
-  void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap);
-
   void upgradeAndUpdateMultiDeviceFlushedTime(
       long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
 
@@ -49,15 +47,11 @@ public interface ILastFlushTimeMap {
   // region read
   long getFlushedTime(long timePartitionId, IDeviceID deviceId);
 
-  long getGlobalFlushedTime(IDeviceID path);
-
   // endregion
 
   // region clear
   void clearFlushedTime();
 
-  void clearGlobalFlushedTime();
-
   // endregion
 
   void degradeLastFlushTime(long partitionId);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
index e2a7bc0a4b5..6b17bdee6d3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
@@ -97,12 +97,17 @@ public class IoTDBTreePatternTest {
     final String[] patternsNotOverlapWithDevice = {
       "root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**",
     };
+    final String[] patternsFalsePositiveOverLap = {"root.**.d2.**"};
     for (final String s : patternsOverlapWithDevice) {
       Assert.assertTrue(new IoTDBTreePattern(s).mayOverlapWithDevice(device));
     }
     for (final String t : patternsNotOverlapWithDevice) {
       Assert.assertFalse(new IoTDBTreePattern(t).mayOverlapWithDevice(device));
     }
+    for (final String t : patternsFalsePositiveOverLap) {
+      Assert.assertTrue(new IoTDBTreePattern(t).mayOverlapWithDevice(device));
+      Assert.assertFalse(new IoTDBTreePattern(t).overlapWithDevice(device));
+    }
 
     // Test pattern match measurement
     final String measurement = "s1";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
index 8447f5b3258..f7d00abcd59 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
@@ -139,7 +139,7 @@ public class IoTDBTreePattern extends 
IoTDBTreePatternOperations {
   public boolean coversDevice(final IDeviceID device) {
     try {
       return patternPartialPath.include(
-          new MeasurementPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+          measurementPathGetter.apply(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -165,6 +165,16 @@ public class IoTDBTreePattern extends 
IoTDBTreePatternOperations {
     }
   }
 
+  @Override
+  public boolean overlapWithDevice(final IDeviceID device) {
+    try {
+      return patternPartialPath.overlapWith(
+          measurementPathGetter.apply(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+    } catch (final IllegalPathException e) {
+      return false;
+    }
+  }
+
   @Override
   public boolean matchesMeasurement(final IDeviceID device, final String 
measurement) {
     // For aligned timeseries, empty measurement is an alias of the time 
column.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
index 19f7f815de2..1d295f57070 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
@@ -135,6 +135,11 @@ public class PrefixTreePattern extends TreePattern {
         || (pattern.length() > deviceStr.length() && 
pattern.startsWith(deviceStr));
   }
 
+  @Override
+  public boolean overlapWithDevice(final IDeviceID device) {
+    return mayOverlapWithDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final IDeviceID device, String 
measurement) {
     final String deviceStr = device.toString();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 399565ecaab..0d6e7fcdad5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -98,13 +98,22 @@ public abstract class TreePattern {
   /**
    * Check if a device may have some measurements matched by the pattern.
    *
-   * <p>NOTE1: this is only called when {@link TreePattern#coversDevice} is 
{@code false}.
-   *
-   * <p>NOTE2: this is just a loose check and may have false positives. To 
further check if a
+   * <p>NOTE: this is just a loose check and may have false positives. To 
further check if a
    * measurement matches the pattern, please use {@link 
TreePattern#matchesMeasurement} after this.
    */
   public abstract boolean mayOverlapWithDevice(final IDeviceID device);
 
+  /**
+   * Check if a device has some measurements matched by the pattern.
+   *
+   * <p>NOTE: this is a precise check and will not have false positives. It 
means that, you can
+   * always find a measurement(existing or non-existing) to match the pattern 
with the device.
+   * However, it may not be precise now if there are any exclusions (e.g. 
Inclusion: root.**.d*.*s,
+   * exclusion: root.**.device.*s, device: root.a.b.device). It may be 
supported by incoming
+   * versions.
+   */
+  public abstract boolean overlapWithDevice(final IDeviceID device);
+
   /**
    * Check if a full path with device and measurement can be matched by 
pattern.
    *
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
index f0c1a50c20a..4921dd57021 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
@@ -103,6 +103,11 @@ public class UnionIoTDBTreePattern extends 
IoTDBTreePatternOperations {
     return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final IDeviceID device) {
+    return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+  }
+
   @Override
   public boolean matchesMeasurement(final IDeviceID device, final String 
measurement) {
     return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, 
measurement));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
index bf360c7c072..e10075467dd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
@@ -81,6 +81,11 @@ public class UnionTreePattern extends TreePattern {
     return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final IDeviceID device) {
+    return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+  }
+
   @Override
   public boolean matchesMeasurement(final IDeviceID device, final String 
measurement) {
     return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, 
measurement));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
index 9fdbec4b2da..2ee02b74b89 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
@@ -102,6 +102,11 @@ public class WithExclusionIoTDBTreePattern extends 
IoTDBTreePatternOperations {
     return inclusionPattern.mayOverlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
   }
 
+  @Override
+  public boolean overlapWithDevice(final IDeviceID device) {
+    return inclusionPattern.overlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final IDeviceID device, final String 
measurement) {
     return inclusionPattern.matchesMeasurement(device, measurement)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
index ea3114cebdb..ec5178b16ed 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
@@ -96,6 +96,12 @@ public class WithExclusionTreePattern extends TreePattern {
     return inclusionPattern.mayOverlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
   }
 
+  @Override
+  public boolean overlapWithDevice(final IDeviceID device) {
+    // May overlap if inclusion overlaps AND exclusion doesn't fully cover it.
+    return inclusionPattern.overlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final IDeviceID device, final String 
measurement) {
     // The core logic: Must match inclusion AND NOT match exclusion.

Reply via email to