This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch matcher-opti-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d3f15beafaed54fe65bd74a1ff2e33a8bc37c0ee
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 18 17:17:40 2026 +0800

    fix
---
 .../dataregion/realtime/assigner/DisruptorQueue.java |  1 -
 .../realtime/matcher/CachedSchemaPatternMatcher.java | 20 +++++++-------------
 .../pipe/datastructure/pattern/IoTDBPipePattern.java |  2 +-
 3 files changed, 8 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index 9c25296de51..f80e02f2d48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -32,7 +32,6 @@ import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index a01b50e7e68..2cc719ea687 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -19,15 +19,12 @@
 
 package org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +34,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -47,7 +45,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
 
   protected final ReentrantReadWriteLock lock;
   protected final Set<PipeRealtimeDataRegionSource> sources;
-  protected final Cache<String, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache;
+  protected final Map<String, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
@@ -55,10 +53,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     // iterated by {@link #assignToSource}, at the same time the sources may be added or
     // removed by {@link #register} and {@link #deregister}.
     this.sources = new CopyOnWriteArraySet<>();
-    this.deviceToSourcesCache =
-        Caffeine.newBuilder()
-            .maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
-            .build();
+    this.deviceToSourcesCache = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -66,7 +61,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.add(source);
-      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -77,7 +72,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.remove(source);
-      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -123,7 +118,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
 
         // 1. try to get matched sources from cache, if not success, match them by device
         final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
-            deviceToSourcesCache.get(device, this::filterSourcesByDevice);
+            deviceToSourcesCache.computeIfAbsent(device, this::filterSourcesByDevice);
         // this would not happen
         if (sourcesFilteredByDevice == null) {
           LOGGER.warn("Match result NPE when handle device {}", device);
@@ -213,8 +208,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       sources.clear();
-      deviceToSourcesCache.invalidateAll();
-      deviceToSourcesCache.cleanUp();
+      deviceToSourcesCache.clear();
     } finally {
       lock.writeLock().unlock();
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index 007ed2dc751..279715c2e6a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -120,7 +120,7 @@ public class IoTDBPipePattern extends IoTDBPipePatternOperations {
   public boolean overlapWithDevice(final String device) {
     try {
       return patternPartialPath.overlapWith(
-              measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+          measurementPathGetter.apply(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
     } catch (final IllegalPathException e) {
       return false;
     }
