This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f00763dc5cd Pipe: support IoTDB-style pattern (#12085)
f00763dc5cd is described below
commit f00763dc5cd24fb835d827f61fb0b76514c0f7ac
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Mar 12 21:54:18 2024 +0800
Pipe: support IoTDB-style pattern (#12085)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/PipePatternFormatIT.java | 197 ++++++++
.../pipe/event/PipeConfigRegionSnapshotEvent.java | 9 +-
.../pipe/event/PipeConfigRegionWritePlanEvent.java | 9 +-
.../db/pipe/event/UserDefinedEnrichedEvent.java | 9 +-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 7 +-
.../schema/PipeSchemaRegionSnapshotEvent.java | 9 +-
.../schema/PipeSchemaRegionWritePlanEvent.java | 9 +-
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 18 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 17 +-
.../tablet/TabletInsertionDataContainer.java | 28 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 11 +-
.../tsfile/TsFileInsertionDataContainer.java | 21 +-
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 11 +-
.../dataregion/IoTDBDataRegionExtractor.java | 68 ++-
.../PipeHistoricalDataRegionTsFileExtractor.java | 20 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 29 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 6 +-
.../realtime/epoch/TsFileEpochManager.java | 4 +-
.../matcher/CachedSchemaPatternMatcher.java | 67 +--
.../matcher/PipeDataRegionMatcher.java | 2 +-
.../pipe/event/PipeTabletInsertionEventTest.java | 29 +-
.../event/TsFileInsertionDataContainerTest.java | 493 +++++++++++++--------
.../CachedSchemaPatternMatcherTest.java | 20 +-
.../db/pipe/pattern/IoTDBPipePatternTest.java | 110 +++++
.../db/pipe/pattern/PrefixPipePatternTest.java | 105 +++++
.../config/constant/PipeExtractorConstant.java | 9 +-
.../iotdb/commons/pipe/event/EnrichedEvent.java | 40 +-
.../commons/pipe/event/PipeSnapshotEvent.java | 3 +-
.../commons/pipe/event/PipeWritePlanEvent.java | 3 +-
.../commons/pipe/pattern/IoTDBPipePattern.java | 113 +++++
.../iotdb/commons/pipe/pattern/PipePattern.java | 133 ++++++
.../commons/pipe/pattern/PrefixPipePattern.java | 121 +++++
32 files changed, 1341 insertions(+), 389 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipePatternFormatIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipePatternFormatIT.java
new file mode 100644
index 00000000000..e93b969d532
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipePatternFormatIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class PipePatternFormatIT extends AbstractPipeDualAutoIT {
+ @Test
+ public void testPrefixPattern() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
+ "insert into root.db.d2(time, s) values (1, 1)",
+ "insert into root.db2.d1(time, s) values (1, 1)"))) {
+ return;
+ }
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d1.s");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,1.0,1.0,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "select * from root.**",
"Time,root.db.d1.s,root.db.d1.s1,", expectedResSet);
+ }
+ }
+
+ @Test
+ public void testIotdbPattern() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
+ "insert into root.db.d2(time, s) values (1, 1)",
+ "insert into root.db2.d1(time, s) values (1, 1)"))) {
+ return;
+ }
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.path", "root.**.d1.s*");
+ // When path is set, pattern should be ignored
+ extractorAttributes.put("extractor.pattern", "root");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,1.0,1.0,1.0,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
+ expectedResSet);
+ }
+ }
+
+ @Test
+ public void testIotdbPatternWithLegacySyntax() throws Exception {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ String receiverIp = receiverDataNode.getIp();
+ int receiverPort = receiverDataNode.getPort();
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
+ "insert into root.db.d2(time, s) values (1, 1)",
+ "insert into root.db2.d1(time, s) values (1, 1)"))) {
+ return;
+ }
+
+ Map<String, String> extractorAttributes = new HashMap<>();
+ Map<String, String> processorAttributes = new HashMap<>();
+ Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.**.d1.s*");
+ extractorAttributes.put("extractor.pattern.format", "iotdb");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,1.0,1.0,1.0,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
+ expectedResSet);
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index a13008b4caf..48fe334e6df 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.event;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -39,7 +40,7 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
}
public PipeConfigRegionSnapshotEvent(
- String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta, String
pattern) {
+ String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta,
PipePattern pattern) {
super(
snapshotPath,
pipeName,
@@ -50,7 +51,11 @@ public class PipeConfigRegionSnapshotEvent extends
PipeSnapshotEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeConfigRegionSnapshotEvent(snapshotPath, pipeName,
pipeTaskMeta, pattern);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index 980bb5308c0..d94b58ea892 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.event;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -46,7 +47,7 @@ public class PipeConfigRegionWritePlanEvent extends
PipeWritePlanEvent {
ConfigPhysicalPlan configPhysicalPlan,
String pipeName,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
this.configPhysicalPlan = configPhysicalPlan;
@@ -58,7 +59,11 @@ public class PipeConfigRegionWritePlanEvent extends
PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeConfigRegionWritePlanEvent(
configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 2a2eb25721f..ff6e34fffcf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.UserDefinedEvent;
@@ -42,7 +43,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
super(
enrichedEvent.getPipeName(),
enrichedEvent.getPipeTaskMeta(),
- enrichedEvent.getPattern(),
+ enrichedEvent.getPipePattern(),
enrichedEvent.getStartTime(),
enrichedEvent.getEndTime());
this.userDefinedEvent = userDefinedEvent;
@@ -70,7 +71,11 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName, pipeTaskMeta, pattern, startTime, endTime);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index a5c0e267488..c675ff5542c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.heartbeat;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -107,7 +108,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
// Should record PipeTaskMeta, for sometimes HeartbeatEvents should report
exceptions.
// Here we ignore parameters `pattern`, `startTime`, and `endTime`.
return new PipeHeartbeatEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index b4a8b05b2e5..dc66d4f006f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.schema;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -38,13 +39,17 @@ public class PipeSchemaRegionSnapshotEvent extends
PipeSnapshotEvent {
}
public PipeSchemaRegionSnapshotEvent(
- String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta, String
pattern) {
+ String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta,
PipePattern pattern) {
super(snapshotPath, pipeName, pipeTaskMeta, pattern,
PipeResourceManager.snapshot());
}
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeSchemaRegionSnapshotEvent(snapshotPath, pipeName,
pipeTaskMeta, pattern);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index dacfddfdda9..96892c7004b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.schema;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -45,7 +46,7 @@ public class PipeSchemaRegionWritePlanEvent extends
PipeWritePlanEvent {
PlanNode planNode,
String pipeName,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
boolean isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
this.planNode = planNode;
@@ -57,7 +58,11 @@ public class PipeSchemaRegionWritePlanEvent extends
PipeWritePlanEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeSchemaRegionWritePlanEvent(
planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 3a09e865b09..19368be2412 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -80,7 +81,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
boolean isGeneratedByPipe,
String pipeName,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
long startTime,
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
@@ -147,7 +148,11 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
@Override
public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
progressIndex,
@@ -201,7 +206,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
try {
if (dataContainer == null) {
dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), getPattern());
+ new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), pipePattern);
}
return dataContainer.processRowByRow(consumer);
} catch (Exception e) {
@@ -214,7 +219,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
try {
if (dataContainer == null) {
dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), getPattern());
+ new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), pipePattern);
}
return dataContainer.processTablet(consumer);
} catch (Exception e) {
@@ -232,7 +237,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
try {
if (dataContainer == null) {
dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), getPattern());
+ new TabletInsertionDataContainer(pipeTaskMeta, this,
getInsertNode(), pipePattern);
}
return dataContainer.convertToTablet();
} catch (Exception e) {
@@ -246,7 +251,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
public boolean shouldParsePattern() {
final InsertNode node = getInsertNodeViaCacheIfPossible();
return super.shouldParsePattern()
- && (Objects.isNull(node) ||
!node.getDevicePath().getFullPath().startsWith(pattern));
+ && Objects.nonNull(pipePattern)
+ && (Objects.isNull(node) ||
!pipePattern.coversDevice(node.getDevicePath().getFullPath()));
}
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 02386f1e375..beb6c3ea9e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -53,7 +54,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
boolean needToReport,
String pipeName,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
long startTime,
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
@@ -88,7 +89,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
}
@TestOnly
- public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String
pattern) {
+ public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned,
PipePattern pattern) {
this(tablet, isAligned, null, false, null, null, pattern, Long.MIN_VALUE,
Long.MAX_VALUE);
}
@@ -127,7 +128,11 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeRawTabletInsertionEvent(
tablet,
isAligned,
@@ -169,7 +174,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row,
RowCollector> consumer) {
if (dataContainer == null) {
dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, getPattern());
+ new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, pipePattern);
}
return dataContainer.processRowByRow(consumer);
}
@@ -178,7 +183,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet,
RowCollector> consumer) {
if (dataContainer == null) {
dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, getPattern());
+ new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, pipePattern);
}
return dataContainer.processTablet(consumer);
}
@@ -197,7 +202,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent implements Tablet
// if notNullPattern is not "root", we need to convert the tablet
if (dataContainer == null) {
dataContainer =
- new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, getPattern());
+ new TabletInsertionDataContainer(pipeTaskMeta, this, tablet,
isAligned, pipePattern);
}
return dataContainer.convertToTablet();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 9c57245cd1a..158b8b35401 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tablet;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
@@ -30,7 +31,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -86,7 +86,10 @@ public class TabletInsertionDataContainer {
}
public TabletInsertionDataContainer(
- PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, InsertNode
insertNode, String pattern) {
+ PipeTaskMeta pipeTaskMeta,
+ EnrichedEvent sourceEvent,
+ InsertNode insertNode,
+ PipePattern pattern) {
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
@@ -105,7 +108,7 @@ public class TabletInsertionDataContainer {
EnrichedEvent sourceEvent,
Tablet tablet,
boolean isAligned,
- String pattern) {
+ PipePattern pattern) {
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
@@ -113,7 +116,7 @@ public class TabletInsertionDataContainer {
}
@TestOnly
- public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
+ public TabletInsertionDataContainer(InsertNode insertNode, PipePattern
pattern) {
this(null, null, insertNode, pattern);
}
@@ -123,7 +126,7 @@ public class TabletInsertionDataContainer {
//////////////////////////// parse ////////////////////////////
- private void parse(InsertRowNode insertRowNode, String pattern) {
+ private void parse(InsertRowNode insertRowNode, PipePattern pattern) {
final int originColumnSize = insertRowNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
@@ -190,7 +193,7 @@ public class TabletInsertionDataContainer {
}
}
- private void parse(InsertTabletNode insertTabletNode, String pattern) {
+ private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) {
final int originColumnSize = insertTabletNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
@@ -273,7 +276,7 @@ public class TabletInsertionDataContainer {
}
}
- private void parse(Tablet tablet, boolean isAligned, String pattern) {
+ private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) {
final int originColumnSize = tablet.getSchemas().size();
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
@@ -366,13 +369,13 @@ public class TabletInsertionDataContainer {
private void generateColumnIndexMapper(
String[] originMeasurementList,
- String pattern,
+ PipePattern pattern,
Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
final int originColumnSize = originMeasurementList.length;
// case 1: for example, pattern is root.a.b or pattern is null and device
is root.a.b.c
// in this case, all data can be matched without checking the measurements
- if (pattern == null || pattern.length() <= deviceId.length() &&
deviceId.startsWith(pattern)) {
+ if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceId)) {
for (int i = 0; i < originColumnSize; i++) {
originColumnIndex2FilteredColumnIndexMapperList[i] = i;
}
@@ -380,7 +383,7 @@ public class TabletInsertionDataContainer {
// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
- else if (pattern.length() > deviceId.length() &&
pattern.startsWith(deviceId)) {
+ else if (pattern.mayOverlapWithDevice(deviceId)) {
int filteredCount = 0;
for (int i = 0; i < originColumnSize; i++) {
@@ -391,10 +394,7 @@ public class TabletInsertionDataContainer {
continue;
}
- // low cost check comes first
- if (pattern.length() == deviceId.length() + measurement.length() + 1
- // high cost check comes later
- && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+ if (pattern.matchesMeasurement(deviceId, measurement)) {
originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index dfa229bf108..110414a64ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -63,7 +64,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
boolean isGeneratedByPipe,
String pipeName,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
long startTime,
long endTime) {
super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
@@ -168,7 +169,11 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeTsFileInsertionEvent(
resource, isLoaded, isGeneratedByPipe, pipeName, pipeTaskMeta,
pattern, startTime, endTime);
}
@@ -221,7 +226,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
waitForTsFileClose();
dataContainer =
new TsFileInsertionDataContainer(
- tsFile, getPattern(), startTime, endTime, pipeTaskMeta, this);
+ tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
}
return dataContainer;
} catch (InterruptedException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index e91d9c4b264..0bb2b8be2be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -29,7 +30,6 @@ import
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileReader;
@@ -52,12 +52,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Objects;
public class TsFileInsertionDataContainer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
- private final String pattern; // used to filter data
+ private final PipePattern pattern; // used to filter data
private final IExpression timeFilterExpression; // used to filter data
private final PipeTaskMeta pipeTaskMeta; // used to report progress
@@ -74,14 +75,14 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
private boolean shouldParsePattern = false;
- public TsFileInsertionDataContainer(File tsFile, String pattern, long
startTime, long endTime)
- throws IOException {
+ public TsFileInsertionDataContainer(
+ File tsFile, PipePattern pattern, long startTime, long endTime) throws
IOException {
this(tsFile, pattern, startTime, endTime, null, null);
}
public TsFileInsertionDataContainer(
File tsFile,
- String pattern,
+ PipePattern pattern,
long startTime,
long endTime,
PipeTaskMeta pipeTaskMeta,
@@ -145,8 +146,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
// case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
// in this case, all data can be matched without checking the
measurements
- if (pattern == null
- || pattern.length() <= deviceId.length() &&
deviceId.startsWith(pattern)) {
+ if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
@@ -154,14 +154,11 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
- else if (pattern.length() > deviceId.length() &&
pattern.startsWith(deviceId)) {
+ else if (pattern.mayOverlapWithDevice(deviceId)) {
final List<String> filteredMeasurements = new ArrayList<>();
for (final String measurement : entry.getValue()) {
- // low cost check comes first
- if (pattern.length() == deviceId.length() + measurement.length() + 1
- // high cost check comes later
- && pattern.endsWith(TsFileConstant.PATH_SEPARATOR +
measurement)) {
+ if (pattern.matchesMeasurement(deviceId, measurement)) {
filteredMeasurements.add(measurement);
} else {
// Parse pattern iff there are measurements filtered out
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index e693f6dae4b..f291afdefb9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.realtime;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
@@ -42,7 +43,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
EnrichedEvent event,
TsFileEpoch tsFileEpoch,
Map<String, String[]> device2Measurements,
- String pattern) {
+ PipePattern pattern) {
this(event, tsFileEpoch, device2Measurements, null, pattern,
Long.MIN_VALUE, Long.MAX_VALUE);
}
@@ -51,7 +52,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
TsFileEpoch tsFileEpoch,
Map<String, String[]> device2Measurements,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
long startTime,
long endTime) {
// pipeTaskMeta is used to report the progress of the event, the
PipeRealtimeEvent
@@ -137,7 +138,11 @@ public class PipeRealtimeEvent extends EnrichedEvent {
@Override
public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime) {
return new PipeRealtimeEvent(
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
pipeName, pipeTaskMeta, pattern, startTime, endTime),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index e4a3edb4ef6..68023a73cdf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -20,9 +20,8 @@
package org.apache.iotdb.db.pipe.extractor.dataregion;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
-import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
@@ -42,7 +41,6 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +52,9 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
@@ -69,7 +68,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
@@ -96,13 +95,25 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
}
hasNoExtractionNeed = false;
+ // Validate extractor.pattern.format is within valid range
+ validator
+ .validateAttributeValueRange(
+ EXTRACTOR_PATTERN_FORMAT_KEY,
+ true,
+ EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+ EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE)
+ .validateAttributeValueRange(
+ SOURCE_PATTERN_FORMAT_KEY,
+ true,
+ EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+ EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
+
+ // Get the pattern format to check whether the pattern is legal
+ final PipePattern pattern =
+
PipePattern.parsePipePatternFromSourceParameters(validator.getParameters());
+
// Check whether the pattern is legal
- validatePattern(
- validator
- .getParameters()
- .getStringOrDefault(
- Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
- EXTRACTOR_PATTERN_DEFAULT_VALUE));
+ validatePattern(pattern);
// Validate extractor.history.enable and extractor.realtime.enable
validator
@@ -174,36 +185,9 @@ public class IoTDBDataRegionExtractor extends
IoTDBExtractor {
realtimeExtractor.validate(validator);
}
- private void validatePattern(String pattern) {
- if (!pattern.startsWith("root")) {
- throw new IllegalArgumentException(
- "The argument `extractor.pattern` or `source.pattern` is an illegal
path.");
- }
-
- try {
- PathUtils.isLegalPath(pattern);
- } catch (IllegalPathException e) {
- try {
- if ("root".equals(pattern) || "root.".equals(pattern)) {
- return;
- }
-
- // Split the pattern to nodes.
- String[] pathNodes = StringUtils.splitPreserveAllTokens(pattern,
"\\.");
-
- // Check whether the pattern without last node is legal.
- PathUtils.splitPathToDetachedNodes(
- String.join(".", Arrays.copyOfRange(pathNodes, 0, pathNodes.length
- 1)));
- String lastNode = pathNodes[pathNodes.length - 1];
-
- // Check whether the last node is legal.
- if (!"".equals(lastNode)) {
- Double.parseDouble(lastNode);
- }
- } catch (Exception ignored) {
- throw new IllegalArgumentException(
- "The argument `extractor.pattern` or `source.pattern` is an
illegal path.");
- }
+ private void validatePattern(PipePattern pattern) {
+ if (!pattern.isLegal()) {
+ throw new IllegalArgumentException(String.format("Pattern \"%s\" is
illegal.", pattern));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9133b38caca..f5efe198ecf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
@@ -60,14 +61,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
public class PipeHistoricalDataRegionTsFileExtractor implements
PipeHistoricalDataRegionExtractor {
@@ -84,7 +82,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private int dataRegionId;
- private String pattern;
+ private PipePattern pipePattern;
private boolean isDbNameCoveredByPattern = false;
private boolean isHistoricalExtractorEnabled = false;
@@ -197,18 +195,14 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
}
- pattern =
- parameters.getStringOrDefault(
- Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
- EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
+
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new
DataRegionId(environment.getRegionId()));
if (Objects.nonNull(dataRegion)) {
final String databaseName = dataRegion.getDatabaseName();
- if (Objects.nonNull(databaseName)
- && pattern.length() <= databaseName.length()
- && databaseName.startsWith(pattern)) {
- isDbNameCoveredByPattern = true;
+ if (Objects.nonNull(databaseName)) {
+ isDbNameCoveredByPattern = pipePattern.coversDb(databaseName);
}
}
@@ -458,7 +452,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
false,
pipeName,
pipeTaskMeta,
- pattern,
+ pipePattern,
historicalDataExtractionStartTime,
historicalDataExtractionEndTime);
if (isDbNameCoveredByPattern) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index f564f737539..85ac5dd2f54 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -56,9 +57,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor
{
@@ -73,7 +72,7 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
protected boolean shouldExtractInsertion;
protected boolean shouldExtractDeletion;
- protected String pattern;
+ protected PipePattern pipePattern;
private boolean isDbNameCoveredByPattern = false;
protected long realtimeDataExtractionStartTime = Long.MIN_VALUE; // Event
time
@@ -153,18 +152,14 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
long creationTime = environment.getCreationTime();
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
- pattern =
- parameters.getStringOrDefault(
- Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
- PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
+
final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new
DataRegionId(environment.getRegionId()));
if (dataRegion != null) {
final String databaseName = dataRegion.getDatabaseName();
- if (databaseName != null
- && pattern.length() <= databaseName.length()
- && databaseName.startsWith(pattern)) {
- isDbNameCoveredByPattern = true;
+ if (databaseName != null) {
+ isDbNameCoveredByPattern = pipePattern.coversDb(databaseName);
}
}
@@ -365,8 +360,12 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
return shouldExtractDeletion;
}
- public final String getPattern() {
- return pattern;
+ public final String getPatternString() {
+ return pipePattern != null ? pipePattern.getPattern() : null;
+ }
+
+ public final PipePattern getPipePattern() {
+ return pipePattern;
}
public final long getRealtimeDataExtractionStartTime() {
@@ -403,8 +402,8 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
@Override
public String toString() {
return "PipeRealtimeDataRegionExtractor{"
- + "pattern='"
- + pattern
+ + "pipePattern='"
+ + pipePattern
+ '\''
+ ", dataRegionId='"
+ dataRegionId
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index d12f265fa26..729546b0286 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
+import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.pattern.matcher.PipeDataRegionMatcher;
import java.io.Closeable;
@@ -76,7 +76,7 @@ public class PipeDataRegionAssigner implements Closeable {
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeName(),
extractor.getPipeTaskMeta(),
- extractor.getPattern(),
+ extractor.getPipePattern(),
extractor.getRealtimeDataExtractionStartTime(),
extractor.getRealtimeDataExtractionEndTime());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
index 4eba880cd36..9c841ceccb9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -64,7 +64,7 @@ public class TsFileEpochManager {
epoch,
resource.getDevices().stream()
.collect(Collectors.toMap(device -> device, device ->
EMPTY_MEASUREMENT_ARRAY)),
- event.getPattern());
+ event.getPipePattern());
}
public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
@@ -76,6 +76,6 @@ public class TsFileEpochManager {
event,
epoch,
Collections.singletonMap(node.getDevicePath().getFullPath(),
node.getMeasurements()),
- event.getPattern());
+ event.getPipePattern());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
similarity index 66%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
index 7643c6340f0..470c7ba110f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
+package org.apache.iotdb.db.pipe.pattern.matcher;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,12 +41,12 @@ import java.util.stream.Collectors;
public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
- private static final Logger LOGGER =
LoggerFactory.getLogger(CachedSchemaPatternMatcher.class);
+ protected static final Logger LOGGER =
LoggerFactory.getLogger(CachedSchemaPatternMatcher.class);
- private final ReentrantReadWriteLock lock;
+ protected final ReentrantReadWriteLock lock;
- private final Set<PipeRealtimeDataRegionExtractor> extractors;
- private final Cache<String, Set<PipeRealtimeDataRegionExtractor>>
deviceToExtractorsCache;
+ protected final Set<PipeRealtimeDataRegionExtractor> extractors;
+ protected final Cache<String, Set<PipeRealtimeDataRegionExtractor>>
deviceToExtractorsCache;
public CachedSchemaPatternMatcher() {
this.lock = new ReentrantReadWriteLock();
@@ -130,47 +131,32 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
if (measurements.length == 0) {
// `measurements` is empty (only in case of tsfile event). match all
extractors.
//
- // case 1: for example, pattern is root.a.b, device is root.a.b.c,
measurement can be any.
+ // case 1: the pattern can match all measurements of the device.
// in this case, the extractor can be matched without checking the
measurements.
//
- // case 2: for example, pattern is root.a.b.c, device is root.a.b.
- // in this situation, the extractor can not be matched in some
cases, but we can not know
- // all the measurements of the device in an efficient way, so we
ASSUME that the extractor
- // can be matched. this is a trade-off between efficiency and
accuracy. for most user's
- // usage, this is acceptable, which may result in some unnecessary
data processing and
- // transmission, but will not result in data loss.
+ // case 2: the pattern may match some measurements of the device.
+ // in this case, we can't get all measurements efficiently here,
+ // so we just ASSUME the extractor matches and do more checks later.
matchedExtractors.addAll(extractorsFilteredByDevice);
} else {
- // `measurements` is not empty (only in case of tablet event). match
extractors by
- // measurements.
+ // `measurements` is not empty (only in case of tablet event).
+ // Match extractors by measurements.
extractorsFilteredByDevice.forEach(
extractor -> {
- final String pattern = extractor.getPattern();
-
- // case 1: for example, pattern is root.a.b and device is
root.a.b.c
- // in this case, the extractor can be matched without checking
the measurements
- if (pattern.length() <= device.length()) {
+ final PipePattern pattern = extractor.getPipePattern();
+ if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(device)) {
+ // The pattern can match all measurements of the device.
matchedExtractors.add(extractor);
- }
- // case 2: for example, pattern is root.a.b.c and device is
root.a.b
- // in this case, we need to check the full path
- else {
+ } else {
for (final String measurement : measurements) {
- // ignore null measurement for partial insert
+ // Ignore null measurement for partial insert
if (measurement == null) {
continue;
}
- // for example, pattern is root.a.b.c, device is root.a.b
and measurement is c
- // in this case, the extractor can be matched. other cases
are not matched.
- // please note that there should be a . between device and
measurement.
- if (
- // low cost check comes first
- pattern.length() == device.length() + measurement.length()
+ 1
- // high cost check comes later
- && pattern.endsWith(TsFileConstant.PATH_SEPARATOR +
measurement)) {
+ if (pattern.matchesMeasurement(device, measurement)) {
matchedExtractors.add(extractor);
- // there would be no more matched extractors because the
measurements are
+ // There would be no more matched extractors because the
measurements are
// unique
break;
}
@@ -190,7 +176,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
return matchedExtractors;
}
- private Set<PipeRealtimeDataRegionExtractor> filterExtractorsByDevice(String
device) {
+ protected Set<PipeRealtimeDataRegionExtractor>
filterExtractorsByDevice(String device) {
final Set<PipeRealtimeDataRegionExtractor> filteredExtractors = new
HashSet<>();
for (PipeRealtimeDataRegionExtractor extractor : extractors) {
@@ -199,15 +185,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
continue;
}
- final String pattern = extractor.getPattern();
- if (
- // for example, pattern is root.a.b and device is root.a.b.c
- // in this case, the extractor can be matched without checking the
measurements
- (pattern.length() <= device.length() && device.startsWith(pattern))
- // for example, pattern is root.a.b.c and device is root.a.b
- // in this case, the extractor can be selected as candidate, but the
measurements should
- // be checked further
- || (pattern.length() > device.length() &&
pattern.startsWith(device))) {
+ final PipePattern pipePattern = extractor.getPipePattern();
+ if (Objects.isNull(pipePattern) ||
pipePattern.mayOverlapWithDevice(device)) {
filteredExtractors.add(extractor);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
index 4e102a1f7cf..4be05cc3a81 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
+package org.apache.iotdb.db.pipe.pattern.matcher;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 6a29a403739..07b1e5d6541 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -224,26 +225,28 @@ public class PipeTabletInsertionEventTest {
@Test
public void convertToTabletForTest() {
TabletInsertionDataContainer container1 =
- new TabletInsertionDataContainer(insertRowNode, pattern);
+ new TabletInsertionDataContainer(insertRowNode, new
PrefixPipePattern(pattern));
Tablet tablet1 = container1.convertToTablet();
boolean isAligned1 = container1.isAligned();
Assert.assertEquals(tablet1, tabletForInsertRowNode);
Assert.assertFalse(isAligned1);
TabletInsertionDataContainer container2 =
- new TabletInsertionDataContainer(insertTabletNode, pattern);
+ new TabletInsertionDataContainer(insertTabletNode, new
PrefixPipePattern(pattern));
Tablet tablet2 = container2.convertToTablet();
boolean isAligned2 = container2.isAligned();
Assert.assertEquals(tablet2, tabletForInsertTabletNode);
Assert.assertFalse(isAligned2);
- PipeRawTabletInsertionEvent event3 = new
PipeRawTabletInsertionEvent(tablet1, false, pattern);
+ PipeRawTabletInsertionEvent event3 =
+ new PipeRawTabletInsertionEvent(tablet1, false, new
PrefixPipePattern(pattern));
Tablet tablet3 = event3.convertToTablet();
boolean isAligned3 = event3.isAligned();
Assert.assertEquals(tablet1, tablet3);
Assert.assertFalse(isAligned3);
- PipeRawTabletInsertionEvent event4 = new
PipeRawTabletInsertionEvent(tablet2, false, pattern);
+ PipeRawTabletInsertionEvent event4 =
+ new PipeRawTabletInsertionEvent(tablet2, false, new
PrefixPipePattern(pattern));
Tablet tablet4 = event4.convertToTablet();
boolean isAligned4 = event4.isAligned();
Assert.assertEquals(tablet2, tablet4);
@@ -253,26 +256,28 @@ public class PipeTabletInsertionEventTest {
@Test
public void convertToAlignedTabletForTest() {
TabletInsertionDataContainer container1 =
- new TabletInsertionDataContainer(insertRowNodeAligned, pattern);
+ new TabletInsertionDataContainer(insertRowNodeAligned, new
PrefixPipePattern(pattern));
Tablet tablet1 = container1.convertToTablet();
boolean isAligned1 = container1.isAligned();
Assert.assertEquals(tablet1, tabletForInsertRowNode);
Assert.assertTrue(isAligned1);
TabletInsertionDataContainer container2 =
- new TabletInsertionDataContainer(insertTabletNodeAligned, pattern);
+ new TabletInsertionDataContainer(insertTabletNodeAligned, new
PrefixPipePattern(pattern));
Tablet tablet2 = container2.convertToTablet();
boolean isAligned2 = container2.isAligned();
Assert.assertEquals(tablet2, tabletForInsertTabletNode);
Assert.assertTrue(isAligned2);
- PipeRawTabletInsertionEvent event3 = new
PipeRawTabletInsertionEvent(tablet1, true, pattern);
+ PipeRawTabletInsertionEvent event3 =
+ new PipeRawTabletInsertionEvent(tablet1, true, new
PrefixPipePattern(pattern));
Tablet tablet3 = event3.convertToTablet();
boolean isAligned3 = event3.isAligned();
Assert.assertEquals(tablet1, tablet3);
Assert.assertTrue(isAligned3);
- PipeRawTabletInsertionEvent event4 = new
PipeRawTabletInsertionEvent(tablet2, true, pattern);
+ PipeRawTabletInsertionEvent event4 =
+ new PipeRawTabletInsertionEvent(tablet2, true, new
PrefixPipePattern(pattern));
Tablet tablet4 = event4.convertToTablet();
boolean isAligned4 = event4.isAligned();
Assert.assertEquals(tablet2, tablet4);
@@ -286,7 +291,7 @@ public class PipeTabletInsertionEventTest {
null,
new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 111L,
113L),
insertRowNode,
- pattern);
+ new PrefixPipePattern(pattern));
Tablet tablet1 = container1.convertToTablet();
Assert.assertEquals(0, tablet1.rowSize);
boolean isAligned1 = container1.isAligned();
@@ -297,7 +302,7 @@ public class PipeTabletInsertionEventTest {
null,
new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L,
110L),
insertRowNode,
- pattern);
+ new PrefixPipePattern(pattern));
Tablet tablet2 = container2.convertToTablet();
Assert.assertEquals(1, tablet2.rowSize);
boolean isAligned2 = container2.isAligned();
@@ -308,7 +313,7 @@ public class PipeTabletInsertionEventTest {
null,
new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L,
113L),
insertTabletNode,
- pattern);
+ new PrefixPipePattern(pattern));
Tablet tablet3 = container3.convertToTablet();
Assert.assertEquals(3, tablet3.rowSize);
boolean isAligned3 = container3.isAligned();
@@ -319,7 +324,7 @@ public class PipeTabletInsertionEventTest {
null,
new PipeRawTabletInsertionEvent(tabletForInsertTabletNode,
Long.MIN_VALUE, 109L),
insertTabletNode,
- pattern);
+ new PrefixPipePattern(pattern));
Tablet tablet4 = container4.convertToTablet();
Assert.assertEquals(0, tablet4.rowSize);
boolean isAligned4 = container4.isAligned();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index da2a7a00667..b26286c4180 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.pipe.event;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -48,6 +51,9 @@ public class TsFileInsertionDataContainerTest {
private static final long TSFILE_START_TIME = 300L;
+ private static final String PREFIX_FORMAT = "prefix";
+ private static final String IOTDB_FORMAT = "iotdb";
+
private File alignedTsFile;
private File nonalignedTsFile;
@@ -71,6 +77,10 @@ public class TsFileInsertionDataContainerTest {
measurementNumbers.add(1);
measurementNumbers.add(2);
+ Set<String> patternFormats = new HashSet<>();
+ patternFormats.add(PREFIX_FORMAT);
+ patternFormats.add(IOTDB_FORMAT);
+
Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
@@ -91,44 +101,126 @@ public class TsFileInsertionDataContainerTest {
for (int deviceNumber : deviceNumbers) {
for (int measurementNumber : measurementNumbers) {
- for (Pair<Long, Long> startEndTime : startEndTimes) {
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 0, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 2, startEndTime.left,
startEndTime.right);
-
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 999, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1000, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1001, startEndTime.left,
startEndTime.right);
-
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 999 * 2 + 1, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1000, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1001 * 2 - 1,
startEndTime.left, startEndTime.right);
-
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1023, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1024, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1025, startEndTime.left,
startEndTime.right);
-
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1023 * 2 + 1,
startEndTime.left, startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1024 * 2, startEndTime.left,
startEndTime.right);
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 1025 * 2 - 1,
startEndTime.left, startEndTime.right);
-
- testToTabletInsertionEvents(
- deviceNumber, measurementNumber, 10001, startEndTime.left,
startEndTime.right);
+ for (String patternFormat : patternFormats) {
+ for (Pair<Long, Long> startEndTime : startEndTimes) {
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 0,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 2,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 999,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1000,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1001,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 999 * 2 + 1,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1000,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1001 * 2 - 1,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1023,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1024,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1025,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1023 * 2 + 1,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1024 * 2,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 1025 * 2 - 1,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber,
+ measurementNumber,
+ 10001,
+ patternFormat,
+ startEndTime.left,
+ startEndTime.right);
+ }
}
}
}
@@ -138,14 +230,16 @@ public class TsFileInsertionDataContainerTest {
int deviceNumber,
int measurementNumber,
int rowNumberInOneDevice,
+ String patternFormat,
long startTime,
long endTime)
throws Exception {
LOGGER.info(
- "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {},
rowNumberInOneDevice: {}, startTime: {}, endTime: {}",
+ "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {},
rowNumberInOneDevice: {}, patternFormat: {}, startTime: {}, endTime: {}",
deviceNumber,
measurementNumber,
rowNumberInOneDevice,
+ patternFormat,
startTime,
endTime);
@@ -191,10 +285,21 @@ public class TsFileInsertionDataContainerTest {
}
}
+ final PipePattern rootPattern;
+ switch (patternFormat) {
+ case PREFIX_FORMAT:
+ rootPattern = new PrefixPipePattern("root");
+ break;
+ case IOTDB_FORMAT:
+ default:
+ rootPattern = new IoTDBPipePattern("root.**");
+ break;
+ }
+
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile, "root", startTime,
endTime);
+ new TsFileInsertionDataContainer(alignedTsFile, rootPattern,
startTime, endTime);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(nonalignedTsFile, "root",
startTime, endTime)) {
+ new TsFileInsertionDataContainer(nonalignedTsFile, rootPattern,
startTime, endTime)) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -230,20 +335,19 @@ public class TsFileInsertionDataContainerTest {
.forEach(
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(
-
measurementNumber, row.size());
-
count3.incrementAndGet();
- } catch (IOException
e) {
- throw new
RuntimeException(e);
- }
- });
- }))));
+ (tablet, rowCollector) ->
+ new
PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(
+
measurementNumber, row.size());
+
count3.incrementAndGet();
+ } catch
(IOException e) {
+ throw new
RuntimeException(e);
+ }
+ })))));
Assert.assertEquals(count1.getAndSet(0), deviceNumber *
expectedRowNumber);
Assert.assertEquals(count2.getAndSet(0), deviceNumber *
expectedRowNumber);
@@ -255,19 +359,18 @@ public class TsFileInsertionDataContainerTest {
event ->
event
.processTablet(
- (tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- Assert.assertEquals(measurementNumber,
row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- })
+ (tablet, rowCollector) ->
+ new PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+
Assert.assertEquals(measurementNumber, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }))
.forEach(
tabletInsertionEvent1 ->
tabletInsertionEvent1
@@ -295,9 +398,9 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), deviceNumber * expectedRowNumber);
- Assert.assertEquals(count2.get(), deviceNumber * expectedRowNumber);
- Assert.assertEquals(count3.get(), deviceNumber * expectedRowNumber);
+ Assert.assertEquals(deviceNumber * expectedRowNumber, count1.get());
+ Assert.assertEquals(deviceNumber * expectedRowNumber, count2.get());
+ Assert.assertEquals(deviceNumber * expectedRowNumber, count3.get());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -338,12 +441,26 @@ public class TsFileInsertionDataContainerTest {
}));
}
+ final PipePattern oneAlignedDevicePattern;
+ final PipePattern oneNonAlignedDevicePattern;
+ switch (patternFormat) {
+ case PREFIX_FORMAT:
+ oneAlignedDevicePattern = new
PrefixPipePattern(oneDeviceInAlignedTsFile.get());
+ oneNonAlignedDevicePattern = new
PrefixPipePattern(oneDeviceInUnalignedTsFile.get());
+ break;
+ case IOTDB_FORMAT:
+ default:
+ oneAlignedDevicePattern = new
IoTDBPipePattern(oneDeviceInAlignedTsFile.get() + ".**");
+ oneNonAlignedDevicePattern = new
IoTDBPipePattern(oneDeviceInUnalignedTsFile.get() + ".**");
+ break;
+ }
+
try (final TsFileInsertionDataContainer alignedContainer =
new TsFileInsertionDataContainer(
- alignedTsFile, oneDeviceInAlignedTsFile.get(), startTime,
endTime);
+ alignedTsFile, oneAlignedDevicePattern, startTime, endTime);
final TsFileInsertionDataContainer nonalignedContainer =
new TsFileInsertionDataContainer(
- nonalignedTsFile, oneDeviceInUnalignedTsFile.get(), startTime,
endTime); ) {
+ nonalignedTsFile, oneNonAlignedDevicePattern, startTime,
endTime)) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -379,24 +496,23 @@ public class TsFileInsertionDataContainerTest {
.forEach(
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(
-
measurementNumber, row.size());
-
count3.incrementAndGet();
- } catch (IOException
e) {
- throw new
RuntimeException(e);
- }
- });
- }))));
-
- Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
- Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
- Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
+ (tablet, rowCollector) ->
+ new
PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(
+
measurementNumber, row.size());
+
count3.incrementAndGet();
+ } catch
(IOException e) {
+ throw new
RuntimeException(e);
+ }
+ })))));
+
+ Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
+ Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
+ Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
nonalignedContainer
.toTabletInsertionEvents()
@@ -404,19 +520,18 @@ public class TsFileInsertionDataContainerTest {
event ->
event
.processTablet(
- (tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- Assert.assertEquals(measurementNumber,
row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- })
+ (tablet, rowCollector) ->
+ new PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+
Assert.assertEquals(measurementNumber, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }))
.forEach(
tabletInsertionEvent1 ->
tabletInsertionEvent1
@@ -444,20 +559,36 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), expectedRowNumber);
- Assert.assertEquals(count2.get(), expectedRowNumber);
- Assert.assertEquals(count3.get(), expectedRowNumber);
+ Assert.assertEquals(expectedRowNumber, count1.get());
+ Assert.assertEquals(expectedRowNumber, count2.get());
+ Assert.assertEquals(expectedRowNumber, count3.get());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
+ final PipePattern oneAlignedMeasurementPattern;
+ final PipePattern oneNonAlignedMeasurementPattern;
+ switch (patternFormat) {
+ case PREFIX_FORMAT:
+ oneAlignedMeasurementPattern = new
PrefixPipePattern(oneMeasurementInAlignedTsFile.get());
+ oneNonAlignedMeasurementPattern =
+ new PrefixPipePattern(oneMeasurementInUnalignedTsFile.get());
+ break;
+ case IOTDB_FORMAT:
+ default:
+ oneAlignedMeasurementPattern = new
IoTDBPipePattern(oneMeasurementInAlignedTsFile.get());
+ oneNonAlignedMeasurementPattern =
+ new IoTDBPipePattern(oneMeasurementInUnalignedTsFile.get());
+ break;
+ }
+
try (final TsFileInsertionDataContainer alignedContainer =
new TsFileInsertionDataContainer(
- alignedTsFile, oneMeasurementInAlignedTsFile.get(), startTime,
endTime);
+ alignedTsFile, oneAlignedMeasurementPattern, startTime,
endTime);
final TsFileInsertionDataContainer nonalignedContainer =
new TsFileInsertionDataContainer(
- nonalignedTsFile, oneMeasurementInUnalignedTsFile.get(),
startTime, endTime); ) {
+ nonalignedTsFile, oneNonAlignedMeasurementPattern, startTime,
endTime)) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -493,23 +624,22 @@ public class TsFileInsertionDataContainerTest {
.forEach(
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(1, row.size());
-
count3.incrementAndGet();
- } catch (IOException
e) {
- throw new
RuntimeException(e);
- }
- });
- }))));
-
- Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
- Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
- Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
+ (tablet, rowCollector) ->
+ new
PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(1, row.size());
+
count3.incrementAndGet();
+ } catch
(IOException e) {
+ throw new
RuntimeException(e);
+ }
+ })))));
+
+ Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
+ Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
+ Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
nonalignedContainer
.toTabletInsertionEvents()
@@ -517,19 +647,18 @@ public class TsFileInsertionDataContainerTest {
event ->
event
.processTablet(
- (tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- Assert.assertEquals(1, row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- })
+ (tablet, rowCollector) ->
+ new PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+ Assert.assertEquals(1, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }))
.forEach(
tabletInsertionEvent1 ->
tabletInsertionEvent1
@@ -556,20 +685,30 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), expectedRowNumber);
- Assert.assertEquals(count2.get(), expectedRowNumber);
- Assert.assertEquals(count3.get(), expectedRowNumber);
+ Assert.assertEquals(expectedRowNumber, count1.get());
+ Assert.assertEquals(expectedRowNumber, count2.get());
+ Assert.assertEquals(expectedRowNumber, count3.get());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
+ final PipePattern notExistPattern;
+ switch (patternFormat) {
+ case PREFIX_FORMAT:
+ notExistPattern = new PrefixPipePattern("root.`not-exist-pattern`");
+ break;
+ case IOTDB_FORMAT:
+ default:
+ notExistPattern = new IoTDBPipePattern("root.`not-exist-pattern`");
+ break;
+ }
+
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(
- alignedTsFile, "not-exist-pattern", startTime, endTime);
+ new TsFileInsertionDataContainer(alignedTsFile, notExistPattern,
startTime, endTime);
final TsFileInsertionDataContainer nonalignedContainer =
new TsFileInsertionDataContainer(
- nonalignedTsFile, "not-exist-pattern", startTime, endTime); ) {
+ nonalignedTsFile, notExistPattern, startTime, endTime)) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -605,23 +744,22 @@ public class TsFileInsertionDataContainerTest {
.forEach(
tabletInsertionEvent2 ->
tabletInsertionEvent2.processTablet(
- (tablet, rowCollector) -> {
- new
PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
-
rowCollector.collectRow(row);
-
Assert.assertEquals(0, row.size());
-
count3.incrementAndGet();
- } catch (IOException
e) {
- throw new
RuntimeException(e);
- }
- });
- }))));
-
- Assert.assertEquals(count1.getAndSet(0), 0);
- Assert.assertEquals(count2.getAndSet(0), 0);
- Assert.assertEquals(count3.getAndSet(0), 0);
+ (tablet, rowCollector) ->
+ new
PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+
rowCollector.collectRow(row);
+
Assert.assertEquals(0, row.size());
+
count3.incrementAndGet();
+ } catch
(IOException e) {
+ throw new
RuntimeException(e);
+ }
+ })))));
+
+ Assert.assertEquals(0, count1.getAndSet(0));
+ Assert.assertEquals(0, count2.getAndSet(0));
+ Assert.assertEquals(0, count3.getAndSet(0));
nonalignedContainer
.toTabletInsertionEvents()
@@ -629,19 +767,18 @@ public class TsFileInsertionDataContainerTest {
event ->
event
.processTablet(
- (tablet, rowCollector) -> {
- new PipeRawTabletInsertionEvent(tablet, false)
- .processRowByRow(
- (row, collector) -> {
- try {
- rowCollector.collectRow(row);
- Assert.assertEquals(0, row.size());
- count1.incrementAndGet();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- })
+ (tablet, rowCollector) ->
+ new PipeRawTabletInsertionEvent(tablet, false)
+ .processRowByRow(
+ (row, collector) -> {
+ try {
+ rowCollector.collectRow(row);
+ Assert.assertEquals(0, row.size());
+ count1.incrementAndGet();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }))
.forEach(
tabletInsertionEvent1 ->
tabletInsertionEvent1
@@ -668,9 +805,9 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), 0);
- Assert.assertEquals(count2.get(), 0);
- Assert.assertEquals(count3.get(), 0);
+ Assert.assertEquals(0, count1.get());
+ Assert.assertEquals(0, count2.get());
+ Assert.assertEquals(0, count3.get());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
similarity index 92%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
rename to
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 6f429b9dcd9..609dd9b3f44 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -17,14 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.extractor;
+package org.apache.iotdb.db.pipe.pattern;
import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -127,12 +128,12 @@ public class CachedSchemaPatternMatcherTest {
for (int j = 0; j < deviceNum; j++) {
PipeRealtimeEvent event =
new PipeRealtimeEvent(
- null, null, Collections.singletonMap("root." + i,
measurements), "root");
+ null, null, Collections.singletonMap("root." + i,
measurements), null);
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
}
- PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap,
"root");
+ PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap,
null);
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
@@ -148,7 +149,9 @@ public class CachedSchemaPatternMatcherTest {
public static class PipeRealtimeDataRegionFakeExtractor extends
PipeRealtimeDataRegionExtractor {
- public PipeRealtimeDataRegionFakeExtractor() {}
+ public PipeRealtimeDataRegionFakeExtractor() {
+ pipePattern = new PrefixPipePattern(null);
+ }
@Override
public Event supply() {
@@ -166,10 +169,13 @@ public class CachedSchemaPatternMatcherTest {
for (String s : v) {
match[0] =
match[0]
- || (k + TsFileConstant.PATH_SEPARATOR +
s).startsWith(getPattern());
+ || (k + TsFileConstant.PATH_SEPARATOR + s)
+ .startsWith(getPatternString());
}
} else {
- match[0] = match[0] || (getPattern().startsWith(k) ||
k.startsWith(getPattern()));
+ match[0] =
+ match[0]
+ || (getPatternString().startsWith(k) ||
k.startsWith(getPatternString()));
}
});
Assert.assertTrue(match[0]);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
new file mode 100644
index 00000000000..5c74f3ea21c
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.pattern;
+
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IoTDBPipePatternTest {
+
+ @Test
+ public void testIotdbPipePattern() {
+ // Test legal and illegal pattern
+ String[] legalPatterns = {
+ "root", "root.db", "root.db.d1.s", "root.db.`1`", "root.*.d.*s.s",
+ };
+ String[] illegalPatterns = {
+ "root.", "roo", "", "root..", "root./",
+ };
+ for (String s : legalPatterns) {
+ Assert.assertTrue(new IoTDBPipePattern(s).isLegal());
+ }
+ for (String t : illegalPatterns) {
+ try {
+ Assert.assertFalse(new IoTDBPipePattern(t).isLegal());
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof PipeException);
+ }
+ }
+
+ // Test pattern cover db
+ String db = "root.db";
+ String[] patternsCoverDb = {
+ "root.**", "root.db.**", "root.*db*.**",
+ };
+ String[] patternsNotCoverDb = {
+ "root.db", "root.*", "root.*.*", "root.db.*.**", "root.db.d1",
"root.**.db.**",
+ };
+ for (String s : patternsCoverDb) {
+ Assert.assertTrue(new IoTDBPipePattern(s).coversDb(db));
+ }
+ for (String t : patternsNotCoverDb) {
+ Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db));
+ }
+
+ String device = "root.db.d1";
+
+ // Test pattern cover device
+ String[] patternsCoverDevice = {
+ "root.**", "root.db.**", "root.*.*.*", "root.db.d1.*",
"root.*db*.*d*.*", "root.**.*1.*",
+ };
+ String[] patternsNotCoverDevice = {
+ "root.*", "root.*.*", "root.db.d1", "root.db.d2.*", "root.**.d2.**",
+ };
+ for (String s : patternsCoverDevice) {
+ Assert.assertTrue(new IoTDBPipePattern(s).coversDevice(device));
+ }
+ for (String t : patternsNotCoverDevice) {
+ Assert.assertFalse(new IoTDBPipePattern(t).coversDevice(device));
+ }
+
+ // Test pattern may overlap with device
+ String[] patternsOverlapWithDevice = {
+ "root.db.**", "root.db.d1", "root.db.d1.*", "root.db.d1.s1",
"root.**.d2.**", "root.*.d*.**",
+ };
+ String[] patternsNotOverlapWithDevice = {
+ "root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**",
+ };
+ for (String s : patternsOverlapWithDevice) {
+ Assert.assertTrue(new IoTDBPipePattern(s).mayOverlapWithDevice(device));
+ }
+ for (String t : patternsNotOverlapWithDevice) {
+ Assert.assertFalse(new IoTDBPipePattern(t).mayOverlapWithDevice(device));
+ }
+
+ // Test pattern match measurement
+ String measurement = "s1";
+ String[] patternsMatchMeasurement = {
+ "root.db.d1.s1", "root.db.d1.*",
+ };
+ String[] patternsNotMatchMeasurement = {
+ "root.db.d1", "root.db.d1", "root.db.d1.*.*",
+ };
+ for (String s : patternsMatchMeasurement) {
+ Assert.assertTrue(new IoTDBPipePattern(s).matchesMeasurement(device,
measurement));
+ }
+ for (String t : patternsNotMatchMeasurement) {
+ Assert.assertFalse(new IoTDBPipePattern(t).matchesMeasurement(device,
measurement));
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
new file mode 100644
index 00000000000..cb327d2dac0
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.pattern;
+
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PrefixPipePatternTest {
+
+ @Test
+ public void testPrefixPipePattern() {
+ // Test legal and illegal pattern
+ String[] legalPatterns = {
+ "root", "root.", "root.db", "root.db.d1.s", "root.db.`1`",
+ };
+ String[] illegalPatterns = {
+ "roo", "", "root..", "root./",
+ };
+ for (String s : legalPatterns) {
+ Assert.assertTrue(new PrefixPipePattern(s).isLegal());
+ }
+ for (String t : illegalPatterns) {
+ Assert.assertFalse(new PrefixPipePattern(t).isLegal());
+ }
+
+ // Test pattern cover db
+ String db = "root.db";
+ String[] patternsCoverDb = {
+ "root", "root.", "root.d", "root.db",
+ };
+ String[] patternsNotCoverDb = {
+ "root.**", "root.db.",
+ };
+ for (String s : patternsCoverDb) {
+ Assert.assertTrue(new PrefixPipePattern(s).coversDb(db));
+ }
+ for (String t : patternsNotCoverDb) {
+ Assert.assertFalse(new PrefixPipePattern(t).coversDb(db));
+ }
+
+ String device = "root.db.d1";
+
+ // Test pattern cover device
+ String[] patternsCoverDevice = {
+ "root", "root.", "root.d", "root.db", "root.db.", "root.db.d",
"root.db.d1",
+ };
+ String[] patternsNotCoverDevice = {
+ "root.db.d1.", "root.db.d1.s1", "root.**", "root.db.d2",
+ };
+ for (String s : patternsCoverDevice) {
+ Assert.assertTrue(new PrefixPipePattern(s).coversDevice(device));
+ }
+ for (String t : patternsNotCoverDevice) {
+ Assert.assertFalse(new PrefixPipePattern(t).coversDevice(device));
+ }
+
+ // Test pattern may overlap with device
+ String[] patternsOverlapWithDevice = {
+ "root", "root.db.d1", "root.db.d1.", "root.db.d1.s1",
+ };
+ String[] patternsNotOverlapWithDevice = {
+ "root.db.d2", "root.**",
+ };
+ for (String s : patternsOverlapWithDevice) {
+ Assert.assertTrue(new PrefixPipePattern(s).mayOverlapWithDevice(device));
+ }
+ for (String t : patternsNotOverlapWithDevice) {
+ Assert.assertFalse(new
PrefixPipePattern(t).mayOverlapWithDevice(device));
+ }
+
+ // Test pattern match measurement
+ String measurement = "s1";
+ String[] patternsMatchMeasurement = {
+ "root.db.d1", "root.db.d1.", "root.db.d1.s", "root.db.d1.s1",
+ };
+ String[] patternsNotMatchMeasurement = {
+ "root.db.d1.s11", "root.db.d1.s2",
+ };
+ for (String s : patternsMatchMeasurement) {
+ Assert.assertTrue(new PrefixPipePattern(s).matchesMeasurement(device,
measurement));
+ }
+ for (String t : patternsNotMatchMeasurement) {
+ Assert.assertFalse(new PrefixPipePattern(t).matchesMeasurement(device,
measurement));
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index c67f9fd4dc2..8c913fa31f5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -34,7 +34,14 @@ public class PipeExtractorConstant {
public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
public static final String SOURCE_PATTERN_KEY = "source.pattern";
- public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
+ public static final String EXTRACTOR_PATH_KEY = "extractor.path";
+ public static final String SOURCE_PATH_KEY = "source.path";
+ public static final String EXTRACTOR_PATTERN_FORMAT_KEY =
"extractor.pattern.format";
+ public static final String SOURCE_PATTERN_FORMAT_KEY =
"source.pattern.format";
+ public static final String EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE = "prefix";
+ public static final String EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE = "iotdb";
+ public static final String EXTRACTOR_PATTERN_PREFIX_DEFAULT_VALUE = "root";
+ public static final String EXTRACTOR_PATTERN_IOTDB_DEFAULT_VALUE = "root.**";
public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
"extractor.forwarding-pipe-requests";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index e9e50f958cc..fb9c1444b70 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.pipe.api.event.Event;
@@ -49,7 +49,7 @@ public abstract class EnrichedEvent implements Event {
public static final long NO_COMMIT_ID = -1;
protected long commitId = NO_COMMIT_ID;
- protected final String pattern;
+ protected final PipePattern pipePattern;
protected final long startTime;
protected final long endTime;
@@ -60,14 +60,18 @@ public abstract class EnrichedEvent implements Event {
protected boolean shouldReportOnCommit = false;
protected EnrichedEvent(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pipePattern,
+ long startTime,
+ long endTime) {
referenceCount = new AtomicInteger(0);
this.pipeName = pipeName;
this.pipeTaskMeta = pipeTaskMeta;
- this.pattern = pattern;
+ this.pipePattern = pipePattern;
this.startTime = startTime;
this.endTime = endTime;
- isPatternParsed =
getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ isPatternParsed = this.pipePattern == null || this.pipePattern.isRoot();
isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
}
@@ -189,12 +193,16 @@ public abstract class EnrichedEvent implements Event {
}
/**
- * Get the {@link EnrichedEvent#pattern} of this {@link EnrichedEvent}.
+ * Get the pattern string of this {@link EnrichedEvent}.
*
- * @return the {@link EnrichedEvent#pattern}
+ * @return the pattern string
*/
- public final String getPattern() {
- return pattern == null ?
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
+ public final String getPatternString() {
+ return pipePattern != null ? pipePattern.getPattern() : null;
+ }
+
+ public final PipePattern getPipePattern() {
+ return pipePattern;
}
public final long getStartTime() {
@@ -206,8 +214,8 @@ public abstract class EnrichedEvent implements Event {
}
/**
- * If pipe's {@link EnrichedEvent#pattern} is database-level, then no need
to parse {@link
- * EnrichedEvent} by {@link EnrichedEvent#pattern} cause pipes are
data-region-level.
+ * If pipe's pattern is database-level, then no need to parse {@link
EnrichedEvent} by pattern
+ * cause pipes are data-region-level.
*/
public void skipParsingPattern() {
isPatternParsed = true;
@@ -230,7 +238,11 @@ public abstract class EnrichedEvent implements Event {
}
public abstract EnrichedEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime);
+ String pipeName,
+ PipeTaskMeta pipeTaskMeta,
+ PipePattern pattern,
+ long startTime,
+ long endTime);
public PipeTaskMeta getPipeTaskMeta() {
return pipeTaskMeta;
@@ -278,7 +290,7 @@ public abstract class EnrichedEvent implements Event {
+ "', commitId="
+ commitId
+ ", pattern='"
- + pattern
+ + pipePattern
+ "', startTime="
+ startTime
+ ", endTime="
@@ -303,7 +315,7 @@ public abstract class EnrichedEvent implements Event {
+ "', commitId="
+ commitId
+ ", pattern='"
- + pattern
+ + pipePattern
+ "', startTime="
+ startTime
+ ", endTime="
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index 96da71b05a0..afc5da84cf5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -44,7 +45,7 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent
implements Seriali
String snapshotPath,
String pipeName,
PipeTaskMeta pipeTaskMeta,
- String pattern,
+ PipePattern pattern,
PipeSnapshotResourceManager resourceManager) {
super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this.snapshotPath = snapshotPath;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index c888e97ee6f..e31e9c85efc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.event;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.slf4j.Logger;
@@ -38,7 +39,7 @@ public abstract class PipeWritePlanEvent extends
EnrichedEvent implements Serial
protected ProgressIndex progressIndex;
protected PipeWritePlanEvent(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, boolean
isGeneratedByPipe) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, PipePattern pattern, boolean
isGeneratedByPipe) {
super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
this.isGeneratedByPipe = isGeneratedByPipe;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
new file mode 100644
index 00000000000..ec4192adff3
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.pattern;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import java.util.Objects;
+
+public class IoTDBPipePattern extends PipePattern {
+
+ private final PartialPath patternPartialPath;
+
+ public IoTDBPipePattern(String pattern) {
+ super(pattern);
+
+ try {
+ patternPartialPath = new PartialPath(getPattern());
+ } catch (IllegalPathException e) {
+ throw new PipeException("Illegal IoTDBPipePattern: " + getPattern(), e);
+ }
+ }
+
+ @Override
+ public String getDefaultPattern() {
+ return PipeExtractorConstant.EXTRACTOR_PATTERN_IOTDB_DEFAULT_VALUE;
+ }
+
+ @Override
+ public boolean isLegal() {
+ if (!pattern.startsWith("root")) {
+ return false;
+ }
+
+ try {
+ PathUtils.isLegalPath(pattern);
+ return true;
+ } catch (IllegalPathException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean coversDb(String db) {
+ try {
+ return patternPartialPath.include(
+ new PartialPath(db, IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD));
+ } catch (IllegalPathException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean coversDevice(String device) {
+ try {
+ return patternPartialPath.include(
+ new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+ } catch (IllegalPathException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean mayOverlapWithDevice(String device) {
+ try {
+ // Another way is to use patternPath.overlapWith("device.*"),
+ // there will be no false positives but time cost may be higher.
+ return patternPartialPath.matchPrefixPath(new PartialPath(device));
+ } catch (IllegalPathException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean matchesMeasurement(String device, String measurement) {
+ // For aligned timeseries, empty measurement is an alias of the time
column.
+ if (Objects.isNull(measurement) || measurement.isEmpty()) {
+ return false;
+ }
+
+ try {
+ return patternPartialPath.matchFullPath(new PartialPath(device,
measurement));
+ } catch (IllegalPathException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "IoTDBPipePattern" + super.toString();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
new file mode 100644
index 00000000000..04be541f625
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.pattern;
+
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+
+public abstract class PipePattern {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipePattern.class);
+
+ protected final String pattern;
+
+ protected PipePattern(String pattern) {
+ this.pattern = pattern != null ? pattern : getDefaultPattern();
+ }
+
+ public String getPattern() {
+ return pattern;
+ }
+
+ public boolean isRoot() {
+ return Objects.isNull(pattern) ||
this.pattern.equals(this.getDefaultPattern());
+ }
+
+ /**
+ * Interpret from source parameters and get a pipe pattern.
+ *
+ * @return The interpreted {@link PipePattern} which is not null.
+ */
+ public static PipePattern
parsePipePatternFromSourceParameters(PipeParameters sourceParameters) {
+ final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY,
SOURCE_PATH_KEY);
+
+ // 1. If "source.path" is specified, it will be interpreted as an
IoTDB-style path,
+ // ignoring the other 2 parameters.
+ if (path != null) {
+ return new IoTDBPipePattern(path);
+ }
+
+ final String pattern =
+ sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+
+ // 2. Otherwise, If "source.pattern" is specified, it will be interpreted
+ // according to "source.pattern.format".
+ if (pattern != null) {
+ final String patternFormat =
+ sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY,
SOURCE_PATTERN_FORMAT_KEY);
+
+ // If "source.pattern.format" is not specified, use prefix format by
default.
+ if (patternFormat == null) {
+ return new PrefixPipePattern(pattern);
+ }
+
+ switch (patternFormat.toLowerCase()) {
+ case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
+ return new IoTDBPipePattern(pattern);
+ case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
+ return new PrefixPipePattern(pattern);
+ default:
+ LOGGER.info(
+ "Unknown pattern format: {}, use prefix matching format by
default.", patternFormat);
+ return new PrefixPipePattern(pattern);
+ }
+ }
+
+ // 3. If neither "source.path" nor "source.pattern" is specified,
+ // this pipe source will match all data.
+ return new PrefixPipePattern(null);
+ }
+
+ public abstract String getDefaultPattern();
+
+ /** Check if this pattern is legal. Different pattern type may have
different rules. */
+ public abstract boolean isLegal();
+
+ /** Check if this pattern matches all time-series under a database. */
+ public abstract boolean coversDb(String db);
+
+ /** Check if a device's all measurements are covered by this pattern. */
+ public abstract boolean coversDevice(String device);
+
+ /**
+ * Check if a device may have some measurements matched by the pattern.
+ *
+ * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is
false.
+ *
+ * <p>NOTE2: this is just a loose check and may have false positives. To
further check if a
+ * measurement matches the pattern, please use {@link
PipePattern#matchesMeasurement} after this.
+ */
+ public abstract boolean mayOverlapWithDevice(String device);
+
+ /**
+ * Check if a full path with device and measurement can be matched by
pattern.
+ *
+ * <p>NOTE: this is only called when {@link
PipePattern#mayOverlapWithDevice} is true.
+ */
+ public abstract boolean matchesMeasurement(String device, String
measurement);
+
+ @Override
+ public String toString() {
+ return "{pattern='" + pattern + "'}";
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
new file mode 100644
index 00000000000..1373b015df1
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.pattern;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+
+public class PrefixPipePattern extends PipePattern {
+
+ public PrefixPipePattern(String pattern) {
+ super(pattern);
+ }
+
+ @Override
+ public String getDefaultPattern() {
+ return PipeExtractorConstant.EXTRACTOR_PATTERN_PREFIX_DEFAULT_VALUE;
+ }
+
+ @Override
+ public boolean isLegal() {
+ if (!pattern.startsWith("root")) {
+ return false;
+ }
+
+ try {
+ PathUtils.isLegalPath(pattern);
+ } catch (IllegalPathException e) {
+ try {
+ if ("root".equals(pattern) || "root.".equals(pattern)) {
+ return true;
+ }
+
+ // Split the pattern to nodes.
+ String[] pathNodes = StringUtils.splitPreserveAllTokens(pattern,
"\\.");
+
+ // Check whether the pattern without last node is legal.
+ PathUtils.splitPathToDetachedNodes(
+ String.join(".", Arrays.copyOfRange(pathNodes, 0, pathNodes.length
- 1)));
+ String lastNode = pathNodes[pathNodes.length - 1];
+
+ // Check whether the last node is legal.
+ if (!"".equals(lastNode)) {
+ Double.parseDouble(lastNode);
+ }
+ } catch (Exception ignored) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean coversDb(String db) {
+ return pattern.length() <= db.length() && db.startsWith(pattern);
+ }
+
+ @Override
+ public boolean coversDevice(String device) {
+ // for example, pattern is root.a.b and device is root.a.b.c
+ // in this case, the extractor can be matched without checking the
measurements
+ return pattern.length() <= device.length() && device.startsWith(pattern);
+ }
+
+ @Override
+ public boolean mayOverlapWithDevice(String device) {
+ return (
+ // for example, pattern is root.a.b and device is root.a.b.c
+ // in this case, the extractor can be matched without checking the
measurements
+ pattern.length() <= device.length() && device.startsWith(pattern))
+ // for example, pattern is root.a.b.c and device is root.a.b
+ // in this case, the extractor can be selected as candidate, but the
measurements should
+ // be checked further
+ || (pattern.length() > device.length() && pattern.startsWith(device));
+ }
+
+ @Override
+ public boolean matchesMeasurement(String device, String measurement) {
+ // We assume that the device is already matched.
+ if (pattern.length() <= device.length()) {
+ return true;
+ }
+
+ // For example, pattern is "root.a.b.c", device is "root.a.b",
+ // then measurements "c" and "cc" can be matched,
+ // measurements "d" or "dc" can't be matched.
+ String dotAndMeasurement = TsFileConstant.PATH_SEPARATOR + measurement;
+ return
+ // low cost check comes first
+ pattern.length() <= device.length() + dotAndMeasurement.length()
+ // high cost check comes later
+ && dotAndMeasurement.startsWith(pattern.substring(device.length()));
+ }
+
+ @Override
+ public String toString() {
+ return "PrefixPipePattern" + super.toString();
+ }
+}