This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 69fb8a78d84 [To dev/1.3] Deleted the useless device flush map & Pipe:
Made the assigner cache to full cache & Use precise match in assigner device &
Print periodical log when assigner has exceeded half of the capacity (#17313)
69fb8a78d84 is described below
commit 69fb8a78d84f42b330048b8bcecd8fd1cdd45755
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 19 11:50:17 2026 +0800
[To dev/1.3] Deleted the useless device flush map & Pipe: Made the assigner
cache to full cache & Use precise match in assigner device & Print periodical
log when assigner has exceeded half of the capacity (#17313)
* [To dev/1.3] Deleted the useless device flush map & Pipe: Made the
assigner cache to full cache & Use precise match in assigner device & Print
periodical log when assigner has exceeded half of the capacity
* fix
* merge-problem
* false-po
* compile
* key-fix
---
.../realtime/assigner/DisruptorQueue.java | 26 ++++++++++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 6 ++-
.../matcher/CachedSchemaPatternMatcher.java | 22 ++++-------
.../db/storageengine/dataregion/DataRegion.java | 7 ----
.../dataregion/HashLastFlushTimeMap.java | 44 ----------------------
.../dataregion/ILastFlushTimeMap.java | 6 ---
.../db/pipe/pattern/IoTDBPipePatternTest.java | 5 +++
.../datastructure/pattern/IoTDBPipePattern.java | 12 +++++-
.../pipe/datastructure/pattern/PipePattern.java | 15 ++++++--
.../datastructure/pattern/PrefixPipePattern.java | 7 +++-
.../pattern/UnionIoTDBPipePattern.java | 5 +++
.../datastructure/pattern/UnionPipePattern.java | 5 +++
.../pattern/WithExclusionIoTDBPipePattern.java | 5 +++
.../pattern/WithExclusionPipePattern.java | 6 +++
14 files changed, 92 insertions(+), 79 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index b2b4ea9b83d..c7add196e69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -32,6 +32,8 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.function.Consumer;
@@ -39,18 +41,23 @@ import static
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISR
public class DisruptorQueue {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DisruptorQueue.class);
private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
private final PipeMemoryBlock allocatedMemoryBlock;
private final Disruptor<EventContainer> disruptor;
private final RingBuffer<EventContainer> ringBuffer;
-
private volatile boolean isClosed = false;
+ private final int dataRegionId;
+ private volatile long lastLogTime = Long.MIN_VALUE;
+
public DisruptorQueue(
+ final int dataRegionId,
final EventHandler<PipeRealtimeEvent> eventHandler,
final Consumer<PipeRealtimeEvent> onAssignedHook) {
+ this.dataRegionId = dataRegionId;
final PipeConfig config = PipeConfig.getInstance();
final int ringBufferSize =
config.getPipeSourceAssignerDisruptorRingBufferSize();
final long ringBufferEntrySizeInBytes =
@@ -88,6 +95,7 @@ public class DisruptorQueue {
((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
}
ringBuffer.publishEvent((container, sequence, o) ->
container.setEvent(event), event);
+ mayPrintExceedingLog();
}
public void shutdown() {
@@ -101,6 +109,22 @@ public class DisruptorQueue {
return isClosed;
}
+ private void mayPrintExceedingLog() {
+ final long remainingCapacity = ringBuffer.remainingCapacity();
+ final long bufferSize = ringBuffer.getBufferSize();
+ if ((double) remainingCapacity / bufferSize >= 0.5
+ && System.currentTimeMillis()
+ -
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()
+ >= lastLogTime) {
+ LOGGER.warn(
+ "The assigner queue content has exceeded half, it may be stuck and
may block insertion. regionId: {}, capacity: {}, bufferSize: {}",
+ dataRegionId,
+ remainingCapacity,
+ bufferSize);
+ lastLogTime = System.currentTimeMillis();
+ }
+ }
+
private static class EventContainer {
private PipeRealtimeEvent event;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6a4f877bf47..e3067a13126 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -64,7 +64,9 @@ public class PipeDataRegionAssigner implements Closeable {
public PipeDataRegionAssigner(final String dataRegionId) {
this.matcher = new CachedSchemaPatternMatcher();
- this.disruptor = new DisruptorQueue(this::assignToExtractor,
this::onAssignedHook);
+ this.disruptor =
+ new DisruptorQueue(
+ Integer.parseInt(dataRegionId), this::assignToSource,
this::onAssignedHook);
this.dataRegionId = dataRegionId;
PipeAssignerMetrics.getInstance().register(this);
}
@@ -105,7 +107,7 @@ public class PipeDataRegionAssigner implements Closeable {
eventCounter.decreaseEventCount(innerEvent);
}
- private void assignToExtractor(
+ private void assignToSource(
final PipeRealtimeEvent event, final long sequence, final boolean
endOfBatch) {
if (disruptor.isClosed()) {
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index a01b50e7e68..eae60bdbc32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -19,15 +19,12 @@
package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +34,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,7 +45,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
protected final ReentrantReadWriteLock lock;
protected final Set<PipeRealtimeDataRegionSource> sources;
- protected final Cache<String, Set<PipeRealtimeDataRegionSource>>
deviceToSourcesCache;
+ protected final Map<String, Set<PipeRealtimeDataRegionSource>>
deviceToSourcesCache;
public CachedSchemaPatternMatcher() {
this.lock = new ReentrantReadWriteLock();
@@ -55,10 +53,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
// iterated by {@link #assignToSource}, at the same time the sources may
be added or
// removed by {@link #register} and {@link #deregister}.
this.sources = new CopyOnWriteArraySet<>();
- this.deviceToSourcesCache =
- Caffeine.newBuilder()
-
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
- .build();
+ this.deviceToSourcesCache = new ConcurrentHashMap<>();
}
@Override
@@ -66,7 +61,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
sources.add(source);
- deviceToSourcesCache.invalidateAll();
+ deviceToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
@@ -77,7 +72,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
sources.remove(source);
- deviceToSourcesCache.invalidateAll();
+ deviceToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
@@ -123,7 +118,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
// 1. try to get matched sources from cache, if not success, match
them by device
final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
- deviceToSourcesCache.get(device, this::filterSourcesByDevice);
+ deviceToSourcesCache.computeIfAbsent(device,
this::filterSourcesByDevice);
// this would not happen
if (sourcesFilteredByDevice == null) {
LOGGER.warn("Match result NPE when handle device {}", device);
@@ -200,7 +195,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
final PipePattern pipePattern = source.getPipePattern();
- if (Objects.isNull(pipePattern) ||
pipePattern.mayOverlapWithDevice(device)) {
+ if (Objects.isNull(pipePattern) ||
pipePattern.overlapWithDevice(device)) {
filteredSources.add(source);
}
}
@@ -213,8 +208,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
sources.clear();
- deviceToSourcesCache.invalidateAll();
- deviceToSourcesCache.cleanUp();
+ deviceToSourcesCache.clear();
} finally {
lock.writeLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index c9aff982c4b..3cad57962e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -699,9 +699,6 @@ public class DataRegion implements IDataRegionForQuery {
if (config.isEnableSeparateData()) {
lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
}
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
- }
}
protected void upgradeAndUpdateDeviceLastFlushTime(
@@ -718,9 +715,6 @@ public class DataRegion implements IDataRegionForQuery {
if (config.isEnableSeparateData()) {
lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId,
endTimeMap);
}
- if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
- }
}
public void initCompactionSchedule() {
@@ -1678,7 +1672,6 @@ public class DataRegion implements IDataRegionForQuery {
this.workUnsequenceTsFileProcessors.clear();
this.tsFileManager.clear();
lastFlushTimeMap.clearFlushedTime();
- lastFlushTimeMap.clearGlobalFlushedTime();
TimePartitionManager.getInstance()
.removeTimePartitionInfo(new
DataRegionId(Integer.parseInt(dataRegionIdString)));
} catch (InterruptedException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 3f0abfd3e24..f29c158cc0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,19 +19,13 @@
package org.apache.iotdb.db.storageengine.dataregion;
-import org.apache.iotdb.db.storageengine.StorageEngine;
-
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class HashLastFlushTimeMap implements ILastFlushTimeMap {
- private static final Logger logger =
LoggerFactory.getLogger(HashLastFlushTimeMap.class);
-
long LONG_SIZE = 24;
long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE;
@@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
*/
private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new
ConcurrentHashMap<>();
- /**
- * global mapping of device -> largest timestamp of the latest memtable to *
be submitted to
- * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to
maintain global
- * latestFlushedTime of devices and will be updated along with
- * partitionLatestFlushedTimeForEachDevice
- *
- * <p>It is used to update last cache.
- */
- private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice =
- new ConcurrentHashMap<>();
-
/** record memory cost of map for each partitionId */
private final Map<Long, Long> memCostForEachPartition = new
ConcurrentHashMap<>();
@@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
}
}
- @Override
- public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap) {
- for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
- globalLatestFlushedTimeForEachDevice.merge(entry.getKey(),
entry.getValue(), Math::max);
- }
- }
-
@Override
public boolean checkAndCreateFlushedTimePartition(
long timePartitionId, boolean usingDeviceFlushTime) {
@@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
partitionLatestFlushedTime
.computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
.updateLastFlushTime(entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(),
Long.MIN_VALUE)
- < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(),
entry.getValue());
- }
}
}
@@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements
ILastFlushTimeMap {
return
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
}
- // This method is for creating last cache entry when insert
- @Override
- public long getGlobalFlushedTime(IDeviceID path) {
- // If TsFileResource is not fully recovered, we should return
Long.MAX_VALUE
- // to avoid create Last cache entry
- if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
- return Long.MAX_VALUE;
- }
- return globalLatestFlushedTimeForEachDevice.getOrDefault(path,
Long.MIN_VALUE);
- }
-
@Override
public void clearFlushedTime() {
partitionLatestFlushedTime.clear();
}
- @Override
- public void clearGlobalFlushedTime() {
- globalLatestFlushedTimeForEachDevice.clear();
- }
-
@Override
public void degradeLastFlushTime(long partitionId) {
partitionLatestFlushedTime.computeIfPresent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 7bdd141bf6b..b5947afbe89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -31,8 +31,6 @@ public interface ILastFlushTimeMap {
void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
- void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long>
globalFlushedTimeMap);
-
void upgradeAndUpdateMultiDeviceFlushedTime(
long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
@@ -49,15 +47,11 @@ public interface ILastFlushTimeMap {
// region read
long getFlushedTime(long timePartitionId, IDeviceID deviceId);
- long getGlobalFlushedTime(IDeviceID path);
-
// endregion
// region clear
void clearFlushedTime();
- void clearGlobalFlushedTime();
-
// endregion
void degradeLastFlushTime(long partitionId);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
index 78d96a920aa..cb5c7612fbe 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
@@ -85,12 +85,17 @@ public class IoTDBPipePatternTest {
final String[] patternsNotOverlapWithDevice = {
"root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**",
};
+ final String[] patternsFalsePositiveOverLap = {"root.**.d2.**"};
for (final String s : patternsOverlapWithDevice) {
Assert.assertTrue(new IoTDBPipePattern(s).mayOverlapWithDevice(device));
}
for (final String t : patternsNotOverlapWithDevice) {
Assert.assertFalse(new IoTDBPipePattern(t).mayOverlapWithDevice(device));
}
+ for (final String t : patternsFalsePositiveOverLap) {
+ Assert.assertTrue(new IoTDBPipePattern(t).mayOverlapWithDevice(device));
+ Assert.assertFalse(new IoTDBPipePattern(t).overlapWithDevice(device));
+ }
// Test pattern match measurement
final String measurement = "s1";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index ff88f9521f0..279715c2e6a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -99,7 +99,7 @@ public class IoTDBPipePattern extends
IoTDBPipePatternOperations {
public boolean coversDevice(final String device) {
try {
return patternPartialPath.include(
- new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+ measurementPathGetter.apply(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
} catch (final IllegalPathException e) {
return false;
}
@@ -116,6 +116,16 @@ public class IoTDBPipePattern extends
IoTDBPipePatternOperations {
}
}
+ @Override
+ public boolean overlapWithDevice(final String device) {
+ try {
+ return patternPartialPath.overlapWith(
+ measurementPathGetter.apply(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+ } catch (final IllegalPathException e) {
+ return false;
+ }
+ }
+
@Override
public boolean mayOverlapWithDb(final String db) {
try {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index a76306aab81..8c2b4ca0409 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -82,13 +82,22 @@ public abstract class PipePattern {
/**
* Check if a device may have some measurements matched by the pattern.
*
- * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is
false.
- *
- * <p>NOTE2: this is just a loose check and may have false positives. To
further check if a
+ * <p>NOTE: this is just a loose check and may have false positives. To
further check if a
* measurement matches the pattern, please use {@link
PipePattern#matchesMeasurement} after this.
*/
public abstract boolean mayOverlapWithDevice(final String device);
+ /**
+ * Check if a device has some measurements matched by the pattern.
+ *
+ * <p>NOTE: this is a precise check and will not have false positives. It
means that, you can
+ * always find a measurement(existing or non-existing) to match the pattern
with the device.
+ * However, it may not be precise now if there are any exclusions (e.g.
Inclusion: root.**.d*.*s,
+ * exclusion: root.**.device.*s, device: root.a.b.device). It may be
supported by incoming
+ * versions.
+ */
+ public abstract boolean overlapWithDevice(final String device);
+
/**
* Check if a full path with device and measurement can be matched by
pattern.
*
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
index 85af46656a1..aa735722dc7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
@@ -118,6 +118,11 @@ public class PrefixPipePattern extends PipePattern {
|| (pattern.length() > device.length() && pattern.startsWith(device));
}
+ @Override
+ public boolean overlapWithDevice(final String device) {
+ return mayOverlapWithDevice(device);
+ }
+
@Override
public boolean mayOverlapWithDb(final String db) {
return
@@ -128,7 +133,7 @@ public class PrefixPipePattern extends PipePattern {
}
@Override
- public boolean matchesMeasurement(final String device, final String
measurement) {
+ public boolean matchesMeasurement(final String device, String measurement) {
// We assume that the device is already matched.
if (pattern.length() <= device.length()) {
return true;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
index 5000ff22657..b2e2d9bca24 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
@@ -93,6 +93,11 @@ public class UnionIoTDBPipePattern extends
IoTDBPipePatternOperations {
return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
}
+ @Override
+ public boolean overlapWithDevice(final String device) {
+ return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+ }
+
@Override
public boolean matchesMeasurement(final String device, final String
measurement) {
return patterns.stream().anyMatch(p -> p.matchesMeasurement(device,
measurement));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
index 3e497804551..ac6d7263a77 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
@@ -77,6 +77,11 @@ public class UnionPipePattern extends PipePattern {
return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
}
+ @Override
+ public boolean overlapWithDevice(final String device) {
+ return patterns.stream().anyMatch(p -> p.overlapWithDevice(device));
+ }
+
@Override
public boolean matchesMeasurement(final String device, final String
measurement) {
return patterns.stream().anyMatch(p -> p.matchesMeasurement(device,
measurement));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
index b68f10b54dc..b0ccb198e39 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
@@ -92,6 +92,11 @@ public class WithExclusionIoTDBPipePattern extends
IoTDBPipePatternOperations {
return inclusionPattern.mayOverlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
}
+ @Override
+ public boolean overlapWithDevice(final String device) {
+ return inclusionPattern.overlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
+ }
+
@Override
public boolean matchesMeasurement(final String device, final String
measurement) {
return inclusionPattern.matchesMeasurement(device, measurement)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
index 5a10613c8f2..3c3b3189393 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
@@ -91,6 +91,12 @@ public class WithExclusionPipePattern extends PipePattern {
return inclusionPattern.mayOverlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
}
+ @Override
+ public boolean overlapWithDevice(final String device) {
+ // May overlap if inclusion overlaps AND exclusion doesn't fully cover it.
+ return inclusionPattern.overlapWithDevice(device) &&
!exclusionPattern.coversDevice(device);
+ }
+
@Override
public boolean matchesMeasurement(final String device, final String
measurement) {
// The core logic: Must match inclusion AND NOT match exclusion.