This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push: new 3446a313388 Pipe: Optimized the path construction efficiency in pattern match (#16265) 3446a313388 is described below commit 3446a313388c6075e58ce5eb3c58d3fd66e480cd Author: Caideyipi <87789683+caidey...@users.noreply.github.com> AuthorDate: Tue Aug 26 16:41:54 2025 +0800 Pipe: Optimized the path construction efficiency in pattern match (#16265) * refactor * fix-optimize * optimize * fix --- .../agent/runtime/PipeDataNodeRuntimeAgent.java | 7 +- .../matcher/CachedSchemaPatternMatcher.java | 180 ++++++++++----------- .../cache/schema/DataNodeDevicePathCache.java | 3 - .../execute/utils/CompactionPathUtils.java | 23 +-- .../apache/iotdb/commons/conf/CommonConfig.java | 14 +- .../iotdb/commons/pipe/config/PipeConfig.java | 6 +- .../iotdb/commons/pipe/config/PipeDescriptor.java | 6 +- .../datastructure/pattern/IoTDBTreePattern.java | 23 ++- 8 files changed, 143 insertions(+), 119 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index 593ba6a297b..67621e3033a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; 
@@ -39,6 +40,7 @@ import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStar import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.service.ResourcesInformationHolder; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.slf4j.Logger; @@ -69,7 +71,7 @@ public class PipeDataNodeRuntimeAgent implements IService { //////////////////////////// System Service Interface //////////////////////////// public synchronized void preparePipeResources( - ResourcesInformationHolder resourcesInformationHolder) throws StartupException { + final ResourcesInformationHolder resourcesInformationHolder) throws StartupException { // Clean sender (connector) hardlink file dir and snapshot dir PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean(); @@ -78,6 +80,9 @@ public class PipeDataNodeRuntimeAgent implements IService { PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder); simpleProgressIndexAssigner.start(); + + IoTDBTreePattern.setDevicePathGetter(CompactionPathUtils::getPath); + IoTDBTreePattern.setMeasurementPathGetter(CompactionPathUtils::getPath); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java index 884bad39bb5..4411432e1da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java @@ -58,47 +58,47 @@ public class CachedSchemaPatternMatcher 
implements PipeDataRegionMatcher { protected final ReentrantReadWriteLock lock; private final AccessControl accessControl = Coordinator.getInstance().getAccessControl(); - protected final Set<PipeRealtimeDataRegionSource> extractors; + protected final Set<PipeRealtimeDataRegionSource> sources; - protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> deviceToExtractorsCache; + protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>> deviceToSourcesCache; protected final Cache<Pair<String, IDeviceID>, Set<PipeRealtimeDataRegionSource>> - databaseAndTableToExtractorsCache; + databaseAndTableToSourcesCache; public CachedSchemaPatternMatcher() { this.lock = new ReentrantReadWriteLock(); - // Should be thread-safe because the extractors will be returned by {@link #match} and - // iterated by {@link #assignToExtractor}, at the same time the extractors may be added or + // Should be thread-safe because the sources will be returned by {@link #match} and + // iterated by {@link #assignToSource}, at the same time the sources may be added or // removed by {@link #register} and {@link #deregister}. 
- this.extractors = new CopyOnWriteArraySet<>(); - this.deviceToExtractorsCache = + this.sources = new CopyOnWriteArraySet<>(); + this.deviceToSourcesCache = Caffeine.newBuilder() - .maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize()) + .maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize()) .build(); - this.databaseAndTableToExtractorsCache = + this.databaseAndTableToSourcesCache = Caffeine.newBuilder() - .maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize()) + .maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize()) .build(); } @Override - public void register(final PipeRealtimeDataRegionSource extractor) { + public void register(final PipeRealtimeDataRegionSource source) { lock.writeLock().lock(); try { - extractors.add(extractor); - deviceToExtractorsCache.invalidateAll(); - databaseAndTableToExtractorsCache.invalidateAll(); + sources.add(source); + deviceToSourcesCache.invalidateAll(); + databaseAndTableToSourcesCache.invalidateAll(); } finally { lock.writeLock().unlock(); } } @Override - public void deregister(final PipeRealtimeDataRegionSource extractor) { + public void deregister(final PipeRealtimeDataRegionSource source) { lock.writeLock().lock(); try { - extractors.remove(extractor); - deviceToExtractorsCache.invalidateAll(); - databaseAndTableToExtractorsCache.invalidateAll(); + sources.remove(source); + deviceToSourcesCache.invalidateAll(); + databaseAndTableToSourcesCache.invalidateAll(); } finally { lock.writeLock().unlock(); } @@ -109,7 +109,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { lock.writeLock().lock(); try { // Will invalidate device cache - databaseAndTableToExtractorsCache.invalidateAll(); + databaseAndTableToSourcesCache.invalidateAll(); } finally { lock.writeLock().unlock(); } @@ -119,7 +119,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { public int getRegisterCount() { lock.readLock().lock(); try { - 
return extractors.size(); + return sources.size(); } finally { lock.readLock().unlock(); } @@ -128,26 +128,26 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { @Override public Pair<Set<PipeRealtimeDataRegionSource>, Set<PipeRealtimeDataRegionSource>> match( final PipeRealtimeEvent event) { - final Set<PipeRealtimeDataRegionSource> matchedExtractors = new HashSet<>(); + final Set<PipeRealtimeDataRegionSource> matchedSources = new HashSet<>(); lock.readLock().lock(); try { - if (extractors.isEmpty()) { - return new Pair<>(matchedExtractors, extractors); + if (sources.isEmpty()) { + return new Pair<>(matchedSources, sources); } - // HeartbeatEvent will be assigned to all extractors + // HeartbeatEvent will be assigned to all sources if (event.getEvent() instanceof PipeHeartbeatEvent) { - return new Pair<>(extractors, Collections.EMPTY_SET); + return new Pair<>(sources, Collections.EMPTY_SET); } // TODO: consider table pattern? - // Deletion event will be assigned to extractors listened to it + // Deletion event will be assigned to sources listened to it if (event.getEvent() instanceof PipeDeleteDataNodeEvent) { - extractors.stream() + sources.stream() .filter(PipeRealtimeDataRegionSource::shouldExtractDeletion) - .forEach(matchedExtractors::add); - return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors)); + .forEach(matchedSources::add); + return new Pair<>(matchedSources, findUnmatchedSources(matchedSources)); } for (final Map.Entry<IDeviceID, String[]> entry : event.getSchemaInfo().entrySet()) { @@ -158,7 +158,7 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { || deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX) || deviceID.getTableName().equals(PATH_ROOT)) { event.markAsTreeModelEvent(); - matchTreeModelEvent(deviceID, entry.getValue(), matchedExtractors); + matchTreeModelEvent(deviceID, entry.getValue(), matchedSources); } else { event.markAsTableModelEvent(); 
matchTableModelEvent( @@ -166,70 +166,70 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { ? ((PipeInsertionEvent) event.getEvent()).getTableModelDatabaseName() : null, deviceID, - matchedExtractors); + matchedSources); } - if (matchedExtractors.size() == extractors.size()) { + if (matchedSources.size() == sources.size()) { break; } } - return new Pair<>(matchedExtractors, findUnmatchedExtractors(matchedExtractors)); + return new Pair<>(matchedSources, findUnmatchedSources(matchedSources)); } finally { lock.readLock().unlock(); } } - private Set<PipeRealtimeDataRegionSource> findUnmatchedExtractors( - final Set<PipeRealtimeDataRegionSource> matchedExtractors) { - final Set<PipeRealtimeDataRegionSource> unmatchedExtractors = new HashSet<>(); - for (final PipeRealtimeDataRegionSource extractor : extractors) { - if (!matchedExtractors.contains(extractor)) { - unmatchedExtractors.add(extractor); + private Set<PipeRealtimeDataRegionSource> findUnmatchedSources( + final Set<PipeRealtimeDataRegionSource> matchedSources) { + final Set<PipeRealtimeDataRegionSource> unmatchedSources = new HashSet<>(); + for (final PipeRealtimeDataRegionSource source : sources) { + if (!matchedSources.contains(source)) { + unmatchedSources.add(source); } } - return unmatchedExtractors; + return unmatchedSources; } protected void matchTreeModelEvent( final IDeviceID device, final String[] measurements, - final Set<PipeRealtimeDataRegionSource> matchedExtractors) { - // 1. try to get matched extractors from cache, if not success, match them by device - final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDevice = - deviceToExtractorsCache.get(device, this::filterExtractorsByDevice); + final Set<PipeRealtimeDataRegionSource> matchedSources) { + // 1. 
try to get matched sources from cache, if not success, match them by device + final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice = + deviceToSourcesCache.get(device, this::filterSourcesByDevice); // this would not happen - if (extractorsFilteredByDevice == null) { + if (sourcesFilteredByDevice == null) { LOGGER.warn( - "Extractors filtered by device is null when matching extractors for tree model event.", + "Sources filtered by device is null when matching sources for tree model event.", new Exception()); return; } - // 2. filter matched candidate extractors by measurements + // 2. filter matched candidate sources by measurements if (measurements.length == 0) { - // `measurements` is empty (only in case of tsfile event). match all extractors. + // `measurements` is empty (only in case of tsfile event). match all sources. // // case 1: the pattern can match all measurements of the device. - // in this case, the extractor can be matched without checking the measurements. + // in this case, the source can be matched without checking the measurements. // // case 2: the pattern may match some measurements of the device. // in this case, we can't get all measurements efficiently here, - // so we just ASSUME the extractor matches and do more checks later. - matchedExtractors.addAll(extractorsFilteredByDevice); + // so we just ASSUME the source matches and do more checks later. + matchedSources.addAll(sourcesFilteredByDevice); } else { // `measurements` is not empty (only in case of tablet event). - // Match extractors by measurements. - extractorsFilteredByDevice.forEach( - extractor -> { - if (matchedExtractors.size() == extractors.size()) { + // Match sources by measurements. 
+ sourcesFilteredByDevice.forEach( + source -> { + if (matchedSources.size() == sources.size()) { return; } - final TreePattern pattern = extractor.getTreePattern(); + final TreePattern pattern = source.getTreePattern(); if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(device)) { // The pattern can match all measurements of the device. - matchedExtractors.add(extractor); + matchedSources.add(source); } else { for (final String measurement : measurements) { // Ignore null measurement for partial insert @@ -238,8 +238,8 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } if (pattern.matchesMeasurement(device, measurement)) { - matchedExtractors.add(extractor); - // There would be no more matched extractors because the measurements are + matchedSources.add(source); + // There would be no more matched sources because the measurements are // unique break; } @@ -249,69 +249,69 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { } } - protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDevice(final IDeviceID device) { - final Set<PipeRealtimeDataRegionSource> filteredExtractors = new HashSet<>(); + protected Set<PipeRealtimeDataRegionSource> filterSourcesByDevice(final IDeviceID device) { + final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>(); - for (final PipeRealtimeDataRegionSource extractor : extractors) { - // Return if the extractor only extract deletion - if (!extractor.shouldExtractInsertion()) { + for (final PipeRealtimeDataRegionSource source : sources) { + // Return if the source only extract deletion + if (!source.shouldExtractInsertion()) { continue; } - final TreePattern treePattern = extractor.getTreePattern(); + final TreePattern treePattern = source.getTreePattern(); if (Objects.isNull(treePattern) || (treePattern.isTreeModelDataAllowedToBeCaptured() && treePattern.mayOverlapWithDevice(device))) { - filteredExtractors.add(extractor); + 
filteredSources.add(source); } } - return filteredExtractors; + return filteredSources; } protected void matchTableModelEvent( final String databaseName, final IDeviceID tableName, - final Set<PipeRealtimeDataRegionSource> matchedExtractors) { + final Set<PipeRealtimeDataRegionSource> matchedSources) { // this would not happen if (databaseName == null) { LOGGER.warn( - "Database name is null when matching extractors for table model event.", new Exception()); + "Database name is null when matching sources for table model event.", new Exception()); return; } - final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDatabaseAndTable = - databaseAndTableToExtractorsCache.get( - new Pair<>(databaseName, tableName), this::filterExtractorsByDatabaseAndTable); + final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDatabaseAndTable = + databaseAndTableToSourcesCache.get( + new Pair<>(databaseName, tableName), this::filterSourcesByDatabaseAndTable); // this would not happen - if (extractorsFilteredByDatabaseAndTable == null) { + if (sourcesFilteredByDatabaseAndTable == null) { LOGGER.warn( - "Extractors filtered by database and table is null when matching extractors for table model event.", + "Sources filtered by database and table is null when matching sources for table model event.", new Exception()); return; } - matchedExtractors.addAll(extractorsFilteredByDatabaseAndTable); + matchedSources.addAll(sourcesFilteredByDatabaseAndTable); } - protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDatabaseAndTable( + protected Set<PipeRealtimeDataRegionSource> filterSourcesByDatabaseAndTable( final Pair<String, IDeviceID> databaseNameAndTableName) { - final Set<PipeRealtimeDataRegionSource> filteredExtractors = new HashSet<>(); + final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>(); - for (final PipeRealtimeDataRegionSource extractor : extractors) { - // Return if the extractor only extract deletion - if (!extractor.shouldExtractInsertion()) 
{ + for (final PipeRealtimeDataRegionSource source : sources) { + // Return if the source only extract deletion + if (!source.shouldExtractInsertion()) { continue; } - final TablePattern tablePattern = extractor.getTablePattern(); + final TablePattern tablePattern = source.getTablePattern(); if (matchesTablePattern(tablePattern, databaseNameAndTableName) - && (!extractor.isSkipIfNoPrivileges() - || notFilteredByAccess(extractor.getUserName(), databaseNameAndTableName))) { - filteredExtractors.add(extractor); + && (!source.isSkipIfNoPrivileges() + || notFilteredByAccess(source.getUserName(), databaseNameAndTableName))) { + filteredSources.add(source); } } - return filteredExtractors; + return filteredSources; } private boolean matchesTablePattern( @@ -335,11 +335,11 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher { public void clear() { lock.writeLock().lock(); try { - extractors.clear(); - deviceToExtractorsCache.invalidateAll(); - deviceToExtractorsCache.cleanUp(); - databaseAndTableToExtractorsCache.invalidateAll(); - databaseAndTableToExtractorsCache.cleanUp(); + sources.clear(); + deviceToSourcesCache.invalidateAll(); + deviceToSourcesCache.cleanUp(); + databaseAndTableToSourcesCache.invalidateAll(); + databaseAndTableToSourcesCache.cleanUp(); } finally { lock.writeLock().unlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java index 07293bc4ca3..a9458e45f77 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java @@ -29,12 +29,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import 
com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Weigher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** This cache is for reducing duplicated DeviceId PartialPath initialization in write process. */ public class DataNodeDevicePathCache { - private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeDevicePathCache.class); private static final DataNodeMemoryConfig memoryConfig = IoTDBDescriptor.getInstance().getMemoryConfig(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java index 8b7a29fbf3b..5e452708bca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java @@ -24,31 +24,34 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.file.metadata.IDeviceID; public class CompactionPathUtils { private CompactionPathUtils() {} - public static PartialPath getPath(IDeviceID device, String measurement) + public static MeasurementPath getPath(final IDeviceID device, final String measurement) throws IllegalPathException { + return getPath(device).concatAsMeasurementPath(measurement); + } + + public static PartialPath getPath(final IDeviceID device) throws IllegalPathException { if (device.isTableModel()) { - String[] tableNameSegments = + final String[] tableNameSegments = 
DataNodeDevicePathCache.getInstance().getPartialPath(device.getTableName()).getNodes(); - String[] nodes = new String[device.segmentNum() + tableNameSegments.length]; + final String[] nodes = new String[device.segmentNum() + tableNameSegments.length - 1]; System.arraycopy(tableNameSegments, 0, nodes, 0, tableNameSegments.length); for (int i = 0; i < device.segmentNum() - 1; i++) { nodes[i + tableNameSegments.length] = device.segment(i + 1) == null ? null : device.segment(i + 1).toString(); } - nodes[device.segmentNum() + tableNameSegments.length - 1] = measurement; - MeasurementPath path = new MeasurementPath(nodes); - path.setDevice(device); - return path; + return new PartialPath(nodes); } else { - return DataNodeDevicePathCache.getInstance() - .getPartialPath(device.toString()) - .concatAsMeasurementPath(measurement); + final String deviceId = device.toString(); + return deviceId.contains(TsFileConstant.BACK_QUOTE_STRING) + ? DataNodeDevicePathCache.getInstance().getPartialPath(deviceId) + : new PartialPath(deviceId.split(TsFileConstant.PATH_SEPARATER_NO_REGEX)); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index b7b8bade89d..989d663e0ba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -255,7 +255,7 @@ public class CommonConfig { private int pipeExtractorAssignerDisruptorRingBufferSize = 128; private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB; - private long pipeExtractorMatcherCacheSize = 1024; + private long pipeSourceMatcherCacheSize = 1024; private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes @@ -988,16 +988,16 @@ public class CommonConfig { 
pipeExtractorAssignerDisruptorRingBufferEntrySize); } - public long getPipeExtractorMatcherCacheSize() { - return pipeExtractorMatcherCacheSize; + public long getPipeSourceMatcherCacheSize() { + return pipeSourceMatcherCacheSize; } - public void setPipeExtractorMatcherCacheSize(long pipeExtractorMatcherCacheSize) { - if (this.pipeExtractorMatcherCacheSize == pipeExtractorMatcherCacheSize) { + public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) { + if (this.pipeSourceMatcherCacheSize == pipeSourceMatcherCacheSize) { return; } - this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize; - logger.info("pipeExtractorMatcherCacheSize is set to {}.", pipeExtractorMatcherCacheSize); + this.pipeSourceMatcherCacheSize = pipeSourceMatcherCacheSize; + logger.info("pipeSourceMatcherCacheSize is set to {}.", pipeSourceMatcherCacheSize); } public int getPipeConnectorHandshakeTimeoutMs() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 2b47d4b525d..03aa5342753 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -159,8 +159,8 @@ public class PipeConfig { return COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(); } - public long getPipeExtractorMatcherCacheSize() { - return COMMON_CONFIG.getPipeExtractorMatcherCacheSize(); + public long getPipeSourceMatcherCacheSize() { + return COMMON_CONFIG.getPipeSourceMatcherCacheSize(); } /////////////////////////////// Connector /////////////////////////////// @@ -474,7 +474,7 @@ public class PipeConfig { LOGGER.info( "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}", getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()); - 
LOGGER.info("PipeExtractorMatcherCacheSize: {}", getPipeExtractorMatcherCacheSize()); + LOGGER.info("PipeExtractorMatcherCacheSize: {}", getPipeSourceMatcherCacheSize()); LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", getPipeConnectorHandshakeTimeoutMs()); LOGGER.info("PipeConnectorTransferTimeoutMs: {}", getPipeConnectorTransferTimeoutMs()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 45a3678ec9f..970d920313d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -322,13 +322,13 @@ public class PipeDescriptor { config .getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()))))); - config.setPipeExtractorMatcherCacheSize( + config.setPipeSourceMatcherCacheSize( Integer.parseInt( - Optional.ofNullable(properties.getProperty("pipe_extractor_matcher_cache_size")) + Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size")) .orElse( properties.getProperty( "pipe_extractor_matcher_cache_size", - String.valueOf(config.getPipeExtractorMatcherCacheSize()))))); + String.valueOf(config.getPipeSourceMatcherCacheSize()))))); config.setPipeConnectorHandshakeTimeoutMs( Long.parseLong( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java index 10557f02ae8..9a1d817dc60 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java @@ -40,6 +40,8 @@ import 
java.util.stream.Collectors; public class IoTDBTreePattern extends TreePattern { private final PartialPath patternPartialPath; + private static volatile DevicePathGetter devicePathGetter = PartialPath::new; + private static volatile MeasurementPathGetter measurementPathGetter = MeasurementPath::new; public IoTDBTreePattern(final boolean isTreeModelDataAllowedToBeCaptured, final String pattern) { super(isTreeModelDataAllowedToBeCaptured, pattern); @@ -115,7 +117,7 @@ public class IoTDBTreePattern extends TreePattern { try { // Another way is to use patternPath.overlapWith("device.*"), // there will be no false positives but time cost may be higher. - return patternPartialPath.matchPrefixPath(new PartialPath(device)); + return patternPartialPath.matchPrefixPath(devicePathGetter.apply(device)); } catch (final IllegalPathException e) { return false; } @@ -129,7 +131,7 @@ public class IoTDBTreePattern extends TreePattern { } try { - return patternPartialPath.matchFullPath(new MeasurementPath(device, measurement)); + return patternPartialPath.matchFullPath(measurementPathGetter.apply(device, measurement)); } catch (final IllegalPathException e) { return false; } @@ -208,8 +210,25 @@ public class IoTDBTreePattern extends TreePattern { return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode()); } + public static void setDevicePathGetter(final DevicePathGetter devicePathGetter) { + IoTDBTreePattern.devicePathGetter = devicePathGetter; + } + + public static void setMeasurementPathGetter(final MeasurementPathGetter measurementPathGetter) { + IoTDBTreePattern.measurementPathGetter = measurementPathGetter; + } + @Override public String toString() { return "IoTDBPipePattern" + super.toString(); } + + public interface DevicePathGetter { + PartialPath apply(final IDeviceID deviceId) throws IllegalPathException; + } + + public interface MeasurementPathGetter { + MeasurementPath apply(final IDeviceID deviceId, final String measurement) + throws IllegalPathException; + } }