This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dc0758fcbdb Deleted the useless device flush map & Pipe: Made the
assigner cache to full cache & Use precise match in assigner device & Print
periodical log when assigner has exceeded half of the capacity (#17312)
dc0758fcbdb is described below
commit dc0758fcbdb7dcba535fbc1148124aac2ca0ca01
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 19 11:51:41 2026 +0800
Deleted the useless device flush map & Pipe: Made the assigner cache to
full cache & Use precise match in assigner device & Print periodical log when
assigner has exceeded half of the capacity (#17312)
* fix
* regionid
* refactor
* long-min
* false-po
---
.../realtime/assigner/DisruptorQueue.java | 27 ++++++++++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 2 +-
.../matcher/CachedSchemaPatternMatcher.java | 41 ++++++++------------
.../db/storageengine/dataregion/DataRegion.java | 7 ----
.../dataregion/HashLastFlushTimeMap.java | 44 ----------------------
.../dataregion/ILastFlushTimeMap.java | 6 ---
.../db/pipe/pattern/IoTDBTreePatternTest.java | 5 +++
.../datastructure/pattern/IoTDBTreePattern.java | 12 +++++-
.../datastructure/pattern/PrefixTreePattern.java | 5 +++
.../pipe/datastructure/pattern/TreePattern.java | 15 ++++++--
.../pattern/UnionIoTDBTreePattern.java | 5 +++
.../datastructure/pattern/UnionTreePattern.java | 5 +++
.../pattern/WithExclusionIoTDBTreePattern.java | 5 +++
.../pattern/WithExclusionTreePattern.java | 6 +++
14 files changed, 97 insertions(+), 88 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index fc30f14be5b..52007765fe3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -30,24 +30,32 @@ import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.function.Consumer;
import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
public class DisruptorQueue {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DisruptorQueue.class);
private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
private final PipeMemoryBlock allocatedMemoryBlock;
private final Disruptor<EventContainer> disruptor;
private final RingBuffer<EventContainer> ringBuffer;
-
private volatile boolean isClosed = false;
+ private final int dataRegionId;
+ private volatile long lastLogTime = Long.MIN_VALUE;
+
public DisruptorQueue(
+ final int dataRegionId,
final EventHandler<PipeRealtimeEvent> eventHandler,
final Consumer<PipeRealtimeEvent> onAssignedHook) {
+ this.dataRegionId = dataRegionId;
final PipeConfig config = PipeConfig.getInstance();
final int ringBufferSize =
config.getPipeSourceAssignerDisruptorRingBufferSize();
final long ringBufferEntrySizeInBytes =
@@ -84,6 +92,7 @@ public class DisruptorQueue {
((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
}
ringBuffer.publishEvent((container, sequence, o) ->
container.setEvent(event), event);
+ mayPrintExceedingLog();
}
public void shutdown() {
@@ -97,6 +106,22 @@ public class DisruptorQueue {
return isClosed;
}
+  /**
+   * Prints a rate-limited warning once the assigner ring buffer is at least half occupied, since
+   * a backed-up queue may block insertion.
+   *
+   * <p>The warning is emitted at most once per {@code PipePeriodicalLogMinIntervalSeconds}.
+   */
+  private void mayPrintExceedingLog() {
+    // NOTE(review): assumes remainingCapacity() reports the number of FREE slots, as its name
+    // indicates - confirm against the RingBuffer implementation.
+    final long remainingCapacity = ringBuffer.remainingCapacity();
+    final long bufferSize = ringBuffer.getBufferSize();
+    // At least half of the queue is occupied exactly when at most half of the slots are free.
+    if (remainingCapacity * 2 > bufferSize) {
+      return;
+    }
+    final long now = System.currentTimeMillis();
+    // The configured interval is in seconds while the clock is in milliseconds.
+    final long minIntervalMs =
+        PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() * 1000L;
+    // Subtract from "now" instead of computing (now - lastLogTime): lastLogTime starts at
+    // Long.MIN_VALUE, so the latter would overflow on the first call.
+    if (now - minIntervalMs < lastLogTime) {
+      return;
+    }
+    LOGGER.warn(
+        "The assigner queue content has exceeded half, it may be stuck and may block insertion. "
+            + "regionId: {}, remainingCapacity: {}, bufferSize: {}",
+        dataRegionId,
+        remainingCapacity,
+        bufferSize);
+    lastLogTime = now;
+  }
+
private static class EventContainer {
private volatile PipeRealtimeEvent event;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index d56bdf04e01..9c7182f051c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -78,7 +78,7 @@ public class PipeDataRegionAssigner implements Closeable {
public PipeDataRegionAssigner(final int dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
- this.disruptor = new DisruptorQueue(this::assignToSource,
this::onAssignedHook);
+ this.disruptor = new DisruptorQueue(dataRegionId, this::assignToSource,
this::onAssignedHook);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 284c2d9f396..77317245d80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher;
import org.apache.iotdb.commons.audit.UserEntity;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -32,8 +31,6 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Pair;
@@ -45,6 +42,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -60,8 +58,9 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
protected final ReentrantReadWriteLock lock;
protected final Set<PipeRealtimeDataRegionSource> sources;
- protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>>
deviceToSourcesCache;
- protected final Cache<Pair<String, IDeviceID>,
Set<PipeRealtimeDataRegionSource>>
+ // Use full cache to avoid queue stuck and block insertion
+ protected final Map<IDeviceID, Set<PipeRealtimeDataRegionSource>>
deviceToSourcesCache;
+ protected final Map<Pair<String, IDeviceID>,
Set<PipeRealtimeDataRegionSource>>
databaseAndTableToSourcesCache;
public CachedSchemaPatternMatcher() {
@@ -70,14 +69,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
// iterated by {@link #assignToSource}, at the same time the sources may
be added or
// removed by {@link #register} and {@link #deregister}.
this.sources = new CopyOnWriteArraySet<>();
- this.deviceToSourcesCache =
- Caffeine.newBuilder()
-
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
- .build();
- this.databaseAndTableToSourcesCache =
- Caffeine.newBuilder()
-
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
- .build();
+ this.deviceToSourcesCache = new ConcurrentHashMap<>();
+ this.databaseAndTableToSourcesCache = new ConcurrentHashMap<>();
}
@Override
@@ -85,8 +78,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
sources.add(source);
- deviceToSourcesCache.invalidateAll();
- databaseAndTableToSourcesCache.invalidateAll();
+ deviceToSourcesCache.clear();
+ databaseAndTableToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
@@ -97,8 +90,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
sources.remove(source);
- deviceToSourcesCache.invalidateAll();
- databaseAndTableToSourcesCache.invalidateAll();
+ deviceToSourcesCache.clear();
+ databaseAndTableToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
@@ -109,7 +102,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
// Will invalidate device cache
- databaseAndTableToSourcesCache.invalidateAll();
+ databaseAndTableToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
@@ -207,7 +200,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
final Set<PipeRealtimeDataRegionSource> matchedSources) {
// 1. try to get matched sources from cache, if not success, match them by
device
final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
- deviceToSourcesCache.get(device, this::filterSourcesByDevice);
+ deviceToSourcesCache.computeIfAbsent(device,
this::filterSourcesByDevice);
// this would not happen
if (sourcesFilteredByDevice == null) {
LOGGER.warn(
@@ -271,7 +264,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
final TreePattern treePattern = source.getTreePattern();
if (Objects.isNull(treePattern)
|| (treePattern.isTreeModelDataAllowedToBeCaptured()
- && treePattern.mayOverlapWithDevice(device))) {
+ && treePattern.overlapWithDevice(device))) {
filteredSources.add(source);
}
}
@@ -291,7 +284,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDatabaseAndTable =
- databaseAndTableToSourcesCache.get(
+ databaseAndTableToSourcesCache.computeIfAbsent(
new Pair<>(databaseName, tableName),
this::filterSourcesByDatabaseAndTable);
// this would not happen
if (sourcesFilteredByDatabaseAndTable == null) {
@@ -350,10 +343,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
sources.clear();
- deviceToSourcesCache.invalidateAll();
- deviceToSourcesCache.cleanUp();
- databaseAndTableToSourcesCache.invalidateAll();
- databaseAndTableToSourcesCache.cleanUp();
+ deviceToSourcesCache.clear();
+ databaseAndTableToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index b78bd49e6e9..eb37cb1d5d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -812,9 +812,6 @@ public class DataRegion implements IDataRegionForQuery {
if (config.isEnableSeparateData()) {
lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
}
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
- }
}
protected void upgradeAndUpdateDeviceLastFlushTime(
@@ -831,9 +828,6 @@ public class DataRegion implements IDataRegionForQuery {
if (config.isEnableSeparateData()) {
lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
}
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
- }
}
public void initCompactionSchedule() {
@@ -2120,7 +2114,6 @@ public class DataRegion implements IDataRegionForQuery {
this.workUnsequenceTsFileProcessors.clear();
this.tsFileManager.clear();
lastFlushTimeMap.clearFlushedTime();
- lastFlushTimeMap.clearGlobalFlushedTime();
TimePartitionManager.getInstance()
.removeTimePartitionInfo(new
DataRegionId(Integer.parseInt(dataRegionIdString)));
} catch (InterruptedException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 3f0abfd3e24..f29c158cc0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,19 +19,13 @@
package org.apache.iotdb.db.storageengine.dataregion;
-import org.apache.iotdb.db.storageengine.StorageEngine;
-
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class HashLastFlushTimeMap implements ILastFlushTimeMap {
- private static final Logger logger =
LoggerFactory.getLogger(HashLastFlushTimeMap.class);
-
long LONG_SIZE = 24;
long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE;
@@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
*/
private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new
ConcurrentHashMap<>();
- /**
- * global mapping of device -> largest timestamp of the latest memtable to *
be submitted to
- * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to
maintain global
- * latestFlushedTime of devices and will be updated along with
- * partitionLatestFlushedTimeForEachDevice
- *
- * <p>It is used to update last cache.
- */
- private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice =
- new ConcurrentHashMap<>();
-
/** record memory cost of map for each partitionId */
private final Map<Long, Long> memCostForEachPartition = new
ConcurrentHashMap<>();
@@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
}
- @Override
- public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap) {
- for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
- globalLatestFlushedTimeForEachDevice.merge(entry.getKey(),
entry.getValue(), Math::max);
- }
- }
-
@Override
public boolean checkAndCreateFlushedTimePartition(
long timePartitionId, boolean usingDeviceFlushTime) {
@@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
partitionLatestFlushedTime
.computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
.updateLastFlushTime(entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(),
Long.MIN_VALUE)
- < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
- }
}
}
@@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
return
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
}
- // This method is for creating last cache entry when insert
- @Override
- public long getGlobalFlushedTime(IDeviceID path) {
- // If TsFileResource is not fully recovered, we should return
Long.MAX_VALUE
- // to avoid create Last cache entry
- if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
- return Long.MAX_VALUE;
- }
- return globalLatestFlushedTimeForEachDevice.getOrDefault(path,
Long.MIN_VALUE);
- }
-
@Override
public void clearFlushedTime() {
partitionLatestFlushedTime.clear();
}
- @Override
- public void clearGlobalFlushedTime() {
- globalLatestFlushedTimeForEachDevice.clear();
- }
-
@Override
public void degradeLastFlushTime(long partitionId) {
partitionLatestFlushedTime.computeIfPresent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 7bdd141bf6b..b5947afbe89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -31,8 +31,6 @@ public interface ILastFlushTimeMap {
void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
- void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap);
-
void upgradeAndUpdateMultiDeviceFlushedTime(
long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
@@ -49,15 +47,11 @@ public interface ILastFlushTimeMap {
// region read
long getFlushedTime(long timePartitionId, IDeviceID deviceId);
- long getGlobalFlushedTime(IDeviceID path);
-
// endregion
// region clear
void clearFlushedTime();
- void clearGlobalFlushedTime();
-
// endregion
void degradeLastFlushTime(long partitionId);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
index e2a7bc0a4b5..6b17bdee6d3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBTreePatternTest.java
@@ -97,12 +97,17 @@ public class IoTDBTreePatternTest {
final String[] patternsNotOverlapWithDevice = {
"root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**",
};
+ final String[] patternsFalsePositiveOverLap = {"root.**.d2.**"};
for (final String s : patternsOverlapWithDevice) {
Assert.assertTrue(new IoTDBTreePattern(s).mayOverlapWithDevice(device));
}
for (final String t : patternsNotOverlapWithDevice) {
Assert.assertFalse(new IoTDBTreePattern(t).mayOverlapWithDevice(device));
}
+ for (final String t : patternsFalsePositiveOverLap) {
+ Assert.assertTrue(new IoTDBTreePattern(t).mayOverlapWithDevice(device));
+ Assert.assertFalse(new IoTDBTreePattern(t).overlapWithDevice(device));
+ }
// Test pattern match measurement
final String measurement = "s1";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
index 8447f5b3258..f7d00abcd59 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
@@ -139,7 +139,7 @@ public class IoTDBTreePattern extends
IoTDBTreePatternOperations {
public boolean coversDevice(final IDeviceID device) {
try {
return patternPartialPath.include(
- new MeasurementPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+ measurementPathGetter.apply(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
} catch (final IllegalPathException e) {
return false;
}
@@ -165,6 +165,16 @@ public class IoTDBTreePattern extends
IoTDBTreePatternOperations {
}
}
+ @Override
+ public boolean overlapWithDevice(final IDeviceID device) {
+ try {
+ return patternPartialPath.overlapWith(
+ measurementPathGetter.apply(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+ } catch (final IllegalPathException e) {
+ return false;
+ }
+ }
+
@Override
public boolean matchesMeasurement(final IDeviceID device, final String
measurement) {
// For aligned timeseries, empty measurement is an alias of the time
column.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
index 19f7f815de2..1d295f57070 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixTreePattern.java
@@ -135,6 +135,11 @@ public class PrefixTreePattern extends TreePattern {
|| (pattern.length() > deviceStr.length() &&
pattern.startsWith(deviceStr));
}
+ @Override
+ public boolean overlapWithDevice(final IDeviceID device) {
+ return mayOverlapWithDevice(device);
+ }
+
@Override
public boolean matchesMeasurement(final IDeviceID device, String
measurement) {
final String deviceStr = device.toString();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 399565ecaab..0d6e7fcdad5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -98,13 +98,22 @@ public abstract class TreePattern {
/**
* Check if a device may have some measurements matched by the pattern.
*
- * <p>NOTE1: this is only called when {@link TreePattern#coversDevice} is
{@code false}.
- *
- * <p>NOTE2: this is just a loose check and may have false positives. To
further check if a
+ * <p>NOTE: this is just a loose check and may have false positives. To
further check if a
* measurement matches the pattern, please use {@link
TreePattern#matchesMeasurement} after this.
*/
public abstract boolean mayOverlapWithDevice(final IDeviceID device);
+ /**
+ * Check if a device has some measurements matched by the pattern.
+ *
+ * <p>NOTE: this is a precise check without false positives: when it returns {@code true}, there
+ * is always some measurement (existing or not) under the device that matches the pattern.
+ * However, the check is currently not precise when exclusions are involved (e.g. inclusion:
+ * root.**.d*.*s, exclusion: root.**.device.*s, device: root.a.b.device). This may be supported
+ * in upcoming versions.
+ */
+ public abstract boolean overlapWithDevice(final IDeviceID device);
+
/**
* Check if a full path with device and measurement can be matched by
pattern.
*
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
index f0c1a50c20a..4921dd57021 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBTreePattern.java
@@ -103,6 +103,11 @@ public class UnionIoTDBTreePattern extends
IoTDBTreePatternOperations {
return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
}
+ @Override
+ public boolean overlapWithDevice(final IDeviceID device) {
+ return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+ }
+
@Override
public boolean matchesMeasurement(final IDeviceID device, final String
measurement) {
return patterns.stream().anyMatch(p -> p.matchesMeasurement(device,
measurement));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
index bf360c7c072..e10075467dd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionTreePattern.java
@@ -81,6 +81,11 @@ public class UnionTreePattern extends TreePattern {
return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
}
+ @Override
+ public boolean overlapWithDevice(final IDeviceID device) {
+ return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+ }
+
@Override
public boolean matchesMeasurement(final IDeviceID device, final String
measurement) {
return patterns.stream().anyMatch(p -> p.matchesMeasurement(device,
measurement));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
index 9fdbec4b2da..2ee02b74b89 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBTreePattern.java
@@ -102,6 +102,11 @@ public class WithExclusionIoTDBTreePattern extends
IoTDBTreePatternOperations {
return inclusionPattern.mayOverlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
}
+ @Override
+ public boolean overlapWithDevice(final IDeviceID device) {
+ return inclusionPattern.overlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
+ }
+
@Override
public boolean matchesMeasurement(final IDeviceID device, final String
measurement) {
return inclusionPattern.matchesMeasurement(device, measurement)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
index ea3114cebdb..ec5178b16ed 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionTreePattern.java
@@ -96,6 +96,12 @@ public class WithExclusionTreePattern extends TreePattern {
return inclusionPattern.mayOverlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
}
+ @Override
+ public boolean overlapWithDevice(final IDeviceID device) {
+ // May overlap if inclusion overlaps AND exclusion doesn't fully cover it.
+ return inclusionPattern.overlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
+ }
+
@Override
public boolean matchesMeasurement(final IDeviceID device, final String
measurement) {
// The core logic: Must match inclusion AND NOT match exclusion.