This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch matcher-opti-13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2add8d2ceb2f3f4aa1620dea23a8f1784aff5c2c Author: Caideyipi <[email protected]> AuthorDate: Wed Mar 18 16:44:14 2026 +0800 fix --- .../realtime/assigner/DisruptorQueue.java | 22 ++++++++++- .../db/storageengine/dataregion/DataRegion.java | 7 ---- .../dataregion/HashLastFlushTimeMap.java | 44 ---------------------- .../dataregion/ILastFlushTimeMap.java | 6 --- .../datastructure/pattern/IoTDBPipePattern.java | 12 +++++- .../pipe/datastructure/pattern/PipePattern.java | 15 ++++++-- .../datastructure/pattern/PrefixPipePattern.java | 21 ++++++++++- .../pattern/UnionIoTDBPipePattern.java | 5 +++ .../datastructure/pattern/UnionPipePattern.java | 5 +++ .../pattern/WithExclusionIoTDBPipePattern.java | 5 +++ .../pattern/WithExclusionPipePattern.java | 6 +++ 11 files changed, 85 insertions(+), 63 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index b2b4ea9b83d..9c25296de51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -33,21 +33,26 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.function.Consumer; import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR; public class DisruptorQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorQueue.class); private static final IoTDBDaemonThreadFactory THREAD_FACTORY = new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName()); private final PipeMemoryBlock allocatedMemoryBlock; private final Disruptor<EventContainer> disruptor; private final RingBuffer<EventContainer> ringBuffer; - private volatile boolean isClosed = false; + private volatile long lastLogTime = Long.MIN_VALUE; + public DisruptorQueue( final EventHandler<PipeRealtimeEvent> eventHandler, final Consumer<PipeRealtimeEvent> onAssignedHook) { @@ -88,6 +93,7 @@ public class DisruptorQueue { ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer); } ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); + mayPrintExceedingLog(); } public void shutdown() { @@ -101,6 +107,20 @@ public class DisruptorQueue { return isClosed; } + private void mayPrintExceedingLog() { + final long capacity = ringBuffer.remainingCapacity(); + final long bufferSize = ringBuffer.getBufferSize(); + if ((double) capacity / bufferSize >= 0.5 + && System.currentTimeMillis() - lastLogTime + >= PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()) { + LOGGER.warn( + "The assigner queue content has exceeded half, it may be stuck and may block insertion. capacity: {}, bufferSize: {}", + capacity, + bufferSize); + lastLogTime = System.currentTimeMillis(); + } + } + private static class EventContainer { private PipeRealtimeEvent event; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index c9aff982c4b..3cad57962e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -699,9 +699,6 @@ public class DataRegion implements IDataRegionForQuery { if (config.isEnableSeparateData()) { lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, endTimeMap); } - if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { - lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap); - } } protected void upgradeAndUpdateDeviceLastFlushTime( @@ -718,9 +715,6 @@ public class DataRegion implements IDataRegionForQuery { if (config.isEnableSeparateData()) { lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, endTimeMap); } - if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { - lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap); - } } public void initCompactionSchedule() { @@ -1678,7 +1672,6 @@ public class DataRegion implements IDataRegionForQuery { this.workUnsequenceTsFileProcessors.clear(); this.tsFileManager.clear(); lastFlushTimeMap.clearFlushedTime(); - lastFlushTimeMap.clearGlobalFlushedTime(); TimePartitionManager.getInstance() .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionIdString))); } catch (InterruptedException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java index 3f0abfd3e24..f29c158cc0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java @@ -19,19 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion; -import org.apache.iotdb.db.storageengine.StorageEngine; - import org.apache.tsfile.file.metadata.IDeviceID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class HashLastFlushTimeMap implements ILastFlushTimeMap { - private static final Logger logger = LoggerFactory.getLogger(HashLastFlushTimeMap.class); - long LONG_SIZE = 24; long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE; @@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { */ private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new ConcurrentHashMap<>(); - /** - * global mapping of device -> largest timestamp of the latest memtable to * be submitted to - * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to maintain global - * latestFlushedTime of devices and will be updated along with - * partitionLatestFlushedTimeForEachDevice - * - * <p>It is used to update last cache. - */ - private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice = - new ConcurrentHashMap<>(); - /** record memory cost of map for each partitionId */ private final Map<Long, Long> memCostForEachPartition = new ConcurrentHashMap<>(); @@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { } } - @Override - public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> globalFlushedTimeMap) { - for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) { - globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), entry.getValue(), Math::max); - } - } - @Override public boolean checkAndCreateFlushedTimePartition( long timePartitionId, boolean usingDeviceFlushTime) { @@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { partitionLatestFlushedTime .computeIfAbsent(partitionId, id -> new DeviceLastFlushTime()) .updateLastFlushTime(entry.getKey(), entry.getValue()); - if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE) - < entry.getValue()) { - globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue()); - } } } @@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap { return partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId); } - // This method is for creating last cache entry when insert - @Override - public long getGlobalFlushedTime(IDeviceID path) { - // If TsFileResource is not fully recovered, we should return Long.MAX_VALUE - // to avoid create Last cache entry - if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) { - return Long.MAX_VALUE; - } - return globalLatestFlushedTimeForEachDevice.getOrDefault(path, Long.MIN_VALUE); - } - @Override public void clearFlushedTime() { partitionLatestFlushedTime.clear(); } - @Override - public void clearGlobalFlushedTime() { - globalLatestFlushedTimeForEachDevice.clear(); - } - @Override public void degradeLastFlushTime(long partitionId) { partitionLatestFlushedTime.computeIfPresent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java index 7bdd141bf6b..b5947afbe89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java @@ -31,8 +31,6 @@ public interface ILastFlushTimeMap { void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime); - void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> globalFlushedTimeMap); - void upgradeAndUpdateMultiDeviceFlushedTime( long timePartitionId, Map<IDeviceID, Long> flushedTimeMap); @@ -49,15 +47,11 @@ public interface ILastFlushTimeMap { // region read long getFlushedTime(long timePartitionId, IDeviceID deviceId); - long getGlobalFlushedTime(IDeviceID path); - // endregion // region clear void clearFlushedTime(); - void clearGlobalFlushedTime(); - // endregion void degradeLastFlushTime(long partitionId); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java index ff88f9521f0..007ed2dc751 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java @@ -99,7 +99,7 @@ public class IoTDBPipePattern extends IoTDBPipePatternOperations { public boolean coversDevice(final String device) { try { return patternPartialPath.include( - new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); + measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); } catch (final IllegalPathException e) { return false; } @@ -116,6 +116,16 @@ public class IoTDBPipePattern extends IoTDBPipePatternOperations { } } + @Override + public boolean overlapWithDevice(final String device) { + try { + return patternPartialPath.overlapWith( + measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)); + } catch (final IllegalPathException e) { + return false; + } + } + @Override public boolean mayOverlapWithDb(final String db) { try { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java index a76306aab81..8c2b4ca0409 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java @@ -82,13 +82,22 @@ public abstract class PipePattern { /** * Check if a device may have some measurements matched by the pattern. * - * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is false. - * - * <p>NOTE2: this is just a loose check and may have false positives. To further check if a + * <p>NOTE: this is just a loose check and may have false positives. To further check if a * measurement matches the pattern, please use {@link PipePattern#matchesMeasurement} after this. */ public abstract boolean mayOverlapWithDevice(final String device); + /** + * Check if a device has some measurements matched by the pattern. + * + * <p>NOTE: this is a precise check and will not have false positives. It means that, you can + * always find a measurement(existing or non-existing) to match the pattern with the device. + * However, it may not be precise now if there are any exclusions (e.g. Inclusion: root.**.d*.*s, + * exclusion: root.**.device.*s, device: root.a.b.device). It may be supported by incoming + * versions. + */ + public abstract boolean overlapWithDevice(final String device); + /** * Check if a full path with device and measurement can be matched by pattern. * diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java index 85af46656a1..9524f07f152 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java @@ -128,7 +128,26 @@ public class PrefixPipePattern extends PipePattern { } @Override - public boolean matchesMeasurement(final String device, final String measurement) { + public boolean mayOverlapWithDevice(final String device) { + final String deviceStr = device.toString(); + return + // for example, pattern is root.a.b and device is root.a.b.c + // in this case, the extractor can be matched without checking the measurements + (pattern.length() <= deviceStr.length() && deviceStr.startsWith(pattern)) + // for example, pattern is root.a.b.c and device is root.a.b + // in this case, the extractor can be selected as candidate, but the measurements should + // be checked further + || (pattern.length() > deviceStr.length() && pattern.startsWith(deviceStr)); + } + + @Override + public boolean overlapWithDevice(final String device) { + return mayOverlapWithDevice(device); + } + + @Override + public boolean matchesMeasurement(final String device, String measurement) { + final String deviceStr = device.toString(); // We assume that the device is already matched. if (pattern.length() <= device.length()) { return true; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java index 5000ff22657..b2e2d9bca24 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java @@ -93,6 +93,11 @@ public class UnionIoTDBPipePattern extends IoTDBPipePatternOperations { return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device)); } + @Override + public boolean overlapWithDevice(final String device) { + return patterns.stream().anyMatch(p -> p.overlapWithDevice(device)); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, measurement)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java index 3e497804551..ac6d7263a77 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java @@ -77,6 +77,11 @@ public class UnionPipePattern extends PipePattern { return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device)); } + @Override + public boolean overlapWithDevice(final String device) { + return patterns.stream().anyMatch(p -> p.overlapWithDevice(device)); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, measurement)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java index b68f10b54dc..b0ccb198e39 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java @@ -92,6 +92,11 @@ public class WithExclusionIoTDBPipePattern extends IoTDBPipePatternOperations { return inclusionPattern.mayOverlapWithDevice(device) && !exclusionPattern.coversDevice(device); } + @Override + public boolean overlapWithDevice(final String device) { + return inclusionPattern.overlapWithDevice(device) && !exclusionPattern.coversDevice(device); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { return inclusionPattern.matchesMeasurement(device, measurement) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java index 5a10613c8f2..3c3b3189393 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java @@ -91,6 +91,12 @@ public class WithExclusionPipePattern extends PipePattern { return inclusionPattern.mayOverlapWithDevice(device) && !exclusionPattern.coversDevice(device); } + @Override + public boolean overlapWithDevice(final String device) { + // May overlap if inclusion overlaps AND exclusion doesn't fully cover it. + return inclusionPattern.overlapWithDevice(device) && !exclusionPattern.coversDevice(device); + } + @Override public boolean matchesMeasurement(final String device, final String measurement) { // The core logic: Must match inclusion AND NOT match exclusion.
