This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch matcher-opti-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27d4f2ceacf209f3f8827e2af142f777f576f34a Author: Caideyipi <[email protected]> AuthorDate: Wed Mar 18 16:44:14 2026 +0800 [To dev/1.3] Deleted the useless device flush map & Pipe: Made the assigner cache to full cache & Use precise match in assigner device & Print periodical log when assigner has exceeded half of the capacity --- .../realtime/assigner/DisruptorQueue.java | 26 ++++++++++++- .../realtime/assigner/PipeDataRegionAssigner.java | 6 ++- .../matcher/CachedSchemaPatternMatcher.java | 20 ++++------ .../db/storageengine/dataregion/DataRegion.java | 7 ---- .../dataregion/HashLastFlushTimeMap.java | 44 ---------------------- .../dataregion/ILastFlushTimeMap.java | 6 --- .../datastructure/pattern/IoTDBPipePattern.java | 12 +++++- .../pipe/datastructure/pattern/PipePattern.java | 15 ++++++-- .../datastructure/pattern/PrefixPipePattern.java | 21 ++++++++++- .../pattern/UnionIoTDBPipePattern.java | 5 +++ .../datastructure/pattern/UnionPipePattern.java | 5 +++ .../pattern/WithExclusionIoTDBPipePattern.java | 5 +++ .../pattern/WithExclusionPipePattern.java | 6 +++ 13 files changed, 100 insertions(+), 78 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index b2b4ea9b83d..c7add196e69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -32,6 +32,8 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.function.Consumer; @@ -39,18 +41,23 @@ import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISR public class DisruptorQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorQueue.class); private static final IoTDBDaemonThreadFactory THREAD_FACTORY = new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName()); private final PipeMemoryBlock allocatedMemoryBlock; private final Disruptor<EventContainer> disruptor; private final RingBuffer<EventContainer> ringBuffer; - private volatile boolean isClosed = false; + private final int dataRegionId; + private volatile long lastLogTime = Long.MIN_VALUE; + public DisruptorQueue( + final int dataRegionId, final EventHandler<PipeRealtimeEvent> eventHandler, final Consumer<PipeRealtimeEvent> onAssignedHook) { + this.dataRegionId = dataRegionId; final PipeConfig config = PipeConfig.getInstance(); final int ringBufferSize = config.getPipeSourceAssignerDisruptorRingBufferSize(); final long ringBufferEntrySizeInBytes = @@ -88,6 +95,7 @@ public class DisruptorQueue { ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer); } ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); + mayPrintExceedingLog(); } public void shutdown() { @@ -101,6 +109,22 @@ public class DisruptorQueue { return isClosed; } + private void mayPrintExceedingLog() { + final long remainingCapacity = ringBuffer.remainingCapacity(); + final long bufferSize = ringBuffer.getBufferSize(); + if ((double) remainingCapacity / bufferSize >= 0.5 + && System.currentTimeMillis() + - PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() + >= lastLogTime) { + LOGGER.warn( + "The assigner queue content has exceeded half, it may be stuck and may block insertion. regionId: {}, capacity: {}, bufferSize: {}", + dataRegionId, + remainingCapacity, + bufferSize); + lastLogTime = System.currentTimeMillis(); + } + } + private static class EventContainer { private PipeRealtimeEvent event; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 6a4f877bf47..e3067a13126 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -64,7 +64,9 @@ public class PipeDataRegionAssigner implements Closeable { public PipeDataRegionAssigner(final String dataRegionId) { this.matcher = new CachedSchemaPatternMatcher(); - this.disruptor = new DisruptorQueue(this::assignToExtractor, this::onAssignedHook); + this.disruptor = + new DisruptorQueue( + Integer.parseInt(dataRegionId), this::assignToSource, this::onAssignedHook); this.dataRegionId = dataRegionId; PipeAssignerMetrics.getInstance().register(this); } @@ -105,7 +107,7 @@ public class PipeDataRegionAssigner implements Closeable { eventCounter.decreaseEventCount(innerEvent); } - private void assignToExtractor( + private void assignToSource( final PipeRealtimeEvent event, final long sequence, final boolean endOfBatch) { if (disruptor.isClosed()) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java index a01b50e7e68..2cc719ea687 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java @@ -19,15 +19,12 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +34,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -47,7 +45,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { protected final ReentrantReadWriteLock lock; protected final Set<PipeRealtimeDataRegionSource> sources; - protected final Cache<String, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache; + protected final Map<String, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache; public CachedSchemaPatternMatcher() { this.lock = new ReentrantReadWriteLock(); @@ -55,10 +53,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { // iterated by {@link #assignToSource}, at the same time the sources may be added or // removed by {@link #register} and {@link #deregister}. this.sources = new CopyOnWriteArraySet<>(); - this.deviceToSourcesCache = - Caffeine.newBuilder() - .maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize()) - .build(); + this.deviceToSourcesCache = new ConcurrentHashMap<>(); } @Override @@ -66,7 +61,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { lock.writeLock().lock(); try { sources.add(source); - deviceToSourcesCache.invalidateAll(); + deviceToSourcesCache.clear(); } finally { lock.writeLock().unlock(); } @@ -77,7 +72,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { lock.writeLock().lock(); try { sources.remove(source); - deviceToSourcesCache.invalidateAll(); + deviceToSourcesCache.clear(); } finally { lock.writeLock().unlock(); } @@ -123,7 +118,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { // 1. try to get matched sources from cache, if not success, match them by device final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice = - deviceToSourcesCache.get(device, this::filterSourcesByDevice); + deviceToSourcesCache.computeIfAbsent(device, this::filterSourcesByDevice); // this would not happen if (sourcesFilteredByDevice == null) { LOGGER.warn("Match result NPE when handle device {}", device); @@ -213,8 +208,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { lock.writeLock().lock(); try { sources.clear(); - deviceToSourcesCache.invalidateAll(); - deviceToSourcesCache.cleanUp(); + deviceToSourcesCache.clear(); } finally { lock.writeLock().unlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index c9aff982c4b..3cad57962e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -699,9 +699,6 @@ public class DataRegion implements IDataRegionForQuery { if (config.isEnableSeparateData()) { lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, endTimeMap); } - if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { - lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap); - } } protected void upgradeAndUpdateDeviceLastFlushTime( @@ -718,9 +715,6 @@ public class DataRegion implements IDataRegionForQuery { if (config.isEnableSeparateData()) { lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, endTimeMap); } - if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { - lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap); - } } public void initCompactionSchedule() { @@ -1678,7 +1672,6 @@ public class DataRegion implements IDataRegionForQuery { this.workUnsequenceTsFileProcessors.clear(); this.tsFileManager.clear(); lastFlushTimeMap.clearFlushedTime(); - lastFlushTimeMap.clearGlobalFlushedTime(); TimePartitionManager.getInstance() .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionIdString))); } catch (InterruptedException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java index 3f0abfd3e24..f29c158cc0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java @@ -19,19 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion; -import org.apache.iotdb.db.storageengine.StorageEngine; - import org.apache.tsfile.file.metadata.IDeviceID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class HashLastFlushTimeMap implements ILastFlushTimeMap { - private static final Logger logger = LoggerFactory.getLogger(HashLastFlushTimeMap.class); - long LONG_SIZE = 24; long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE; @@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { */ private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new ConcurrentHashMap<>(); - /** - * global mapping of device -> largest timestamp of the latest memtable to * be submitted to - * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to maintain global - * latestFlushedTime of devices and will be updated along with - * partitionLatestFlushedTimeForEachDevice - * - * <p>It is used to update last cache. - */ - private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice = - new ConcurrentHashMap<>(); - /** record memory cost of map for each partitionId */ private final Map<Long, Long> memCostForEachPartition = new ConcurrentHashMap<>(); @@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { } } - @Override - public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> globalFlushedTimeMap) { - for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) { - globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), entry.getValue(), Math::max); - } - } - @Override public boolean checkAndCreateFlushedTimePartition( long timePartitionId, boolean usingDeviceFlushTime) { @@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { partitionLatestFlushedTime .computeIfAbsent(partitionId, id -> new DeviceLastFlushTime()) .updateLastFlushTime(entry.getKey(), entry.getValue()); - if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE) - < entry.getValue()) { - globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue()); - } } } @@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { return partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId); } - // This method is for creating last cache entry when insert - @Override - public long getGlobalFlushedTime(IDeviceID path) { - // If TsFileResource is not fully recovered, we should return Long.MAX_VALUE - // to avoid create Last cache entry - if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { - return Long.MAX_VALUE; - } - return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE); - } - @Override public void clearFlushedTime() { partitionLatestFlushedTime.clear(); } - @Override - public void clearGlobalFlushedTime() { - globalLatestFlushedTimeForEachDevice.clear(); - } - @Override public void degradeLastFlushTime(long partitionId) { partitionLatestFlushedTime.computeIfPresent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java index 7bdd141bf6b..b5947afbe89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java @@ -31,8 +31,6 @@ public interface ILastFlushTimeMap { void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime); - void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> globalFlushedTimeMap); - void upgradeAndUpdateMultiDeviceFlushedTime( long timePartitionId, Map<IDeviceID, Long> flushedTimeMap); @@ -49,15 +47,11 @@ public interface ILastFlushTimeMap { // region read long getFlushedTime(long timePartitionId, IDeviceID deviceId); - long getGlobalFlushedTime(IDeviceID path); - // endregion // region clear void clearFlushedTime(); - void clearGlobalFlushedTime(); - // endregion void degradeLastFlushTime(long partitionId); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java index ff88f9521f0..279715c2e6a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java @@ -99,7 +99,7 @@ public class IoTDBPipePattern extends IoTDBPipePatternOperations { public boolean coversDevice(final String device) { try { return patternPartialPath.include( - new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); + measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); } catch (final IllegalPathException e) { return false; } @@ -116,6 +116,16 @@ public class IoTDBPipePattern extends IoTDBPipePatternOperations { } } + @Override + public boolean overlapWithDevice(final String device) { + try { + return patternPartialPath.overlapWith( + measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); + } catch (final IllegalPathException e) { + return false; + } + } + @Override public boolean mayOverlapWithDb(final String db) { try { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java index a76306aab81..8c2b4ca0409 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java @@ -82,13 +82,22 @@ public abstract class PipePattern { /** * Check if a device may have some measurements matched by the pattern. * - * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is false. - * - * <p>NOTE2: this is just a loose check and may have false positives. To further check if a + * <p>NOTE: this is just a loose check and may have false positives. To further check if a * measurement matches the pattern, please use {@link PipePattern#matchesMeasurement} after this. */ public abstract boolean mayOverlapWithDevice(final String device); + /** + * Check if a device has some measurements matched by the pattern. + * + * <p>NOTE: this is a precise check and will not have false positives. It means that, you can + * always find a measurement(existing or non-existing) to match the pattern with the device. + * However, it may not be precise now if there are any exclusions (e.g. Inclusion: root.**.d*.*s, + * exclusion: root.**.device.*s, device: root.a.b.device). It may be supported by incoming + * versions. + */ + public abstract boolean overlapWithDevice(final String device); + /** * Check if a full path with device and measurement can be matched by pattern. * diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java index 85af46656a1..9524f07f152 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java @@ -128,7 +128,26 @@ public class PrefixPipePattern extends PipePattern { } @Override - public boolean matchesMeasurement(final String device, final String measurement) { + public boolean mayOverlapWithDevice(final String device) { + final String deviceStr = device.toString(); + return + // for example, pattern is root.a.b and device is root.a.b.c + // in this case, the extractor can be matched without checking the measurements + (pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern)) + // for example, pattern is root.a.b.c and device is root.a.b + // in this case, the extractor can be selected as candidate, but the measurements should + // be checked further + || (pattern.length() > deviceStr.length() && pattern.startsWith(deviceStr)); + } + + @Override + public boolean overlapWithDevice(final String device) { + return mayOverlapWithDevice(device); + } + + @Override + public boolean matchesMeasurement(final String device, String measurement) { + final String deviceStr = device.toString(); // We assume that the device is already matched. if (pattern.length() <= device.length()) { return true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java index 5000ff22657..b2e2d9bca24 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java @@ -93,6 +93,11 @@ public class UnionIoTDBPipePattern extends IoTDBPipePatternOperations { return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device)); } + @Override + public boolean overlapWithDevice(final String device) { + return patterns.stream().anyMatch(p -> p.overlapWithDevice(device)); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, measurement)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java index 3e497804551..ac6d7263a77 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java @@ -77,6 +77,11 @@ public class UnionPipePattern extends PipePattern { return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device)); } + @Override + public boolean overlapWithDevice(final String device) { + return patterns.stream().anyMatch(p -> p.overlapWithDevice(device)); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, measurement)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java index b68f10b54dc..b0ccb198e39 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java @@ -92,6 +92,11 @@ public class WithExclusionIoTDBPipePattern extends IoTDBPipePatternOperations { return inclusionPattern.mayOverlapWithDevice(device) && !exclusionPattern.coversDevice(device); } + @Override + public boolean overlapWithDevice(final String device) { + return inclusionPattern.overlapWithDevice(device) && !exclusionPattern.coversDevice(device); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { return inclusionPattern.matchesMeasurement(device, measurement) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java index 5a10613c8f2..3c3b3189393 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java @@ -91,6 +91,12 @@ public class WithExclusionPipePattern extends PipePattern { return inclusionPattern.mayOverlapWithDevice(device) && !exclusionPattern.coversDevice(device); } + @Override + public boolean overlapWithDevice(final String device) { + // May overlap if inclusion overlaps AND exclusion doesn't fully cover it. + return inclusionPattern.overlapWithDevice(device) && !exclusionPattern.coversDevice(device); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { // The core logic: Must match inclusion AND NOT match exclusion.
