This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3446a313388 Pipe: Optimized the path construction efficiency in 
pattern match (#16265)
3446a313388 is described below

commit 3446a313388c6075e58ce5eb3c58d3fd66e480cd
Author: Caideyipi <87789683+caidey...@users.noreply.github.com>
AuthorDate: Tue Aug 26 16:41:54 2025 +0800

    Pipe: Optimized the path construction efficiency in pattern match (#16265)
    
    * refactpr
    
    * fix-optimize
    
    * optimie
    
    * fix
---
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    |   7 +-
 .../matcher/CachedSchemaPatternMatcher.java        | 180 ++++++++++-----------
 .../cache/schema/DataNodeDevicePathCache.java      |   3 -
 .../execute/utils/CompactionPathUtils.java         |  23 +--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  14 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |   6 +-
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   6 +-
 .../datastructure/pattern/IoTDBTreePattern.java    |  23 ++-
 8 files changed, 143 insertions(+), 119 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 593ba6a297b..67621e3033a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -39,6 +40,7 @@ import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStar
 import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.slf4j.Logger;
@@ -69,7 +71,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
   //////////////////////////// System Service Interface 
////////////////////////////
 
   public synchronized void preparePipeResources(
-      ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
+      final ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
     // Clean sender (connector) hardlink file dir and snapshot dir
     PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
 
@@ -78,6 +80,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
 
     PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
     simpleProgressIndexAssigner.start();
+
+    IoTDBTreePattern.setDevicePathGetter(CompactionPathUtils::getPath);
+    IoTDBTreePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 884bad39bb5..4411432e1da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -58,47 +58,47 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
 
   protected final ReentrantReadWriteLock lock;
   private final AccessControl accessControl = 
Coordinator.getInstance().getAccessControl();
-  protected final Set<PipeRealtimeDataRegionSource> extractors;
+  protected final Set<PipeRealtimeDataRegionSource> sources;
 
-  protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> 
deviceToExtractorsCache;
+  protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> 
deviceToSourcesCache;
   protected final Cache<Pair<String, IDeviceID>, 
Set<PipeRealtimeDataRegionSource>>
-      databaseAndTableToExtractorsCache;
+      databaseAndTableToSourcesCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
-    // Should be thread-safe because the extractors will be returned by {@link 
#match} and
-    // iterated by {@link #assignToExtractor}, at the same time the extractors 
may be added or
+    // Should be thread-safe because the sources will be returned by {@link 
#match} and
+    // iterated by {@link #assignToSource}, at the same time the sources may 
be added or
     // removed by {@link #register} and {@link #deregister}.
-    this.extractors = new CopyOnWriteArraySet<>();
-    this.deviceToExtractorsCache =
+    this.sources = new CopyOnWriteArraySet<>();
+    this.deviceToSourcesCache =
         Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
+            
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
             .build();
-    this.databaseAndTableToExtractorsCache =
+    this.databaseAndTableToSourcesCache =
         Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
+            
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
             .build();
   }
 
   @Override
-  public void register(final PipeRealtimeDataRegionSource extractor) {
+  public void register(final PipeRealtimeDataRegionSource source) {
     lock.writeLock().lock();
     try {
-      extractors.add(extractor);
-      deviceToExtractorsCache.invalidateAll();
-      databaseAndTableToExtractorsCache.invalidateAll();
+      sources.add(source);
+      deviceToSourcesCache.invalidateAll();
+      databaseAndTableToSourcesCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   @Override
-  public void deregister(final PipeRealtimeDataRegionSource extractor) {
+  public void deregister(final PipeRealtimeDataRegionSource source) {
     lock.writeLock().lock();
     try {
-      extractors.remove(extractor);
-      deviceToExtractorsCache.invalidateAll();
-      databaseAndTableToExtractorsCache.invalidateAll();
+      sources.remove(source);
+      deviceToSourcesCache.invalidateAll();
+      databaseAndTableToSourcesCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
@@ -109,7 +109,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     lock.writeLock().lock();
     try {
       // Will invalidate device cache
-      databaseAndTableToExtractorsCache.invalidateAll();
+      databaseAndTableToSourcesCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
@@ -119,7 +119,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   public int getRegisterCount() {
     lock.readLock().lock();
     try {
-      return extractors.size();
+      return sources.size();
     } finally {
       lock.readLock().unlock();
     }
@@ -128,26 +128,26 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   @Override
   public Pair<Set<PipeRealtimeDataRegionSource>, 
Set<PipeRealtimeDataRegionSource>> match(
       final PipeRealtimeEvent event) {
-    final Set<PipeRealtimeDataRegionSource> matchedExtractors = new 
HashSet<>();
+    final Set<PipeRealtimeDataRegionSource> matchedSources = new HashSet<>();
 
     lock.readLock().lock();
     try {
-      if (extractors.isEmpty()) {
-        return new Pair<>(matchedExtractors, extractors);
+      if (sources.isEmpty()) {
+        return new Pair<>(matchedSources, sources);
       }
 
-      // HeartbeatEvent will be assigned to all extractors
+      // HeartbeatEvent will be assigned to all sources
       if (event.getEvent() instanceof PipeHeartbeatEvent) {
-        return new Pair<>(extractors, Collections.EMPTY_SET);
+        return new Pair<>(sources, Collections.EMPTY_SET);
       }
 
       // TODO: consider table pattern?
-      // Deletion event will be assigned to extractors listened to it
+      // Deletion event will be assigned to sources listened to it
       if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
-        extractors.stream()
+        sources.stream()
             .filter(PipeRealtimeDataRegionSource::shouldExtractDeletion)
-            .forEach(matchedExtractors::add);
-        return new Pair<>(matchedExtractors, 
findUnmatchedExtractors(matchedExtractors));
+            .forEach(matchedSources::add);
+        return new Pair<>(matchedSources, 
findUnmatchedSources(matchedSources));
       }
 
       for (final Map.Entry<IDeviceID, String[]> entry : 
event.getSchemaInfo().entrySet()) {
@@ -158,7 +158,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
             || 
deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
             || deviceID.getTableName().equals(PATH_ROOT)) {
           event.markAsTreeModelEvent();
-          matchTreeModelEvent(deviceID, entry.getValue(), matchedExtractors);
+          matchTreeModelEvent(deviceID, entry.getValue(), matchedSources);
         } else {
           event.markAsTableModelEvent();
           matchTableModelEvent(
@@ -166,70 +166,70 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
                   ? ((PipeInsertionEvent) 
event.getEvent()).getTableModelDatabaseName()
                   : null,
               deviceID,
-              matchedExtractors);
+              matchedSources);
         }
 
-        if (matchedExtractors.size() == extractors.size()) {
+        if (matchedSources.size() == sources.size()) {
           break;
         }
       }
 
-      return new Pair<>(matchedExtractors, 
findUnmatchedExtractors(matchedExtractors));
+      return new Pair<>(matchedSources, findUnmatchedSources(matchedSources));
     } finally {
       lock.readLock().unlock();
     }
   }
 
-  private Set<PipeRealtimeDataRegionSource> findUnmatchedExtractors(
-      final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
-    final Set<PipeRealtimeDataRegionSource> unmatchedExtractors = new 
HashSet<>();
-    for (final PipeRealtimeDataRegionSource extractor : extractors) {
-      if (!matchedExtractors.contains(extractor)) {
-        unmatchedExtractors.add(extractor);
+  private Set<PipeRealtimeDataRegionSource> findUnmatchedSources(
+      final Set<PipeRealtimeDataRegionSource> matchedSources) {
+    final Set<PipeRealtimeDataRegionSource> unmatchedSources = new HashSet<>();
+    for (final PipeRealtimeDataRegionSource source : sources) {
+      if (!matchedSources.contains(source)) {
+        unmatchedSources.add(source);
       }
     }
-    return unmatchedExtractors;
+    return unmatchedSources;
   }
 
   protected void matchTreeModelEvent(
       final IDeviceID device,
       final String[] measurements,
-      final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
-    // 1. try to get matched extractors from cache, if not success, match them 
by device
-    final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDevice =
-        deviceToExtractorsCache.get(device, this::filterExtractorsByDevice);
+      final Set<PipeRealtimeDataRegionSource> matchedSources) {
+    // 1. try to get matched sources from cache, if not success, match them by 
device
+    final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
+        deviceToSourcesCache.get(device, this::filterSourcesByDevice);
     // this would not happen
-    if (extractorsFilteredByDevice == null) {
+    if (sourcesFilteredByDevice == null) {
       LOGGER.warn(
-          "Extractors filtered by device is null when matching extractors for 
tree model event.",
+          "Sources filtered by device is null when matching sources for tree 
model event.",
           new Exception());
       return;
     }
 
-    // 2. filter matched candidate extractors by measurements
+    // 2. filter matched candidate sources by measurements
     if (measurements.length == 0) {
-      // `measurements` is empty (only in case of tsfile event). match all 
extractors.
+      // `measurements` is empty (only in case of tsfile event). match all 
sources.
       //
       // case 1: the pattern can match all measurements of the device.
-      // in this case, the extractor can be matched without checking the 
measurements.
+      // in this case, the source can be matched without checking the 
measurements.
       //
       // case 2: the pattern may match some measurements of the device.
       // in this case, we can't get all measurements efficiently here,
-      // so we just ASSUME the extractor matches and do more checks later.
-      matchedExtractors.addAll(extractorsFilteredByDevice);
+      // so we just ASSUME the source matches and do more checks later.
+      matchedSources.addAll(sourcesFilteredByDevice);
     } else {
       // `measurements` is not empty (only in case of tablet event).
-      // Match extractors by measurements.
-      extractorsFilteredByDevice.forEach(
-          extractor -> {
-            if (matchedExtractors.size() == extractors.size()) {
+      // Match sources by measurements.
+      sourcesFilteredByDevice.forEach(
+          source -> {
+            if (matchedSources.size() == sources.size()) {
               return;
             }
 
-            final TreePattern pattern = extractor.getTreePattern();
+            final TreePattern pattern = source.getTreePattern();
             if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(device)) {
               // The pattern can match all measurements of the device.
-              matchedExtractors.add(extractor);
+              matchedSources.add(source);
             } else {
               for (final String measurement : measurements) {
                 // Ignore null measurement for partial insert
@@ -238,8 +238,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
                 }
 
                 if (pattern.matchesMeasurement(device, measurement)) {
-                  matchedExtractors.add(extractor);
-                  // There would be no more matched extractors because the 
measurements are
+                  matchedSources.add(source);
+                  // There would be no more matched sources because the 
measurements are
                   // unique
                   break;
                 }
@@ -249,69 +249,69 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     }
   }
 
-  protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDevice(final 
IDeviceID device) {
-    final Set<PipeRealtimeDataRegionSource> filteredExtractors = new 
HashSet<>();
+  protected Set<PipeRealtimeDataRegionSource> filterSourcesByDevice(final 
IDeviceID device) {
+    final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
 
-    for (final PipeRealtimeDataRegionSource extractor : extractors) {
-      // Return if the extractor only extract deletion
-      if (!extractor.shouldExtractInsertion()) {
+    for (final PipeRealtimeDataRegionSource source : sources) {
+      // Return if the source only extract deletion
+      if (!source.shouldExtractInsertion()) {
         continue;
       }
 
-      final TreePattern treePattern = extractor.getTreePattern();
+      final TreePattern treePattern = source.getTreePattern();
       if (Objects.isNull(treePattern)
           || (treePattern.isTreeModelDataAllowedToBeCaptured()
               && treePattern.mayOverlapWithDevice(device))) {
-        filteredExtractors.add(extractor);
+        filteredSources.add(source);
       }
     }
 
-    return filteredExtractors;
+    return filteredSources;
   }
 
   protected void matchTableModelEvent(
       final String databaseName,
       final IDeviceID tableName,
-      final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
+      final Set<PipeRealtimeDataRegionSource> matchedSources) {
     // this would not happen
     if (databaseName == null) {
       LOGGER.warn(
-          "Database name is null when matching extractors for table model 
event.", new Exception());
+          "Database name is null when matching sources for table model 
event.", new Exception());
       return;
     }
 
-    final Set<PipeRealtimeDataRegionSource> 
extractorsFilteredByDatabaseAndTable =
-        databaseAndTableToExtractorsCache.get(
-            new Pair<>(databaseName, tableName), 
this::filterExtractorsByDatabaseAndTable);
+    final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDatabaseAndTable =
+        databaseAndTableToSourcesCache.get(
+            new Pair<>(databaseName, tableName), 
this::filterSourcesByDatabaseAndTable);
     // this would not happen
-    if (extractorsFilteredByDatabaseAndTable == null) {
+    if (sourcesFilteredByDatabaseAndTable == null) {
       LOGGER.warn(
-          "Extractors filtered by database and table is null when matching 
extractors for table model event.",
+          "Sources filtered by database and table is null when matching 
sources for table model event.",
           new Exception());
       return;
     }
-    matchedExtractors.addAll(extractorsFilteredByDatabaseAndTable);
+    matchedSources.addAll(sourcesFilteredByDatabaseAndTable);
   }
 
-  protected Set<PipeRealtimeDataRegionSource> 
filterExtractorsByDatabaseAndTable(
+  protected Set<PipeRealtimeDataRegionSource> filterSourcesByDatabaseAndTable(
       final Pair<String, IDeviceID> databaseNameAndTableName) {
-    final Set<PipeRealtimeDataRegionSource> filteredExtractors = new 
HashSet<>();
+    final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
 
-    for (final PipeRealtimeDataRegionSource extractor : extractors) {
-      // Return if the extractor only extract deletion
-      if (!extractor.shouldExtractInsertion()) {
+    for (final PipeRealtimeDataRegionSource source : sources) {
+      // Return if the source only extract deletion
+      if (!source.shouldExtractInsertion()) {
         continue;
       }
 
-      final TablePattern tablePattern = extractor.getTablePattern();
+      final TablePattern tablePattern = source.getTablePattern();
       if (matchesTablePattern(tablePattern, databaseNameAndTableName)
-          && (!extractor.isSkipIfNoPrivileges()
-              || notFilteredByAccess(extractor.getUserName(), 
databaseNameAndTableName))) {
-        filteredExtractors.add(extractor);
+          && (!source.isSkipIfNoPrivileges()
+              || notFilteredByAccess(source.getUserName(), 
databaseNameAndTableName))) {
+        filteredSources.add(source);
       }
     }
 
-    return filteredExtractors;
+    return filteredSources;
   }
 
   private boolean matchesTablePattern(
@@ -335,11 +335,11 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   public void clear() {
     lock.writeLock().lock();
     try {
-      extractors.clear();
-      deviceToExtractorsCache.invalidateAll();
-      deviceToExtractorsCache.cleanUp();
-      databaseAndTableToExtractorsCache.invalidateAll();
-      databaseAndTableToExtractorsCache.cleanUp();
+      sources.clear();
+      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.cleanUp();
+      databaseAndTableToSourcesCache.invalidateAll();
+      databaseAndTableToSourcesCache.cleanUp();
     } finally {
       lock.writeLock().unlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
index 07293bc4ca3..a9458e45f77 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
@@ -29,12 +29,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Weigher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** This cache is for reducing duplicated DeviceId PartialPath initialization 
in write process. */
 public class DataNodeDevicePathCache {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeDevicePathCache.class);
 
   private static final DataNodeMemoryConfig memoryConfig =
       IoTDBDescriptor.getInstance().getMemoryConfig();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
index 8b7a29fbf3b..5e452708bca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
@@ -24,31 +24,34 @@ import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 
+import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.file.metadata.IDeviceID;
 
 public class CompactionPathUtils {
 
   private CompactionPathUtils() {}
 
-  public static PartialPath getPath(IDeviceID device, String measurement)
+  public static MeasurementPath getPath(final IDeviceID device, final String 
measurement)
       throws IllegalPathException {
+    return getPath(device).concatAsMeasurementPath(measurement);
+  }
+
+  public static PartialPath getPath(final IDeviceID device) throws 
IllegalPathException {
     if (device.isTableModel()) {
-      String[] tableNameSegments =
+      final String[] tableNameSegments =
           
DataNodeDevicePathCache.getInstance().getPartialPath(device.getTableName()).getNodes();
-      String[] nodes = new String[device.segmentNum() + 
tableNameSegments.length];
+      final String[] nodes = new String[device.segmentNum() + 
tableNameSegments.length - 1];
       System.arraycopy(tableNameSegments, 0, nodes, 0, 
tableNameSegments.length);
       for (int i = 0; i < device.segmentNum() - 1; i++) {
         nodes[i + tableNameSegments.length] =
             device.segment(i + 1) == null ? null : device.segment(i + 
1).toString();
       }
-      nodes[device.segmentNum() + tableNameSegments.length - 1] = measurement;
-      MeasurementPath path = new MeasurementPath(nodes);
-      path.setDevice(device);
-      return path;
+      return new PartialPath(nodes);
     } else {
-      return DataNodeDevicePathCache.getInstance()
-          .getPartialPath(device.toString())
-          .concatAsMeasurementPath(measurement);
+      final String deviceId = device.toString();
+      return deviceId.contains(TsFileConstant.BACK_QUOTE_STRING)
+          ? DataNodeDevicePathCache.getInstance().getPartialPath(deviceId)
+          : new 
PartialPath(deviceId.split(TsFileConstant.PATH_SEPARATER_NO_REGEX));
     }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b7b8bade89d..989d663e0ba 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -255,7 +255,7 @@ public class CommonConfig {
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * 
KB;
-  private long pipeExtractorMatcherCacheSize = 1024;
+  private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
   private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
@@ -988,16 +988,16 @@ public class CommonConfig {
         pipeExtractorAssignerDisruptorRingBufferEntrySize);
   }
 
-  public long getPipeExtractorMatcherCacheSize() {
-    return pipeExtractorMatcherCacheSize;
+  public long getPipeSourceMatcherCacheSize() {
+    return pipeSourceMatcherCacheSize;
   }
 
-  public void setPipeExtractorMatcherCacheSize(long 
pipeExtractorMatcherCacheSize) {
-    if (this.pipeExtractorMatcherCacheSize == pipeExtractorMatcherCacheSize) {
+  public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
+    if (this.pipeSourceMatcherCacheSize == pipeSourceMatcherCacheSize) {
       return;
     }
-    this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
-    logger.info("pipeExtractorMatcherCacheSize is set to {}.", 
pipeExtractorMatcherCacheSize);
+    this.pipeSourceMatcherCacheSize = pipeSourceMatcherCacheSize;
+    logger.info("pipeSourceMatcherCacheSize is set to {}.", 
pipeSourceMatcherCacheSize);
   }
 
   public int getPipeConnectorHandshakeTimeoutMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2b47d4b525d..03aa5342753 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -159,8 +159,8 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
   }
 
-  public long getPipeExtractorMatcherCacheSize() {
-    return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
+  public long getPipeSourceMatcherCacheSize() {
+    return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
   }
 
   /////////////////////////////// Connector ///////////////////////////////
@@ -474,7 +474,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
         getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
-    LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeExtractorMatcherCacheSize());
+    LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
     LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
     LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 45a3678ec9f..970d920313d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -322,13 +322,13 @@ public class PipeDescriptor {
                             config
                                 
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
 
-    config.setPipeExtractorMatcherCacheSize(
+    config.setPipeSourceMatcherCacheSize(
         Integer.parseInt(
-            
Optional.ofNullable(properties.getProperty("pipe_extractor_matcher_cache_size"))
+            
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_extractor_matcher_cache_size",
-                        
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
+                        
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
 
     config.setPipeConnectorHandshakeTimeoutMs(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
index 10557f02ae8..9a1d817dc60 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
@@ -40,6 +40,8 @@ import java.util.stream.Collectors;
 public class IoTDBTreePattern extends TreePattern {
 
   private final PartialPath patternPartialPath;
+  private static volatile DevicePathGetter devicePathGetter = PartialPath::new;
+  private static volatile MeasurementPathGetter measurementPathGetter = 
MeasurementPath::new;
 
   public IoTDBTreePattern(final boolean isTreeModelDataAllowedToBeCaptured, 
final String pattern) {
     super(isTreeModelDataAllowedToBeCaptured, pattern);
@@ -115,7 +117,7 @@ public class IoTDBTreePattern extends TreePattern {
     try {
       // Another way is to use patternPath.overlapWith("device.*"),
       // there will be no false positives but time cost may be higher.
-      return patternPartialPath.matchPrefixPath(new PartialPath(device));
+      return 
patternPartialPath.matchPrefixPath(devicePathGetter.apply(device));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -129,7 +131,7 @@ public class IoTDBTreePattern extends TreePattern {
     }
 
     try {
-      return patternPartialPath.matchFullPath(new MeasurementPath(device, 
measurement));
+      return 
patternPartialPath.matchFullPath(measurementPathGetter.apply(device, 
measurement));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -208,8 +210,25 @@ public class IoTDBTreePattern extends TreePattern {
     return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode());
   }
 
+  public static void setDevicePathGetter(final DevicePathGetter 
devicePathGetter) {
+    IoTDBTreePattern.devicePathGetter = devicePathGetter;
+  }
+
+  public static void setMeasurementPathGetter(final MeasurementPathGetter 
measurementPathGetter) {
+    IoTDBTreePattern.measurementPathGetter = measurementPathGetter;
+  }
+
   @Override
   public String toString() {
     return "IoTDBPipePattern" + super.toString();
   }
+
+  public interface DevicePathGetter {
+    PartialPath apply(final IDeviceID deviceId) throws IllegalPathException;
+  }
+
+  public interface MeasurementPathGetter {
+    MeasurementPath apply(final IDeviceID deviceId, final String measurement)
+        throws IllegalPathException;
+  }
 }

Reply via email to