This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch matcher-opti-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2add8d2ceb2f3f4aa1620dea23a8f1784aff5c2c
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 16:44:14 2026 +0800

    fix
---
 .../realtime/assigner/DisruptorQueue.java          | 22 ++++++++++-
 .../db/storageengine/dataregion/DataRegion.java    |  7 ----
 .../dataregion/HashLastFlushTimeMap.java           | 44 ----------------------
 .../dataregion/ILastFlushTimeMap.java              |  6 ---
 .../datastructure/pattern/IoTDBPipePattern.java    | 12 +++++-
 .../pipe/datastructure/pattern/PipePattern.java    | 15 ++++++--
 .../datastructure/pattern/PrefixPipePattern.java   | 21 ++++++++++-
 .../pattern/UnionIoTDBPipePattern.java             |  5 +++
 .../datastructure/pattern/UnionPipePattern.java    |  5 +++
 .../pattern/WithExclusionIoTDBPipePattern.java     |  5 +++
 .../pattern/WithExclusionPipePattern.java          |  6 +++
 11 files changed, 85 insertions(+), 63 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index b2b4ea9b83d..9c25296de51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -33,21 +33,26 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.function.Consumer;
 
 import static 
org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
 
 public class DisruptorQueue {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorQueue.class);
   private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
       new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   private final Disruptor<EventContainer> disruptor;
   private final RingBuffer<EventContainer> ringBuffer;
-
   private volatile boolean isClosed = false;
 
+  private volatile long lastLogTime = Long.MIN_VALUE;
+
   public DisruptorQueue(
       final EventHandler<PipeRealtimeEvent> eventHandler,
       final Consumer<PipeRealtimeEvent> onAssignedHook) {
@@ -88,6 +93,7 @@ public class DisruptorQueue {
       ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
     }
     ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
+    mayPrintExceedingLog();
   }
 
   public void shutdown() {
@@ -101,6 +107,20 @@ public class DisruptorQueue {
     return isClosed;
   }
 
+  /**
+   * Logs a rate-limited warning once more than half of the ring buffer is occupied, i.e. when the
+   * remaining capacity has dropped to at most half of the buffer size.
+   */
+  private void mayPrintExceedingLog() {
+    final long capacity = ringBuffer.remainingCapacity();
+    final long bufferSize = ringBuffer.getBufferSize();
+    if ((double) capacity / bufferSize > 0.5) {
+      // More than half of the slots are still free, nothing to report.
+      return;
+    }
+    final long now = System.currentTimeMillis();
+    // The getter name indicates seconds while the timestamps are milliseconds, so convert first.
+    final long minIntervalMillis =
+        PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds() * 1000L;
+    // lastLogTime is initialised to Long.MIN_VALUE; "now - Long.MIN_VALUE" overflows to a negative
+    // value, so the never-logged sentinel has to be checked explicitly.
+    if (lastLogTime != Long.MIN_VALUE && now - lastLogTime < minIntervalMillis) {
+      return;
+    }
+    LOGGER.warn(
+        "The assigner queue content has exceeded half, it may be stuck and may block insertion. capacity: {}, bufferSize: {}",
+        capacity,
+        bufferSize);
+    lastLogTime = now;
+  }
+
   private static class EventContainer {
 
     private PipeRealtimeEvent event;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index c9aff982c4b..3cad57962e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -699,9 +699,6 @@ public class DataRegion implements IDataRegionForQuery {
     if (config.isEnableSeparateData()) {
       lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
     }
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
-    }
   }
 
   protected void upgradeAndUpdateDeviceLastFlushTime(
@@ -718,9 +715,6 @@ public class DataRegion implements IDataRegionForQuery {
     if (config.isEnableSeparateData()) {
       lastFlushTimeMap.upgradeAndUpdateMultiDeviceFlushedTime(timePartitionId, 
endTimeMap);
     }
-    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
-      lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap);
-    }
   }
 
   public void initCompactionSchedule() {
@@ -1678,7 +1672,6 @@ public class DataRegion implements IDataRegionForQuery {
       this.workUnsequenceTsFileProcessors.clear();
       this.tsFileManager.clear();
       lastFlushTimeMap.clearFlushedTime();
-      lastFlushTimeMap.clearGlobalFlushedTime();
       TimePartitionManager.getInstance()
           .removeTimePartitionInfo(new 
DataRegionId(Integer.parseInt(dataRegionIdString)));
     } catch (InterruptedException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index 3f0abfd3e24..f29c158cc0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,19 +19,13 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import org.apache.iotdb.db.storageengine.StorageEngine;
-
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class HashLastFlushTimeMap implements ILastFlushTimeMap {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(HashLastFlushTimeMap.class);
-
   long LONG_SIZE = 24;
 
   long HASHMAP_NODE_BASIC_SIZE = 14 + LONG_SIZE;
@@ -47,17 +41,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
    */
   private final Map<Long, ILastFlushTime> partitionLatestFlushedTime = new 
ConcurrentHashMap<>();
 
-  /**
-   * global mapping of device -> largest timestamp of the latest memtable to * 
be submitted to
-   * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to 
maintain global
-   * latestFlushedTime of devices and will be updated along with
-   * partitionLatestFlushedTimeForEachDevice
-   *
-   * <p>It is used to update last cache.
-   */
-  private final Map<IDeviceID, Long> globalLatestFlushedTimeForEachDevice =
-      new ConcurrentHashMap<>();
-
   /** record memory cost of map for each partitionId */
   private final Map<Long, Long> memCostForEachPartition = new 
ConcurrentHashMap<>();
 
@@ -137,13 +120,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
     }
   }
 
-  @Override
-  public void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap) {
-    for (Map.Entry<IDeviceID, Long> entry : globalFlushedTimeMap.entrySet()) {
-      globalLatestFlushedTimeForEachDevice.merge(entry.getKey(), 
entry.getValue(), Math::max);
-    }
-  }
-
   @Override
   public boolean checkAndCreateFlushedTimePartition(
       long timePartitionId, boolean usingDeviceFlushTime) {
@@ -165,10 +141,6 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
       partitionLatestFlushedTime
           .computeIfAbsent(partitionId, id -> new DeviceLastFlushTime())
           .updateLastFlushTime(entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), 
Long.MIN_VALUE)
-          < entry.getValue()) {
-        globalLatestFlushedTimeForEachDevice.put(entry.getKey(), 
entry.getValue());
-      }
     }
   }
 
@@ -177,27 +149,11 @@ public class HashLastFlushTimeMap implements 
ILastFlushTimeMap {
     return 
partitionLatestFlushedTime.get(timePartitionId).getLastFlushTime(deviceId);
   }
 
-  // This method is for creating last cache entry when insert
-  @Override
-  public long getGlobalFlushedTime(IDeviceID path) {
-    // If TsFileResource is not fully recovered, we should return 
Long.MAX_VALUE
-    // to avoid create Last cache entry
-    if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
-      return Long.MAX_VALUE;
-    }
-    return globalLatestFlushedTimeForEachDevice.getOrDefault(path, 
Long.MIN_VALUE);
-  }
-
   @Override
   public void clearFlushedTime() {
     partitionLatestFlushedTime.clear();
   }
 
-  @Override
-  public void clearGlobalFlushedTime() {
-    globalLatestFlushedTimeForEachDevice.clear();
-  }
-
   @Override
   public void degradeLastFlushTime(long partitionId) {
     partitionLatestFlushedTime.computeIfPresent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 7bdd141bf6b..b5947afbe89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -31,8 +31,6 @@ public interface ILastFlushTimeMap {
 
   void updatePartitionFlushedTime(long timePartitionId, long maxFlushedTime);
 
-  void updateMultiDeviceGlobalFlushedTime(Map<IDeviceID, Long> 
globalFlushedTimeMap);
-
   void upgradeAndUpdateMultiDeviceFlushedTime(
       long timePartitionId, Map<IDeviceID, Long> flushedTimeMap);
 
@@ -49,15 +47,11 @@ public interface ILastFlushTimeMap {
   // region read
   long getFlushedTime(long timePartitionId, IDeviceID deviceId);
 
-  long getGlobalFlushedTime(IDeviceID path);
-
   // endregion
 
   // region clear
   void clearFlushedTime();
 
-  void clearGlobalFlushedTime();
-
   // endregion
 
   void degradeLastFlushTime(long partitionId);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index ff88f9521f0..007ed2dc751 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -99,7 +99,7 @@ public class IoTDBPipePattern extends 
IoTDBPipePatternOperations {
   public boolean coversDevice(final String device) {
     try {
       return patternPartialPath.include(
-          new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+          measurementPathGetter.apply(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -116,6 +116,16 @@ public class IoTDBPipePattern extends 
IoTDBPipePatternOperations {
     }
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // The device is combined with the one-level wildcard through measurementPathGetter and the
+    // resulting path is tested against the pattern path. An illegal path counts as "no overlap".
+    boolean overlaps;
+    try {
+      overlaps =
+          patternPartialPath.overlapWith(
+              measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+    } catch (final IllegalPathException ignored) {
+      overlaps = false;
+    }
+    return overlaps;
+  }
+
   @Override
   public boolean mayOverlapWithDb(final String db) {
     try {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index a76306aab81..8c2b4ca0409 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -82,13 +82,22 @@ public abstract class PipePattern {
   /**
    * Check if a device may have some measurements matched by the pattern.
    *
-   * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is 
false.
-   *
-   * <p>NOTE2: this is just a loose check and may have false positives. To 
further check if a
+   * <p>NOTE: this is just a loose check and may have false positives. To 
further check if a
    * measurement matches the pattern, please use {@link 
PipePattern#matchesMeasurement} after this.
    */
   public abstract boolean mayOverlapWithDevice(final String device);
 
+  /**
+   * Check if a device has some measurements matched by the pattern.
+   *
+   * <p>NOTE: this is a precise check and will not have false positives. It means that you can
+   * always find a measurement (existing or non-existing) under the device that matches the
+   * pattern. However, it may not be precise yet if there are any exclusions (e.g. inclusion:
+   * root.**.d*.*s, exclusion: root.**.device.*s, device: root.a.b.device). This may be supported
+   * in future versions.
+   *
+   * @param device the device path to check against the pattern
+   * @return {@code true} if some measurement under the device can match the pattern
+   */
+  public abstract boolean overlapWithDevice(final String device);
+
   /**
    * Check if a full path with device and measurement can be matched by 
pattern.
    *
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
index 85af46656a1..9524f07f152 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
@@ -128,7 +128,26 @@ public class PrefixPipePattern extends PipePattern {
   }
 
   @Override
-  public boolean matchesMeasurement(final String device, final String 
measurement) {
+  public boolean mayOverlapWithDevice(final String device) {
+    // "device" is already a String, so it is compared directly without an extra toString() copy.
+    if (pattern.length() <= device.length()) {
+      // for example, pattern is root.a.b and device is root.a.b.c
+      // in this case, the extractor can be matched without checking the measurements
+      return device.startsWith(pattern);
+    }
+    // for example, pattern is root.a.b.c and device is root.a.b
+    // in this case, the extractor can be selected as candidate, but the measurements should
+    // be checked further
+    return pattern.startsWith(device);
+  }
+
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // Prefix patterns answer the precise check with the same logic as the loose check.
+    // NOTE(review): mayOverlapWithDevice also accepts a device that is only a string prefix of
+    // the pattern (e.g. pattern root.a.bc, device root.a.b) - confirm this cannot produce false
+    // positives for a check that is meant to be precise.
+    return mayOverlapWithDevice(device);
+  }
+
+  @Override
+  public boolean matchesMeasurement(final String device, String measurement) {
+    final String deviceStr = device.toString();
     // We assume that the device is already matched.
     if (pattern.length() <= device.length()) {
       return true;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
index 5000ff22657..b2e2d9bca24 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
@@ -93,6 +93,11 @@ public class UnionIoTDBPipePattern extends 
IoTDBPipePatternOperations {
     return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // The union overlaps with the device as soon as any member pattern does.
+    final boolean anyMemberOverlaps =
+        patterns.stream().anyMatch(member -> member.overlapWithDevice(device));
+    return anyMemberOverlaps;
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, 
measurement));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
index 3e497804551..ac6d7263a77 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
@@ -77,6 +77,11 @@ public class UnionPipePattern extends PipePattern {
     return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // The union overlaps with the device as soon as any member pattern does.
+    final boolean anyMemberOverlaps =
+        patterns.stream().anyMatch(member -> member.overlapWithDevice(device));
+    return anyMemberOverlaps;
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, 
measurement));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
index b68f10b54dc..b0ccb198e39 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionIoTDBPipePattern.java
@@ -92,6 +92,11 @@ public class WithExclusionIoTDBPipePattern extends 
IoTDBPipePatternOperations {
     return inclusionPattern.mayOverlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // Without an overlap on the inclusion side there is nothing left to exclude.
+    if (!inclusionPattern.overlapWithDevice(device)) {
+      return false;
+    }
+    // The device still counts unless the exclusion pattern covers it.
+    return !exclusionPattern.coversDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     return inclusionPattern.matchesMeasurement(device, measurement)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
index 5a10613c8f2..3c3b3189393 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/WithExclusionPipePattern.java
@@ -91,6 +91,12 @@ public class WithExclusionPipePattern extends PipePattern {
     return inclusionPattern.mayOverlapWithDevice(device) && 
!exclusionPattern.coversDevice(device);
   }
 
+  @Override
+  public boolean overlapWithDevice(final String device) {
+    // Without an overlap on the inclusion side there is nothing left to exclude.
+    if (!inclusionPattern.overlapWithDevice(device)) {
+      return false;
+    }
+    // The device still counts unless the exclusion pattern covers it.
+    return !exclusionPattern.coversDevice(device);
+  }
+
   @Override
   public boolean matchesMeasurement(final String device, final String 
measurement) {
     // The core logic: Must match inclusion AND NOT match exclusion.

Reply via email to