This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch matcher-opti-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d3f15beafaed54fe65bd74a1ff2e33a8bc37c0ee
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 17:17:40 2026 +0800

    fix
---
 .../dataregion/realtime/assigner/DisruptorQueue.java |  1 -
 .../realtime/matcher/CachedSchemaPatternMatcher.java | 20 +++++++-------------
 .../pipe/datastructure/pattern/IoTDBPipePattern.java |  2 +-
 3 files changed, 8 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 9c25296de51..f80e02f2d48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -32,7 +32,6 @@ import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index a01b50e7e68..2cc719ea687 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -19,15 +19,12 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +34,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -47,7 +45,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
   protected final ReentrantReadWriteLock lock;
 
   protected final Set<PipeRealtimeDataRegionSource> sources;
-  protected final Cache<String, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache;
+  protected final Map<String, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
@@ -55,10 +53,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     // iterated by {@link #assignToSource}, at the same time the sources may be added or
     // removed by {@link #register} and {@link #deregister}.
     this.sources = new CopyOnWriteArraySet<>();
-    this.deviceToSourcesCache =
-        Caffeine.newBuilder()
-            .maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
-            .build();
+    this.deviceToSourcesCache = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -66,7 +61,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.add(source);
-      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -77,7 +72,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.remove(source);
-      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -123,7 +118,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
 
         // 1. try to get matched sources from cache, if not success, match them by device
         final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
-            deviceToSourcesCache.get(device, this::filterSourcesByDevice);
+            deviceToSourcesCache.computeIfAbsent(device, this::filterSourcesByDevice);
         // this would not happen
         if (sourcesFilteredByDevice == null) {
           LOGGER.warn("Match result NPE when handle device {}", device);
@@ -213,8 +208,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.clear();
-      deviceToSourcesCache.invalidateAll();
-      deviceToSourcesCache.cleanUp();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index 007ed2dc751..279715c2e6a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -120,7 +120,7 @@ public class IoTDBPipePattern extends IoTDBPipePatternOperations {
   public boolean overlapWithDevice(final String device) {
     try {
       return patternPartialPath.overlapWith(
-              measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+          measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
     } catch (final IllegalPathException e) {
       return false;
     }

Reply via email to