This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f547b38c91e [To dev/1.3] Pipe: support multiple path patterns under
tree model (#16575) (#16629)
f547b38c91e is described below
commit f547b38c91e988467f89d0926689b2feff7e2255
Author: VGalaxies <[email protected]>
AuthorDate: Thu Oct 23 12:21:20 2025 +0800
[To dev/1.3] Pipe: support multiple path patterns under tree model (#16575)
(#16629)
---
.../it/autocreate/IoTDBPipePatternFormatIT.java | 326 ++++++++++++++++++++-
.../iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java | 73 +++++
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 10 +-
.../PipeConfigPhysicalPlanPatternParseVisitor.java | 60 ++--
...eConfigPhysicalPlanPatternParseVisitorTest.java | 79 ++++-
.../agent/task/connection/PipeEventCollector.java | 5 +-
.../TsFileInsertionDataContainerProvider.java | 6 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 10 +-
.../visitor/PipeStatementPatternParseVisitor.java | 30 +-
.../source/dataregion/IoTDBDataRegionSource.java | 7 +-
.../schemaregion/PipePlanPatternParseVisitor.java | 54 ++--
.../sink/PipeStatementPatternParseVisitorTest.java | 46 ++-
.../db/pipe/source/IoTDBDataRegionSourceTest.java | 2 +
.../source/PipePlanPatternParseVisitorTest.java | 257 +++++++++++++++-
.../datastructure/pattern/IoTDBPipePattern.java | 10 +-
.../pipe/datastructure/pattern/PipePattern.java | 168 ++++++++---
.../datastructure/pattern/PrefixPipePattern.java | 2 +-
.../datastructure/pattern/SinglePipePattern.java | 53 ++++
.../pattern/UnionIoTDBPipePattern.java | 146 +++++++++
.../datastructure/pattern/UnionPipePattern.java | 86 ++++++
.../pipe/source/IoTDBNonDataRegionSource.java | 11 +-
21 files changed, 1277 insertions(+), 164 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipePatternFormatIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipePatternFormatIT.java
index 1d80e175344..2ef920a9cd2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipePatternFormatIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipePatternFormatIT.java
@@ -92,7 +92,7 @@ public class IoTDBPipePatternFormatIT extends
AbstractPipeDualAutoIT {
}
@Test
- public void testIotdbPattern() throws Exception {
+ public void testIoTDBPattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -115,8 +115,6 @@ public class IoTDBPipePatternFormatIT extends
AbstractPipeDualAutoIT {
final Map<String, String> connectorAttributes = new HashMap<>();
extractorAttributes.put("extractor.path", "root.**.d1.s*");
- // When path is set, pattern should be ignored
- extractorAttributes.put("extractor.pattern", "root");
extractorAttributes.put("extractor.inclusion", "data.insert");
connectorAttributes.put("connector", "iotdb-thrift-connector");
@@ -146,7 +144,7 @@ public class IoTDBPipePatternFormatIT extends
AbstractPipeDualAutoIT {
}
@Test
- public void testIotdbPatternWithLegacySyntax() throws Exception {
+ public void testIoTDBPatternWithLegacySyntax() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -197,4 +195,324 @@ public class IoTDBPipePatternFormatIT extends
AbstractPipeDualAutoIT {
expectedResSet);
}
}
+
+ @Test
+ public void testMultiplePrefixPatternHistoricalData() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d1.s,
root.db2.d1.s");
+ extractorAttributes.put("extractor.inclusion", "data.insert");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
+ "insert into root.db.d2(time, s) values (2, 2)",
+ "insert into root.db2.d1(time, s) values (3, 3)"))) {
+ return;
+ }
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,null,1.0,1.0,");
+ expectedResSet.add("3,3.0,null,null,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.db2.**,root.db.**",
+ "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
+ expectedResSet);
+ }
+ }
+
+ @Test
+ public void testMultipleIoTDBPatternHistoricalData() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
+ extractorAttributes.put("extractor.inclusion", "data.insert");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
+ "insert into root.db.d2(time, s) values (2, 2)",
+ "insert into root.db2.d1(time, s, t) values (3, 3, 3)",
+ "insert into root.db3.d1(time, s) values (4, 4)"))) {
+ return;
+ }
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,null,null,1.0,1.0,null,");
+ expectedResSet.add("2,null,null,null,null,2.0,");
+ expectedResSet.add("3,3.0,3.0,null,null,null,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.db2.**,root.db.**",
+
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
+ expectedResSet);
+ }
+ }
+
+ @Test
+ public void testMultipleHybridPatternHistoricalData() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.path", "root.db.d1.*");
+ extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
+ extractorAttributes.put("extractor.inclusion", "data.insert");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
+ "insert into root.db2.d1(time, s) values (2, 2)",
+ "insert into root.db3.d1(time, s) values (3, 3)"))) {
+ return;
+ }
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,1.0,1.0,null,");
+ expectedResSet.add("2,null,null,2.0,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.db.**,root.db2.**",
+ "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
+ expectedResSet);
+ }
+ }
+
+ @Test
+ public void testMultiplePrefixPatternRealtimeData() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.pattern", "root.db.d1.s,
root.db2.d1.s");
+ extractorAttributes.put("extractor.inclusion", "data.insert");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
+ "insert into root.db.d2(time, s) values (2, 2)",
+ "insert into root.db2.d1(time, s) values (3, 3)"))) {
+ return;
+ }
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,null,1.0,1.0,");
+ expectedResSet.add("3,3.0,null,null,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.db2.**,root.db.**",
+ "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
+ expectedResSet);
+ }
+ }
+
+ @Test
+ public void testMultipleIoTDBPatternRealtimeData() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.path", "root.db.**, root.db2.d1.*");
+ extractorAttributes.put("extractor.inclusion", "data.insert");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
+ "insert into root.db.d2(time, s) values (2, 2)",
+ "insert into root.db2.d1(time, s, t) values (3, 3, 3)",
+ "insert into root.db3.d1(time, s) values (4, 4)"))) {
+ return;
+ }
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,null,null,1.0,1.0,null,");
+ expectedResSet.add("2,null,null,null,null,2.0,");
+ expectedResSet.add("3,3.0,3.0,null,null,null,");
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.db2.**,root.db.**",
+
"Time,root.db2.d1.s,root.db2.d1.t,root.db.d1.s,root.db.d1.s1,root.db.d2.s,",
+ expectedResSet);
+ }
+ }
+
+ @Test
+ public void testMultipleHybridPatternRealtimeData() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.path", "root.db.d1.*");
+ extractorAttributes.put("extractor.pattern", "root.db2.d1.s");
+ extractorAttributes.put("extractor.inclusion", "data.insert");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
+ "insert into root.db2.d1(time, s) values (2, 2)",
+ "insert into root.db3.d1(time, s) values (3, 3)"))) {
+ return;
+ }
+
+ final Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1,1.0,1.0,null,");
+ expectedResSet.add("2,null,null,2.0,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.db.**,root.db2.**",
+ "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
+ expectedResSet);
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
index 76affe08850..661027160bf 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
@@ -104,6 +104,79 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualManualIT {
}
}
+ @Test
+ public void testPureSchemaInclusionWithMultiplePattern() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.inclusion", "schema");
+ extractorAttributes.put("path",
"root.ln.wf01.wt01.status,root.ln.wf02.**");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "create timeseries root.ln.wf01.wt01.status with
datatype=BOOLEAN,encoding=PLAIN",
+ "ALTER timeseries root.ln.wf01.wt01.status ADD TAGS tag3=v3",
+ "ALTER timeseries root.ln.wf01.wt01.status ADD ATTRIBUTES
attr4=v4",
+ "create timeseries root.ln.wf02.wt01.status with
datatype=BOOLEAN,encoding=PLAIN",
+ "ALTER timeseries root.ln.wf02.wt01.status ADD TAGS tag3=v3",
+ "ALTER timeseries root.ln.wf02.wt01.status ADD ATTRIBUTES
attr4=v4",
+ "create timeseries root.ln.wf03.wt01.status with
datatype=BOOLEAN,encoding=PLAIN",
+ "ALTER timeseries root.ln.wf03.wt01.status ADD TAGS tag3=v3",
+ "ALTER timeseries root.ln.wf03.wt01.status ADD ATTRIBUTES
attr4=v4"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "show timeseries root.ln.**",
+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
+ new HashSet<String>() {
+ {
+ add(
+
"root.ln.wf01.wt01.status,null,root.ln,BOOLEAN,PLAIN,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,");
+ add(
+
"root.ln.wf02.wt01.status,null,root.ln,BOOLEAN,PLAIN,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,");
+ }
+ });
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.ln.wf01.wt01(time, status) values(now(),
false)", "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv, "select * from root.ln.**", "Time,",
Collections.emptySet());
+ }
+ }
+
@Test
public void testAuthExclusion() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 1e27dbd6cce..5ba70f80890 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -528,12 +530,14 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
final Set<ConfigPhysicalPlanType> executionTypes =
PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
- final IoTDBPipePattern pattern =
- new
IoTDBPipePattern(parameters.get(ColumnHeaderConstant.PATH_PATTERN));
+ final List<PipePattern> pipePatterns =
+ PipePattern.parseMultiplePatterns(
+ parameters.get(ColumnHeaderConstant.PATH_PATTERN),
IoTDBPipePattern::new);
+ final PipePattern pipePattern =
PipePattern.buildUnionPattern(pipePatterns);
final List<TSStatus> results = new ArrayList<>();
while (generator.hasNext()) {
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
- .process(generator.next(), pattern)
+ .process(generator.next(), (UnionIoTDBPipePattern) pipePattern)
.filter(configPhysicalPlan ->
executionTypes.contains(configPhysicalPlan.getType()))
.ifPresent(
configPhysicalPlan ->
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitor.java
index 8ed9ffca21f..c8beb49181c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitor.java
@@ -22,7 +22,8 @@ package org.apache.iotdb.confignode.manager.pipe.source;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanVisitor;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
@@ -58,12 +59,12 @@ import java.util.stream.Stream;
/**
* The {@link PipeConfigPhysicalPlanPatternParseVisitor} will transform the
schema {@link
- * ConfigPhysicalPlan}s using {@link IoTDBPipePattern}. Rule:
+ * ConfigPhysicalPlan}s using {@link UnionIoTDBPipePattern}. Rule:
*
* <p>1. All patterns in the output {@link ConfigPhysicalPlan} will be the
intersection of the
- * original {@link ConfigPhysicalPlan}'s patterns and the given {@link
IoTDBPipePattern}.
+ * original {@link ConfigPhysicalPlan}'s patterns and the given {@link
UnionIoTDBPipePattern}.
*
- * <p>2. If a pattern does not intersect with the {@link IoTDBPipePattern},
it's dropped.
+ * <p>2. If a pattern does not intersect with the {@link
UnionIoTDBPipePattern}, it's dropped.
*
* <p>3. If all the patterns in the {@link ConfigPhysicalPlan} is dropped, the
{@link
* ConfigPhysicalPlan} is dropped.
@@ -72,13 +73,13 @@ import java.util.stream.Stream;
* one is used in the {@link PipeConfigRegionWritePlanEvent} in {@link
ConfigRegionListeningQueue}.
*/
public class PipeConfigPhysicalPlanPatternParseVisitor
- extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>,
IoTDBPipePattern> {
+ extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>,
UnionIoTDBPipePattern> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfigPhysicalPlanPatternParseVisitor.class);
@Override
public Optional<ConfigPhysicalPlan> visitPlan(
- final ConfigPhysicalPlan plan, final IoTDBPipePattern pattern) {
+ final ConfigPhysicalPlan plan, final UnionIoTDBPipePattern pattern) {
return Optional.of(plan);
}
@@ -90,7 +91,7 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
// Other matches using "matchPrefixPath" are with the same principle.
@Override
public Optional<ConfigPhysicalPlan> visitCreateDatabase(
- final DatabaseSchemaPlan createDatabasePlan, final IoTDBPipePattern
pattern) {
+ final DatabaseSchemaPlan createDatabasePlan, final UnionIoTDBPipePattern
pattern) {
return pattern.matchPrefixPath(createDatabasePlan.getSchema().getName())
? Optional.of(createDatabasePlan)
: Optional.empty();
@@ -98,7 +99,7 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitAlterDatabase(
- final DatabaseSchemaPlan alterDatabasePlan, final IoTDBPipePattern
pattern) {
+ final DatabaseSchemaPlan alterDatabasePlan, final UnionIoTDBPipePattern
pattern) {
return pattern.matchPrefixPath(alterDatabasePlan.getSchema().getName())
? Optional.of(alterDatabasePlan)
: Optional.empty();
@@ -106,7 +107,7 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitDeleteDatabase(
- final DeleteDatabasePlan deleteDatabasePlan, final IoTDBPipePattern
pattern) {
+ final DeleteDatabasePlan deleteDatabasePlan, final UnionIoTDBPipePattern
pattern) {
return pattern.matchPrefixPath(deleteDatabasePlan.getName())
? Optional.of(deleteDatabasePlan)
: Optional.empty();
@@ -114,7 +115,8 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
- final CreateSchemaTemplatePlan createSchemaTemplatePlan, final
IoTDBPipePattern pattern) {
+ final CreateSchemaTemplatePlan createSchemaTemplatePlan,
+ final UnionIoTDBPipePattern pattern) {
// This is a deserialized template and can be arbitrarily altered
final Template template = createSchemaTemplatePlan.getTemplate();
template.getSchemaMap().keySet().removeIf(measurement ->
!pattern.matchTailNode(measurement));
@@ -126,7 +128,7 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
final CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan,
- final IoTDBPipePattern pattern) {
+ final UnionIoTDBPipePattern pattern) {
return pattern.matchPrefixPath(commitSetSchemaTemplatePlan.getPath())
? Optional.of(commitSetSchemaTemplatePlan)
: Optional.empty();
@@ -135,7 +137,7 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitPipeUnsetSchemaTemplate(
final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan,
- final IoTDBPipePattern pattern) {
+ final UnionIoTDBPipePattern pattern) {
return pattern.matchPrefixPath(pipeUnsetSchemaTemplatePlan.getPath())
? Optional.of(pipeUnsetSchemaTemplatePlan)
: Optional.empty();
@@ -143,7 +145,8 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
- final ExtendSchemaTemplatePlan extendSchemaTemplatePlan, final
IoTDBPipePattern pattern) {
+ final ExtendSchemaTemplatePlan extendSchemaTemplatePlan,
+ final UnionIoTDBPipePattern pattern) {
final TemplateExtendInfo extendInfo =
extendSchemaTemplatePlan.getTemplateExtendInfo();
final int[] filteredIndexes =
IntStream.range(0, extendInfo.getMeasurements().size())
@@ -154,41 +157,39 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
new ExtendSchemaTemplatePlan(
new TemplateExtendInfo(
extendInfo.getTemplateName(),
- IoTDBPipePattern.applyIndexesOnList(
- filteredIndexes, extendInfo.getMeasurements()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
extendInfo.getDataTypes()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
extendInfo.getEncodings()),
- IoTDBPipePattern.applyIndexesOnList(
- filteredIndexes, extendInfo.getCompressors()))))
+ PipePattern.applyIndexesOnList(filteredIndexes,
extendInfo.getMeasurements()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
extendInfo.getDataTypes()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
extendInfo.getEncodings()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
extendInfo.getCompressors()))))
: Optional.empty();
}
@Override
public Optional<ConfigPhysicalPlan> visitGrantUser(
- final AuthorPlan grantUserPlan, final IoTDBPipePattern pattern) {
+ final AuthorPlan grantUserPlan, final UnionIoTDBPipePattern pattern) {
return visitPathRelatedAuthorPlan(grantUserPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitRevokeUser(
- final AuthorPlan revokeUserPlan, final IoTDBPipePattern pattern) {
+ final AuthorPlan revokeUserPlan, final UnionIoTDBPipePattern pattern) {
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitGrantRole(
- final AuthorPlan revokeUserPlan, final IoTDBPipePattern pattern) {
+ final AuthorPlan revokeUserPlan, final UnionIoTDBPipePattern pattern) {
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
}
@Override
public Optional<ConfigPhysicalPlan> visitRevokeRole(
- final AuthorPlan revokeUserPlan, final IoTDBPipePattern pattern) {
+ final AuthorPlan revokeUserPlan, final UnionIoTDBPipePattern pattern) {
return visitPathRelatedAuthorPlan(revokeUserPlan, pattern);
}
private Optional<ConfigPhysicalPlan> visitPathRelatedAuthorPlan(
- final AuthorPlan pathRelatedAuthorPlan, final IoTDBPipePattern pattern) {
+ final AuthorPlan pathRelatedAuthorPlan, final UnionIoTDBPipePattern
pattern) {
final List<PartialPath> intersectedPaths =
pathRelatedAuthorPlan.getNodeNameList().stream()
.map(pattern::getIntersection)
@@ -216,7 +217,8 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitPipeDeleteTimeSeries(
- final PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan, final
IoTDBPipePattern pattern) {
+ final PipeDeleteTimeSeriesPlan pipeDeleteTimeSeriesPlan,
+ final UnionIoTDBPipePattern pattern) {
try {
final PathPatternTree intersectedTree =
pattern.getIntersection(
@@ -234,7 +236,8 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitPipeDeleteLogicalView(
- final PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, final
IoTDBPipePattern pattern) {
+ final PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan,
+ final UnionIoTDBPipePattern pattern) {
try {
final PathPatternTree intersectedTree =
pattern.getIntersection(
@@ -252,7 +255,8 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitPipeDeactivateTemplate(
- final PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, final
IoTDBPipePattern pattern) {
+ final PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan,
+ final UnionIoTDBPipePattern pattern) {
final Map<PartialPath, List<Template>> newTemplateSetInfo =
pipeDeactivateTemplatePlan.getTemplateSetInfo().entrySet().stream()
.flatMap(
@@ -275,7 +279,7 @@ public class PipeConfigPhysicalPlanPatternParseVisitor
@Override
public Optional<ConfigPhysicalPlan> visitTTL(
- final SetTTLPlan setTTLPlan, final IoTDBPipePattern pattern) {
+ final SetTTLPlan setTTLPlan, final UnionIoTDBPipePattern pattern) {
final PartialPath partialPath = new
PartialPath(setTTLPlan.getPathPattern());
final List<PartialPath> intersectionList =
pattern.matchPrefixPath(partialPath.getFullPath())
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
index ea14e12bd9e..18430fcb981 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
@@ -54,8 +55,15 @@ import java.util.List;
public class PipeConfigPhysicalPlanPatternParseVisitorTest {
- private final IoTDBPipePattern prefixPathPattern = new
IoTDBPipePattern("root.db.device.**");
- private final IoTDBPipePattern fullPathPattern = new
IoTDBPipePattern("root.db.device.s1");
+ private final UnionIoTDBPipePattern prefixPathPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.**"));
+ private final UnionIoTDBPipePattern fullPathPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.s1"));
+ private final UnionIoTDBPipePattern multiplePathPattern =
+ new UnionIoTDBPipePattern(
+ Arrays.asList(
+ new IoTDBPipePattern("root.db.device.s1"),
+ new IoTDBPipePattern("root.db.device.s2")));
@Test
public void testCreateDatabase() {
@@ -75,6 +83,11 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest {
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.visitCreateDatabase(createDatabasePlanToFilter, prefixPathPattern)
.isPresent());
+ Assert.assertEquals(
+ createDatabasePlan,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateDatabase(createDatabasePlan, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -95,6 +108,11 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest {
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.visitAlterDatabase(alterDatabasePlanToFilter, prefixPathPattern)
.isPresent());
+ Assert.assertEquals(
+ alterDatabasePlan,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitAlterDatabase(alterDatabasePlan, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -111,6 +129,11 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.visitDeleteDatabase(deleteDatabasePlanToFilter, prefixPathPattern)
.isPresent());
+ Assert.assertEquals(
+ deleteDatabasePlan,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitDeleteDatabase(deleteDatabasePlan, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -142,6 +165,15 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
Assert.assertEquals(
createSchemaTemplatePlan.getTemplate().getSchemaMap().get("s1"),
parsedTemplatePlan.getTemplate().getSchemaMap().get("s1"));
+
+ final CreateSchemaTemplatePlan parsedTemplatePlan2 =
+ (CreateSchemaTemplatePlan)
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateSchemaTemplate(createSchemaTemplatePlan,
multiplePathPattern)
+ .orElseThrow(AssertionError::new);
+ Assert.assertEquals(
+ new HashSet<>(Arrays.asList("s1", "s2")),
+ parsedTemplatePlan2.getTemplate().getSchemaMap().keySet());
}
@Test
@@ -161,6 +193,11 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.visitCommitSetSchemaTemplate(setSchemaTemplatePlanOnFullPath,
fullPathPattern)
.orElseThrow(AssertionError::new));
+ Assert.assertEquals(
+ setSchemaTemplatePlanOnFullPath,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitCommitSetSchemaTemplate(setSchemaTemplatePlanOnFullPath,
multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -180,6 +217,12 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
.visitPipeUnsetSchemaTemplate(pipeUnsetSchemaTemplatePlanOrFullPath,
fullPathPattern)
.orElseThrow(AssertionError::new));
+ Assert.assertEquals(
+ pipeUnsetSchemaTemplatePlanOrFullPath,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitPipeUnsetSchemaTemplate(
+ pipeUnsetSchemaTemplatePlanOrFullPath, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -213,6 +256,14 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
Assert.assertEquals(
extendSchemaTemplatePlan.getTemplateExtendInfo().getCompressors().get(0),
parsedTemplatePlan.getTemplateExtendInfo().getCompressors().get(0));
+
+ final ExtendSchemaTemplatePlan parsedTemplatePlan2 =
+ (ExtendSchemaTemplatePlan)
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitExtendSchemaTemplate(extendSchemaTemplatePlan,
multiplePathPattern)
+ .orElseThrow(AssertionError::new);
+ Assert.assertEquals(
+ Arrays.asList("s1", "s2"),
parsedTemplatePlan2.getTemplateExtendInfo().getMeasurements());
}
@Test
@@ -320,6 +371,18 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
.orElseThrow(AssertionError::new))
.getPatternTreeBytes())
.getAllPathPatterns());
+
+ Assert.assertEquals(
+ Collections.singletonList(new PartialPath("root.db.device.s1")),
+ PathPatternTree.deserialize(
+ ((PipeDeleteTimeSeriesPlan)
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitPipeDeleteTimeSeries(
+ new
PipeDeleteTimeSeriesPlan(patternTree.serialize()),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new))
+ .getPatternTreeBytes())
+ .getAllPathPatterns());
}
@Test
@@ -339,6 +402,18 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
.orElseThrow(AssertionError::new))
.getPatternTreeBytes())
.getAllPathPatterns());
+
+ Assert.assertEquals(
+ Collections.singletonList(new PartialPath("root.db.device.s1")),
+ PathPatternTree.deserialize(
+ ((PipeDeleteLogicalViewPlan)
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .visitPipeDeleteLogicalView(
+ new
PipeDeleteLogicalViewPlan(patternTree.serialize()),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new))
+ .getPatternTreeBytes())
+ .getAllPathPatterns());
}
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index bf9e658f565..e5f1101e40c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.connection;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -157,7 +157,8 @@ public class PipeEventCollector implements EventCollector {
// Only used by events containing delete data node, no need to bind
progress index here since
// delete data event does not have progress index currently
IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
- .process(deleteDataEvent.getPlanNode(), (IoTDBPipePattern)
deleteDataEvent.getPipePattern())
+ .process(
+ deleteDataEvent.getPlanNode(), (UnionIoTDBPipePattern)
deleteDataEvent.getPipePattern())
.map(
planNode ->
new PipeSchemaRegionWritePlanEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index 1050950d176..e8b29c72c40 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -21,8 +21,8 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.container;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
@@ -86,8 +86,8 @@ public class TsFileInsertionDataContainerProvider {
pipeName, creationTime, tsFile, pattern, startTime, endTime,
pipeTaskMeta, sourceEvent);
}
- if (pattern instanceof IoTDBPipePattern
- && !((IoTDBPipePattern)
pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
+ if (pattern instanceof UnionIoTDBPipePattern
+ && !((UnionIoTDBPipePattern)
pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
// If the pattern matches only one time series in one device, use query
container here
// because there is no timestamps merge overhead.
//
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8834cc495bf..5965d362564 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalExc
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -501,8 +503,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final Set<StatementType> executionTypes =
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
- final IoTDBPipePattern pattern =
- new
IoTDBPipePattern(parameters.get(ColumnHeaderConstant.PATH_PATTERN));
+ final List<PipePattern> pipePatterns =
+ PipePattern.parseMultiplePatterns(
+ parameters.get(ColumnHeaderConstant.PATH_PATTERN),
IoTDBPipePattern::new);
+ final PipePattern pipePattern =
PipePattern.buildUnionPattern(pipePatterns);
// Clear to avoid previous exceptions
batchVisitor.clear();
@@ -517,7 +521,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// Here we apply the statements as many as possible
// Even if there are failed statements
STATEMENT_PATTERN_PARSE_VISITOR
- .process(originalStatement, pattern)
+ .process(originalStatement, (UnionIoTDBPipePattern) pipePattern)
.flatMap(parsedStatement -> batchVisitor.process(parsedStatement,
null))
.ifPresent(statement ->
results.add(executeStatementAndClassifyExceptions(statement)));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
index 75394e98cc5..c5d1dab6a10 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -36,12 +37,12 @@ import java.util.stream.IntStream;
/**
* The {@link PipeStatementPatternParseVisitor} will transform the schema
{@link Statement}s using
- * {@link IoTDBPipePattern}. Rule:
+ * {@link UnionIoTDBPipePattern}. Rule:
*
* <p>1. All patterns in the output {@link Statement} will be the intersection
of the original
- * {@link Statement}'s patterns and the given {@link IoTDBPipePattern}.
+ * {@link Statement}'s patterns and the given {@link UnionIoTDBPipePattern}.
*
- * <p>2. If a pattern does not intersect with the {@link IoTDBPipePattern},
it's dropped.
+ * <p>2. If a pattern does not intersect with the {@link
UnionIoTDBPipePattern}, it's dropped.
*
* <p>3. If all the patterns in the {@link Statement} is dropped, the {@link
Statement} is dropped.
*
@@ -49,16 +50,16 @@ import java.util.stream.IntStream;
* from the {@link SRStatementGenerator} and will no longer be used.
*/
public class PipeStatementPatternParseVisitor
- extends StatementVisitor<Optional<Statement>, IoTDBPipePattern> {
+ extends StatementVisitor<Optional<Statement>, UnionIoTDBPipePattern> {
@Override
public Optional<Statement> visitNode(
- final StatementNode statement, final IoTDBPipePattern pattern) {
+ final StatementNode statement, final UnionIoTDBPipePattern pattern) {
return Optional.of((Statement) statement);
}
@Override
public Optional<Statement> visitCreateTimeseries(
- final CreateTimeSeriesStatement statement, final IoTDBPipePattern
pattern) {
+ final CreateTimeSeriesStatement statement, final UnionIoTDBPipePattern
pattern) {
return pattern.matchesMeasurement(
statement.getPath().getDevice(),
statement.getPath().getMeasurement())
? Optional.of(statement)
@@ -67,7 +68,7 @@ public class PipeStatementPatternParseVisitor
@Override
public Optional<Statement> visitCreateAlignedTimeseries(
- final CreateAlignedTimeSeriesStatement statement, final IoTDBPipePattern
pattern) {
+ final CreateAlignedTimeSeriesStatement statement, final
UnionIoTDBPipePattern pattern) {
final int[] filteredIndexes =
IntStream.range(0, statement.getMeasurements().size())
.filter(
@@ -107,7 +108,8 @@ public class PipeStatementPatternParseVisitor
// For logical view with tags/attributes
@Override
public Optional<Statement> visitAlterTimeSeries(
- final AlterTimeSeriesStatement alterTimeSeriesStatement, final
IoTDBPipePattern pattern) {
+ final AlterTimeSeriesStatement alterTimeSeriesStatement,
+ final UnionIoTDBPipePattern pattern) {
return pattern.matchesMeasurement(
alterTimeSeriesStatement.getPath().getDevice(),
alterTimeSeriesStatement.getPath().getMeasurement())
@@ -117,7 +119,8 @@ public class PipeStatementPatternParseVisitor
@Override
public Optional<Statement> visitActivateTemplate(
- final ActivateTemplateStatement activateTemplateStatement, final
IoTDBPipePattern pattern) {
+ final ActivateTemplateStatement activateTemplateStatement,
+ final UnionIoTDBPipePattern pattern) {
return
pattern.matchDevice(activateTemplateStatement.getPath().getFullPath())
? Optional.of(activateTemplateStatement)
: Optional.empty();
@@ -125,7 +128,8 @@ public class PipeStatementPatternParseVisitor
@Override
public Optional<Statement> visitCreateLogicalView(
- final CreateLogicalViewStatement createLogicalViewStatement, final
IoTDBPipePattern pattern) {
+ final CreateLogicalViewStatement createLogicalViewStatement,
+ final UnionIoTDBPipePattern pattern) {
final int[] filteredIndexes =
IntStream.range(0,
createLogicalViewStatement.getTargetPathList().size())
.filter(
@@ -138,10 +142,10 @@ public class PipeStatementPatternParseVisitor
return Optional.empty();
}
createLogicalViewStatement.setTargetFullPaths(
- IoTDBPipePattern.applyIndexesOnList(
+ PipePattern.applyIndexesOnList(
filteredIndexes, createLogicalViewStatement.getTargetPathList()));
createLogicalViewStatement.setViewExpressions(
- IoTDBPipePattern.applyIndexesOnList(
+ PipePattern.applyIndexesOnList(
filteredIndexes, createLogicalViewStatement.getViewExpressions()));
return Optional.of(createLogicalViewStatement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index aa046bb88e6..e7ff984358a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.db.pipe.source.dataregion;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.pipe.source.IoTDBSource;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -231,9 +231,8 @@ public class IoTDBDataRegionSource extends IoTDBSource {
}
if (shouldExtractDeletion
- && !(pattern instanceof IoTDBPipePattern
- && (((IoTDBPipePattern) pattern).isPrefix()
- || ((IoTDBPipePattern) pattern).isFullPath()))) {
+ && !(pattern instanceof UnionIoTDBPipePattern
+ && (((UnionIoTDBPipePattern) pattern).isPrefixOrFullPath()))) {
throw new IllegalArgumentException(
String.format(
"The path pattern %s is not valid for the source. Only prefix or
full path is allowed.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/PipePlanPatternParseVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/PipePlanPatternParseVisitor.java
index 80b0302a4fb..edfe437b44f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/PipePlanPatternParseVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/PipePlanPatternParseVisitor.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.db.pipe.source.schemaregion;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -52,27 +53,28 @@ import java.util.stream.IntStream;
/**
* The {@link PipePlanPatternParseVisitor} will transform the schema {@link
PlanNode}s using {@link
- * IoTDBPipePattern}. Rule:
+ * UnionIoTDBPipePattern}. Rule:
*
* <p>1. All patterns in the output {@link PlanNode} will be the intersection
of the original {@link
- * PlanNode}'s patterns and the given {@link IoTDBPipePattern}.
+ * PlanNode}'s patterns and the given {@link UnionIoTDBPipePattern}.
*
- * <p>2. If a pattern does not intersect with the {@link IoTDBPipePattern},
it's dropped.
+ * <p>2. If a pattern does not intersect with the {@link
UnionIoTDBPipePattern}, it's dropped.
*
* <p>3. If all the patterns in the {@link PlanNode} is dropped, the {@link
PlanNode} is dropped.
*
* <p>4. The output {@link PlanNode} shall be a copied form of the original
one because the original
* one is used in the {@link PipeSchemaRegionWritePlanEvent} in {@link
SchemaRegionListeningQueue}.
*/
-public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>, IoTDBPipePattern> {
+public class PipePlanPatternParseVisitor
+ extends PlanVisitor<Optional<PlanNode>, UnionIoTDBPipePattern> {
@Override
- public Optional<PlanNode> visitPlan(final PlanNode node, final
IoTDBPipePattern pattern) {
+ public Optional<PlanNode> visitPlan(final PlanNode node, final
UnionIoTDBPipePattern pattern) {
return Optional.of(node);
}
@Override
public Optional<PlanNode> visitCreateTimeSeries(
- final CreateTimeSeriesNode node, final IoTDBPipePattern pattern) {
+ final CreateTimeSeriesNode node, final UnionIoTDBPipePattern pattern) {
return pattern.matchesMeasurement(node.getPath().getDevice(),
node.getPath().getMeasurement())
? Optional.of(node)
: Optional.empty();
@@ -80,7 +82,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitCreateAlignedTimeSeries(
- final CreateAlignedTimeSeriesNode node, final IoTDBPipePattern pattern) {
+ final CreateAlignedTimeSeriesNode node, final UnionIoTDBPipePattern
pattern) {
final int[] filteredIndexes =
IntStream.range(0, node.getMeasurements().size())
.filter(
@@ -93,19 +95,19 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
new CreateAlignedTimeSeriesNode(
node.getPlanNodeId(),
node.getDevicePath(),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getMeasurements()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getDataTypes()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getEncodings()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getCompressors()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getAliasList()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getTagsList()),
- IoTDBPipePattern.applyIndexesOnList(filteredIndexes,
node.getAttributesList())))
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getMeasurements()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getDataTypes()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getEncodings()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getCompressors()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getAliasList()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getTagsList()),
+ PipePattern.applyIndexesOnList(filteredIndexes,
node.getAttributesList())))
: Optional.empty();
}
@Override
public Optional<PlanNode> visitCreateMultiTimeSeries(
- final CreateMultiTimeSeriesNode node, final IoTDBPipePattern pattern) {
+ final CreateMultiTimeSeriesNode node, final UnionIoTDBPipePattern
pattern) {
final Map<PartialPath, MeasurementGroup> filteredMeasurementGroupMap =
node.getMeasurementGroupMap().entrySet().stream()
.filter(entry ->
pattern.matchPrefixPath(entry.getKey().getFullPath()))
@@ -124,7 +126,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
}
private static MeasurementGroup trimMeasurementGroup(
- final String device, final MeasurementGroup group, final
IoTDBPipePattern pattern) {
+ final String device, final MeasurementGroup group, final
UnionIoTDBPipePattern pattern) {
final int[] filteredIndexes =
IntStream.range(0, group.size())
.filter(index -> pattern.matchesMeasurement(device,
group.getMeasurements().get(index)))
@@ -159,7 +161,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitAlterTimeSeries(
- final AlterTimeSeriesNode node, final IoTDBPipePattern pattern) {
+ final AlterTimeSeriesNode node, final UnionIoTDBPipePattern pattern) {
return pattern.matchesMeasurement(node.getPath().getDevice(),
node.getPath().getMeasurement())
? Optional.of(node)
: Optional.empty();
@@ -167,7 +169,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitInternalCreateTimeSeries(
- final InternalCreateTimeSeriesNode node, final IoTDBPipePattern pattern)
{
+ final InternalCreateTimeSeriesNode node, final UnionIoTDBPipePattern
pattern) {
final MeasurementGroup group =
pattern.matchPrefixPath(node.getDevicePath().getFullPath())
? trimMeasurementGroup(
@@ -182,7 +184,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitActivateTemplate(
- final ActivateTemplateNode node, final IoTDBPipePattern pattern) {
+ final ActivateTemplateNode node, final UnionIoTDBPipePattern pattern) {
return pattern.matchDevice(node.getActivatePath().getFullPath())
? Optional.of(node)
: Optional.empty();
@@ -190,7 +192,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitInternalBatchActivateTemplate(
- final InternalBatchActivateTemplateNode node, final IoTDBPipePattern
pattern) {
+ final InternalBatchActivateTemplateNode node, final
UnionIoTDBPipePattern pattern) {
final Map<PartialPath, Pair<Integer, Integer>>
filteredTemplateActivationMap =
node.getTemplateActivationMap().entrySet().stream()
.filter(entry -> pattern.matchDevice(entry.getKey().getFullPath()))
@@ -204,7 +206,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitInternalCreateMultiTimeSeries(
- final InternalCreateMultiTimeSeriesNode node, final IoTDBPipePattern
pattern) {
+ final InternalCreateMultiTimeSeriesNode node, final
UnionIoTDBPipePattern pattern) {
final Map<PartialPath, Pair<Boolean, MeasurementGroup>> filteredDeviceMap =
node.getDeviceMap().entrySet().stream()
.filter(entry ->
pattern.matchPrefixPath(entry.getKey().getFullPath()))
@@ -228,7 +230,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitBatchActivateTemplate(
- final BatchActivateTemplateNode node, final IoTDBPipePattern pattern) {
+ final BatchActivateTemplateNode node, final UnionIoTDBPipePattern
pattern) {
final Map<PartialPath, Pair<Integer, Integer>>
filteredTemplateActivationMap =
node.getTemplateActivationMap().entrySet().stream()
.filter(entry -> pattern.matchDevice(entry.getKey().getFullPath()))
@@ -241,7 +243,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitCreateLogicalView(
- final CreateLogicalViewNode node, final IoTDBPipePattern pattern) {
+ final CreateLogicalViewNode node, final UnionIoTDBPipePattern pattern) {
final Map<PartialPath, ViewExpression> filteredViewPathToSourceMap =
node.getViewPathToSourceExpressionMap().entrySet().stream()
.filter(
@@ -256,7 +258,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitAlterLogicalView(
- final AlterLogicalViewNode node, final IoTDBPipePattern pattern) {
+ final AlterLogicalViewNode node, final UnionIoTDBPipePattern pattern) {
final Map<PartialPath, ViewExpression> filteredViewPathToSourceMap =
node.getViewPathToSourceMap().entrySet().stream()
.filter(
@@ -271,7 +273,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
@Override
public Optional<PlanNode> visitDeleteData(
- final DeleteDataNode node, final IoTDBPipePattern pattern) {
+ final DeleteDataNode node, final UnionIoTDBPipePattern pattern) {
final List<PartialPath> intersectedPaths =
node.getPathList().stream()
.map(pattern::getIntersection)
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementPatternParseVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementPatternParseVisitorTest.java
index a5dcc34c056..04d3696b2ad 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementPatternParseVisitorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementPatternParseVisitorTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import
org.apache.iotdb.commons.schema.view.viewExpression.leaf.TimeSeriesViewOperand;
import
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -42,8 +43,15 @@ import java.util.Map;
public class PipeStatementPatternParseVisitorTest {
- private final IoTDBPipePattern prefixPathPattern = new
IoTDBPipePattern("root.db.device.**");
- private final IoTDBPipePattern fullPathPattern = new
IoTDBPipePattern("root.db.device.s1");
+ private final UnionIoTDBPipePattern prefixPathPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.**"));
+ private final UnionIoTDBPipePattern fullPathPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.s1"));
+ private final UnionIoTDBPipePattern multiplePathPattern =
+ new UnionIoTDBPipePattern(
+ Arrays.asList(
+ new IoTDBPipePattern("root.db.device.s1"),
+ new IoTDBPipePattern("root.db.device.s2")));
@Test
public void testCreateTimeSeries() throws IllegalPathException {
@@ -77,6 +85,15 @@ public class PipeStatementPatternParseVisitorTest {
new PipeStatementPatternParseVisitor()
.visitCreateTimeseries(createTimeSeriesStatementToFilter,
prefixPathPattern)
.isPresent());
+ Assert.assertEquals(
+ createTimeSeriesStatement,
+ new PipeStatementPatternParseVisitor()
+ .visitCreateTimeseries(createTimeSeriesStatement,
multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ new PipeStatementPatternParseVisitor()
+ .visitCreateTimeseries(createTimeSeriesStatementToFilter,
multiplePathPattern)
+ .isPresent());
}
@Test
@@ -118,6 +135,13 @@ public class PipeStatementPatternParseVisitorTest {
new PipeStatementPatternParseVisitor()
.visitCreateAlignedTimeseries(originalCreateAlignedTimeSeriesStatement,
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ originalCreateAlignedTimeSeriesStatement,
+ new PipeStatementPatternParseVisitor()
+ .visitCreateAlignedTimeseries(
+ originalCreateAlignedTimeSeriesStatement, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -149,6 +173,15 @@ public class PipeStatementPatternParseVisitorTest {
new PipeStatementPatternParseVisitor()
.visitAlterTimeSeries(alterTimeSeriesStatementToFilter,
prefixPathPattern)
.isPresent());
+ Assert.assertEquals(
+ alterTimeSeriesStatement,
+ new PipeStatementPatternParseVisitor()
+ .visitAlterTimeSeries(alterTimeSeriesStatement,
multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ new PipeStatementPatternParseVisitor()
+ .visitAlterTimeSeries(alterTimeSeriesStatementToFilter,
multiplePathPattern)
+ .isPresent());
}
@Test
@@ -167,6 +200,15 @@ public class PipeStatementPatternParseVisitorTest {
new PipeStatementPatternParseVisitor()
.visitActivateTemplate(activateTemplateStatementToFilter,
prefixPathPattern)
.isPresent());
+ Assert.assertEquals(
+ activateTemplateStatement,
+ new PipeStatementPatternParseVisitor()
+ .visitActivateTemplate(activateTemplateStatement,
multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ new PipeStatementPatternParseVisitor()
+ .visitActivateTemplate(activateTemplateStatementToFilter,
multiplePathPattern)
+ .isPresent());
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
index dd0558bc238..fd1853fca1b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
@@ -82,6 +82,8 @@ public class IoTDBDataRegionSourceTest {
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.`a-b`"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1"));
+
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b"));
+
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b,root.db1.`a,b`.**"));
}
public Exception testIoTDBDataRegionExtractorWithPattern(final String
pattern) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanPatternParseVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanPatternParseVisitorTest.java
index af68f813066..86c3d134376 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanPatternParseVisitorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanPatternParseVisitorTest.java
@@ -20,8 +20,10 @@
package org.apache.iotdb.db.pipe.source;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import
org.apache.iotdb.commons.schema.view.viewExpression.leaf.TimeSeriesViewOperand;
import org.apache.iotdb.db.pipe.source.schemaregion.IoTDBSchemaRegionSource;
@@ -48,15 +50,22 @@ import org.apache.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class PipePlanPatternParseVisitorTest {
- private final IoTDBPipePattern prefixPathPattern = new
IoTDBPipePattern("root.db.device.**");
- private final IoTDBPipePattern fullPathPattern = new
IoTDBPipePattern("root.db.device.s1");
+
+ private final UnionIoTDBPipePattern prefixPathPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.**"));
+ private final UnionIoTDBPipePattern fullPathPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.db.device.s1"));
+ private final UnionIoTDBPipePattern multiplePathPattern =
+ new UnionIoTDBPipePattern(
+ Arrays.asList(
+ new IoTDBPipePattern("root.db.device.s1"),
+ new IoTDBPipePattern("root.db.device.s2")));
@Test
public void testCreateTimeSeries() throws IllegalPathException {
@@ -82,6 +91,17 @@ public class PipePlanPatternParseVisitorTest {
Collections.emptyMap(),
Collections.emptyMap(),
"a1");
+ final CreateTimeSeriesNode createTimeSeriesNode2 =
+ new CreateTimeSeriesNode(
+ new PlanNodeId("2024-04-30-3"),
+ new MeasurementPath("root.db.device.s2"),
+ TSDataType.FLOAT,
+ TSEncoding.RLE,
+ CompressionType.SNAPPY,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ "a2");
Assert.assertEquals(
createTimeSeriesNode,
@@ -92,6 +112,21 @@ public class PipePlanPatternParseVisitorTest {
IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
.visitCreateTimeSeries(createTimeSeriesNodeToFilter,
prefixPathPattern)
.isPresent());
+
+ Assert.assertEquals(
+ createTimeSeriesNode,
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateTimeSeries(createTimeSeriesNode, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertEquals(
+ createTimeSeriesNode2,
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateTimeSeries(createTimeSeriesNode2, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateTimeSeries(createTimeSeriesNodeToFilter,
multiplePathPattern)
+ .isPresent());
}
@Test
@@ -121,6 +156,35 @@ public class PipePlanPatternParseVisitorTest {
Arrays.asList(Collections.emptyMap(),
Collections.emptyMap())),
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ new PartialPath("root.db.device"),
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN),
+ Arrays.asList(TSEncoding.RLE, TSEncoding.PLAIN),
+ Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY),
+ Arrays.asList("a1", "a2"),
+ Arrays.asList(Collections.emptyMap(), Collections.emptyMap()),
+ Arrays.asList(Collections.emptyMap(), Collections.emptyMap())),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateAlignedTimeSeries(
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ new PartialPath("root.db.device"),
+ Arrays.asList("s1", "s2", "s3"),
+ Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN,
TSDataType.INT32),
+ Arrays.asList(TSEncoding.RLE, TSEncoding.PLAIN,
TSEncoding.RLE),
+ Arrays.asList(
+ CompressionType.SNAPPY, CompressionType.SNAPPY,
CompressionType.SNAPPY),
+ Arrays.asList("a1", "a2", "a3"),
+ Arrays.asList(
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap()),
+ Arrays.asList(
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap())),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -152,6 +216,40 @@ public class PipePlanPatternParseVisitorTest {
Arrays.asList(Collections.emptyMap(),
Collections.emptyMap())),
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new CreateMultiTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ Arrays.asList(
+ new MeasurementPath("root.db.device.s1"), new
MeasurementPath("root.db.device.s2")),
+ Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN),
+ Arrays.asList(TSEncoding.RLE, TSEncoding.PLAIN),
+ Arrays.asList(CompressionType.SNAPPY, CompressionType.SNAPPY),
+ Arrays.asList(Collections.emptyMap(), Collections.emptyMap()),
+ Arrays.asList("a1", "a2"),
+ Arrays.asList(Collections.emptyMap(), Collections.emptyMap()),
+ Arrays.asList(Collections.emptyMap(), Collections.emptyMap())),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitCreateMultiTimeSeries(
+ new CreateMultiTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ Arrays.asList(
+ new MeasurementPath("root.db.device.s1"),
+ new MeasurementPath("root.db.device.s2"),
+ new MeasurementPath("root.db1.device.s1")),
+ Arrays.asList(TSDataType.FLOAT, TSDataType.BOOLEAN,
TSDataType.INT32),
+ Arrays.asList(TSEncoding.RLE, TSEncoding.PLAIN,
TSEncoding.RLE),
+ Arrays.asList(
+ CompressionType.SNAPPY, CompressionType.SNAPPY,
CompressionType.SNAPPY),
+ Arrays.asList(
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap()),
+ Arrays.asList("a1", "a2", "a3"),
+ Arrays.asList(
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap()),
+ Arrays.asList(
+ Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap())),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -177,6 +275,16 @@ public class PipePlanPatternParseVisitorTest {
Collections.emptyMap(),
attributesMap,
false);
+ final AlterTimeSeriesNode alterTimeSeriesNode2 =
+ new AlterTimeSeriesNode(
+ new PlanNodeId("2024-04-30-3"),
+ new MeasurementPath("root.db.device.s2"),
+ AlterTimeSeriesStatement.AlterType.ADD_ATTRIBUTES,
+ attributesMap,
+ "",
+ Collections.emptyMap(),
+ attributesMap,
+ false);
Assert.assertEquals(
alterTimeSeriesNode,
@@ -187,6 +295,21 @@ public class PipePlanPatternParseVisitorTest {
IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
.visitAlterTimeSeries(alterTimeSeriesNodeToFilter,
prefixPathPattern)
.isPresent());
+
+ Assert.assertEquals(
+ alterTimeSeriesNode,
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitAlterTimeSeries(alterTimeSeriesNode, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertEquals(
+ alterTimeSeriesNode2,
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitAlterTimeSeries(alterTimeSeriesNode2, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitAlterTimeSeries(alterTimeSeriesNodeToFilter,
multiplePathPattern)
+ .isPresent());
}
@Test
@@ -199,12 +322,13 @@ public class PipePlanPatternParseVisitorTest {
expectedMeasurementGroup.addTags(Collections.emptyMap());
expectedMeasurementGroup.addAttributes(Collections.emptyMap());
- final ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
- expectedMeasurementGroup.serialize(byteBuffer);
- byteBuffer.flip();
final MeasurementGroup originalMeasurementGroup = new MeasurementGroup();
- originalMeasurementGroup.deserialize(byteBuffer);
-
+ originalMeasurementGroup.addMeasurement(
+ "s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
+ originalMeasurementGroup.addProps(Collections.emptyMap());
+ originalMeasurementGroup.addAlias("a1");
+ originalMeasurementGroup.addTags(Collections.emptyMap());
+ originalMeasurementGroup.addAttributes(Collections.emptyMap());
originalMeasurementGroup.addMeasurement(
"s2", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
originalMeasurementGroup.addProps(Collections.emptyMap());
@@ -227,6 +351,22 @@ public class PipePlanPatternParseVisitorTest {
true),
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new InternalCreateTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ new PartialPath("root.db.device"),
+ originalMeasurementGroup,
+ true),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitInternalCreateTimeSeries(
+ new InternalCreateTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ new PartialPath("root.db.device"),
+ originalMeasurementGroup,
+ true),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -246,6 +386,16 @@ public class PipePlanPatternParseVisitorTest {
IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
.visitActivateTemplate(activateTemplateNodeToFilter,
prefixPathPattern)
.isPresent());
+
+ Assert.assertEquals(
+ activateTemplateNode,
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitActivateTemplate(activateTemplateNode, multiplePathPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitActivateTemplate(activateTemplateNodeToFilter,
multiplePathPattern)
+ .isPresent());
}
@Test
@@ -266,6 +416,23 @@ public class PipePlanPatternParseVisitorTest {
}),
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new InternalBatchActivateTemplateNode(
+ new PlanNodeId("2024-04-30-2"),
+ Collections.singletonMap(new PartialPath("root.db.device"), new
Pair<>(1, 1))),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitInternalBatchActivateTemplate(
+ new InternalBatchActivateTemplateNode(
+ new PlanNodeId("2024-04-30-2"),
+ new HashMap<PartialPath, Pair<Integer, Integer>>() {
+ {
+ put(new PartialPath("root.db.device"), new Pair<>(1,
1));
+ put(new PartialPath("root.db1"), new Pair<>(2, 2));
+ }
+ }),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -278,12 +445,13 @@ public class PipePlanPatternParseVisitorTest {
expectedMeasurementGroup.addTags(Collections.emptyMap());
expectedMeasurementGroup.addAttributes(Collections.emptyMap());
- final ByteBuffer byteBuffer = ByteBuffer.allocate(1000);
- expectedMeasurementGroup.serialize(byteBuffer);
- byteBuffer.flip();
final MeasurementGroup originalMeasurementGroup = new MeasurementGroup();
- originalMeasurementGroup.deserialize(byteBuffer);
-
+ originalMeasurementGroup.addMeasurement(
+ "s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY);
+ originalMeasurementGroup.addProps(Collections.emptyMap());
+ originalMeasurementGroup.addAlias("a1");
+ originalMeasurementGroup.addTags(Collections.emptyMap());
+ originalMeasurementGroup.addAttributes(Collections.emptyMap());
originalMeasurementGroup.addMeasurement(
"s2", TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
originalMeasurementGroup.addProps(Collections.emptyMap());
@@ -291,6 +459,10 @@ public class PipePlanPatternParseVisitorTest {
originalMeasurementGroup.addTags(Collections.emptyMap());
originalMeasurementGroup.addAttributes(Collections.emptyMap());
+ final MeasurementGroup anotherMeasurementGroup = new MeasurementGroup();
+ anotherMeasurementGroup.addMeasurement(
+ "s3", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY);
+
Assert.assertEquals(
new InternalCreateMultiTimeSeriesNode(
new PlanNodeId("2024-04-30-1"),
@@ -312,6 +484,28 @@ public class PipePlanPatternParseVisitorTest {
}),
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new InternalCreateMultiTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ Collections.singletonMap(
+ new PartialPath("root.db.device"), new Pair<>(false,
originalMeasurementGroup))),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitInternalCreateMultiTimeSeries(
+ new InternalCreateMultiTimeSeriesNode(
+ new PlanNodeId("2024-04-30-2"),
+ new HashMap<PartialPath, Pair<Boolean,
MeasurementGroup>>() {
+ {
+ put(
+ new PartialPath("root.db.device"),
+ new Pair<>(false, originalMeasurementGroup));
+ put(
+ new PartialPath("root.db1.device"),
+ new Pair<>(false, anotherMeasurementGroup));
+ }
+ }),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -332,6 +526,23 @@ public class PipePlanPatternParseVisitorTest {
}),
fullPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new BatchActivateTemplateNode(
+ new PlanNodeId("2024-04-30-2"),
+ Collections.singletonMap(new PartialPath("root.db.device"), new
Pair<>(1, 1))),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitBatchActivateTemplate(
+ new BatchActivateTemplateNode(
+ new PlanNodeId("2024-04-30-2"),
+ new HashMap<PartialPath, Pair<Integer, Integer>>() {
+ {
+ put(new PartialPath("root.db.device"), new Pair<>(1,
1));
+ put(new PartialPath("root.db"), new Pair<>(2, 2));
+ }
+ }),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
@Test
@@ -402,5 +613,25 @@ public class PipePlanPatternParseVisitorTest {
Long.MAX_VALUE),
prefixPathPattern)
.orElseThrow(AssertionError::new));
+
+ Assert.assertEquals(
+ new DeleteDataNode(
+ new PlanNodeId("2024-04-30-2"),
+ Arrays.asList(
+ new MeasurementPath("root.db.device.s1"), new
MeasurementPath("root.db.device.s2")),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE),
+ IoTDBSchemaRegionSource.PATTERN_PARSE_VISITOR
+ .visitDeleteData(
+ new DeleteDataNode(
+ new PlanNodeId("2024-04-30-2"),
+ Arrays.asList(
+ new MeasurementPath("root.db.device.s1"),
+ new MeasurementPath("root.db.device.s2"),
+ new MeasurementPath("root.db.device.s3")),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE),
+ multiplePathPattern)
+ .orElseThrow(AssertionError::new));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index 9ed5837e625..b8bfc21e099 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -32,9 +32,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
-public class IoTDBPipePattern extends PipePattern {
+public class IoTDBPipePattern extends SinglePipePattern {
private final PartialPath patternPartialPath;
private static volatile DevicePathGetter devicePathGetter = PartialPath::new;
@@ -50,13 +49,6 @@ public class IoTDBPipePattern extends PipePattern {
}
}
- public static <T> List<T> applyIndexesOnList(
- final int[] filteredIndexes, final List<T> originalList) {
- return Objects.nonNull(originalList)
- ?
Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList())
- : null;
- }
-
@Override
public String getDefaultPattern() {
return PipeSourceConstant.EXTRACTOR_PATTERN_IOTDB_DEFAULT_VALUE;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index 80275ea3a92..ba78f87bbef 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -24,7 +24,13 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
@@ -39,67 +45,144 @@ public abstract class PipePattern {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipePattern.class);
- protected final String pattern;
-
- protected PipePattern(final String pattern) {
- this.pattern = pattern != null ? pattern : getDefaultPattern();
- }
-
- public String getPattern() {
- return pattern;
- }
-
- public boolean isRoot() {
- return Objects.isNull(pattern) ||
this.pattern.equals(this.getDefaultPattern());
+ /**
+ * Projects {@code originalList} onto the positions listed in {@code filteredIndexes}.
+ *
+ * @param filteredIndexes positions to pick from {@code originalList}, in output order
+ * @param originalList the list to pick from; may be {@code null}
+ * @return a new list holding the picked elements, or {@code null} if {@code originalList} is
+ * {@code null}
+ */
+ public static <T> List<T> applyIndexesOnList(
+ final int[] filteredIndexes, final List<T> originalList) {
+ return Objects.nonNull(originalList)
+ ?
Arrays.stream(filteredIndexes).mapToObj(originalList::get).collect(Collectors.toList())
+ : null;
}
/**
- * Interpret from source parameters and get a pipe pattern.
+ * Interpret from source parameters and get a {@link PipePattern}.
*
- * @return The interpreted {@link PipePattern} which is not null.
+ * @return The interpreted {@link PipePattern} which is not {@code null}.
*/
public static PipePattern parsePipePatternFromSourceParameters(
final PipeParameters sourceParameters) {
final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY,
SOURCE_PATH_KEY);
+ final String pattern =
+ sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+ // Every branch below returns through buildUnionPattern, so the result is always a union
+ // wrapper, even when it holds a single pattern.
- // 1. If "source.path" is specified, it will be interpreted as an
IoTDB-style path,
- // ignoring the other 2 parameters.
- if (path != null) {
- return new IoTDBPipePattern(path);
+ // 1. If both "source.path" and "source.pattern" are specified, their
union will be used.
+ if (path != null && pattern != null) {
+ // The "source.path" patterns are added first, then the "source.pattern" ones.
+ final List<PipePattern> result = new ArrayList<>();
+ // Parse "source.path" as IoTDB-style path.
+ result.addAll(parseMultiplePatterns(path, IoTDBPipePattern::new));
+ // Parse "source.pattern" using the helper method.
+ result.addAll(parsePatternsFromPatternParameter(pattern,
sourceParameters));
+ return buildUnionPattern(result);
}
- final String pattern =
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+ // 2. If only "source.path" is specified, it will be interpreted as an
IoTDB-style path.
+ if (path != null) {
+ return buildUnionPattern(parseMultiplePatterns(path,
IoTDBPipePattern::new));
+ }
- // 2. Otherwise, If "source.pattern" is specified, it will be interpreted
- // according to "source.pattern.format".
+ // 3. If only "source.pattern" is specified, parse it using the helper
method.
if (pattern != null) {
- final String patternFormat =
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY,
SOURCE_PATTERN_FORMAT_KEY);
+ return buildUnionPattern(parsePatternsFromPatternParameter(pattern,
sourceParameters));
+ }
+
+ // 4. If neither "source.path" nor "source.pattern" is specified,
+ // this pipe source will match all data.
+ // A null pattern makes IoTDBPipePattern fall back to its default pattern.
+ return buildUnionPattern(Collections.singletonList(new
IoTDBPipePattern(null)));
+ }
+
+  /**
+   * A private helper method to parse a list of {@link PipePattern}s from the "pattern" parameter,
+   * considering its "format".
+   *
+   * <p>The format value is lower-cased with {@link Locale#ROOT} before it is compared, so the
+   * outcome does not depend on the JVM default locale. A missing or unknown format falls back to
+   * prefix matching.
+   *
+   * @param pattern The pattern string to parse.
+   * @param sourceParameters The source parameters to read the format from.
+   * @return A list of parsed {@link PipePattern}s.
+   */
+  private static List<PipePattern> parsePatternsFromPatternParameter(
+      final String pattern, final PipeParameters sourceParameters) {
+    final String patternFormat =
+        sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, SOURCE_PATTERN_FORMAT_KEY);
+
+    // If "source.pattern.format" is not specified, use prefix format by default.
+    if (patternFormat == null) {
+      return parseMultiplePatterns(pattern, PrefixPipePattern::new);
+    }
+
+    // Locale.ROOT keeps the lower-casing locale-independent; with the default locale an
+    // upper-case 'I' becomes a dotless 'i' under e.g. a Turkish locale and no case would match.
+    switch (patternFormat.toLowerCase(Locale.ROOT)) {
+      case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
+        return parseMultiplePatterns(pattern, IoTDBPipePattern::new);
+      case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
+        return parseMultiplePatterns(pattern, PrefixPipePattern::new);
+      default:
+        LOGGER.info(
+            "Unknown pattern format: {}, use prefix matching format by default.", patternFormat);
+        return parseMultiplePatterns(pattern, PrefixPipePattern::new);
+    }
+  }
+
+ /**
+ * Splits a comma-separated pattern string and builds one {@link PipePattern} per piece.
+ *
+ * <p>A comma between a pair of backticks is kept as part of the piece instead of splitting it.
+ * Each piece is trimmed, and pieces that are empty after trimming are dropped. An empty input
+ * string is handed to {@code patternSupplier} as-is and yields a single pattern; a non-empty
+ * input that consists only of commas and whitespace yields an empty list.
+ *
+ * @param pattern the raw pattern string; must not be {@code null}
+ * @param patternSupplier builds a {@link PipePattern} from one piece
+ * @return the parsed patterns, in the order they appear in {@code pattern}
+ */
+ public static List<PipePattern> parseMultiplePatterns(
+ final String pattern, final Function<String, PipePattern>
patternSupplier) {
+ if (pattern.isEmpty()) {
+ return Collections.singletonList(patternSupplier.apply(pattern));
+ }
- // If "source.pattern.format" is not specified, use prefix format by
default.
- if (patternFormat == null) {
- return new PrefixPipePattern(pattern);
+ final List<PipePattern> patterns = new ArrayList<>();
+ final StringBuilder currentPattern = new StringBuilder();
+ boolean inBackticks = false;
+
+ for (final char c : pattern.toCharArray()) {
+ // Each backtick toggles the quoted state; the backtick itself stays in the piece.
+ if (c == '`') {
+ inBackticks = !inBackticks;
+ currentPattern.append(c);
+ } else if (c == ',' && !inBackticks) {
+ // An unquoted comma ends the current piece.
+ final String singlePattern = currentPattern.toString().trim();
+ if (!singlePattern.isEmpty()) {
+ patterns.add(patternSupplier.apply(singlePattern));
+ }
+ currentPattern.setLength(0);
+ } else {
+ currentPattern.append(c);
}
+ }
+
+ // Flush whatever follows the last unquoted comma.
+ final String lastPattern = currentPattern.toString().trim();
+ if (!lastPattern.isEmpty()) {
+ patterns.add(patternSupplier.apply(lastPattern));
+ }
+
+ return patterns;
+ }
- switch (patternFormat.toLowerCase()) {
- case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
- return new IoTDBPipePattern(pattern);
- case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
- return new PrefixPipePattern(pattern);
- default:
- LOGGER.info(
- "Unknown pattern format: {}, use prefix matching format by
default.", patternFormat);
- return new PrefixPipePattern(pattern);
+ /**
+ * Builds the most specific union pattern possible. If all patterns are
+ * {@link IoTDBPipePattern}s, it returns a {@link UnionIoTDBPipePattern}. Otherwise, it returns a
+ * general {@link UnionPipePattern}.
+ *
+ * <p>An empty list also yields a {@link UnionIoTDBPipePattern} (with no sub-patterns), because
+ * no element disproves the "all IoTDB" check.
+ *
+ * @param patterns the patterns to combine
+ * @return a {@link UnionIoTDBPipePattern} or a {@link UnionPipePattern} wrapping the given
+ * patterns
+ */
+ public static PipePattern buildUnionPattern(final List<PipePattern>
patterns) {
+ // Check if all instances in the list are of type IoTDBPipePattern
+ boolean allIoTDB = true;
+ for (final PipePattern p : patterns) {
+ if (!(p instanceof IoTDBPipePattern)) {
+ allIoTDB = false;
+ break;
}
}
- // 3. If neither "source.path" nor "source.pattern" is specified,
- // this pipe source will match all data.
- return new IoTDBPipePattern(null);
+ if (allIoTDB) {
+ // Copy into a typed list so the specialized union can expose IoTDB-only operations.
+ final List<IoTDBPipePattern> iotdbPatterns = new
ArrayList<>(patterns.size());
+ for (final PipePattern p : patterns) {
+ iotdbPatterns.add((IoTDBPipePattern) p);
+ }
+ return new UnionIoTDBPipePattern(iotdbPatterns);
+ } else {
+ // At least one pattern is not an IoTDBPipePattern, so use the general UnionPipePattern
+ return new UnionPipePattern(patterns);
+ }
}
- public abstract String getDefaultPattern();
+ public abstract boolean isSingle();
+
+ public abstract String getPattern();
+
+ public abstract boolean isRoot();
/** Check if this pattern is legal. Different pattern type may have
different rules. */
public abstract boolean isLegal();
@@ -133,9 +216,4 @@ public abstract class PipePattern {
* <p>NOTE: this is only called when {@link
PipePattern#mayOverlapWithDevice(String)} is true.
*/
public abstract boolean matchesMeasurement(final String device, final String
measurement);
-
- @Override
- public String toString() {
- return "{pattern='" + pattern + "'}";
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
index 492b92261b5..758accf4ba5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PrefixPipePattern.java
@@ -28,7 +28,7 @@ import org.apache.tsfile.common.constant.TsFileConstant;
import java.util.Arrays;
-public class PrefixPipePattern extends PipePattern {
+public class PrefixPipePattern extends SinglePipePattern {
public PrefixPipePattern(final String pattern) {
super(pattern);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/SinglePipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/SinglePipePattern.java
new file mode 100644
index 00000000000..352759f3699
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/SinglePipePattern.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.pattern;
+
+import java.util.Objects;
+
+/**
+ * Base class for a {@link PipePattern} that is backed by exactly one pattern string.
+ *
+ * <p>A {@code null} pattern passed to the constructor is replaced by {@link #getDefaultPattern()}.
+ */
+public abstract class SinglePipePattern extends PipePattern {
+
+  protected final String pattern;
+
+  /**
+   * @param pattern the pattern string, or {@code null} to use {@link #getDefaultPattern()}
+   */
+  protected SinglePipePattern(final String pattern) {
+    // getDefaultPattern() runs before the subclass constructor body, so implementations must not
+    // rely on subclass instance fields.
+    this.pattern = pattern != null ? pattern : getDefaultPattern();
+  }
+
+  /** A single pattern is always "single". */
+  @Override
+  public boolean isSingle() {
+    return true;
+  }
+
+  /** Returns the pattern string held by this instance. */
+  @Override
+  public String getPattern() {
+    return pattern;
+  }
+
+  /** Returns {@code true} if the pattern is {@code null} or equals {@link #getDefaultPattern()}. */
+  @Override
+  public boolean isRoot() {
+    return Objects.isNull(pattern) || this.pattern.equals(this.getDefaultPattern());
+  }
+
+  /** Returns the pattern string used when no pattern is given to the constructor. */
+  public abstract String getDefaultPattern();
+
+  @Override
+  public String toString() {
+    // Close the single quote opened before the pattern value.
+    return "{pattern='" + pattern + "'}";
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
new file mode 100644
index 00000000000..d2da4c99667
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionIoTDBPipePattern.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.pattern;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a union of multiple {@link IoTDBPipePattern}s. This specialized
class ensures type
+ * safety and provides access to methods specific to IoTDBPipePattern, such as
getIntersection.
+ */
+public class UnionIoTDBPipePattern extends PipePattern {
+
+ // Sub-patterns of this union; most queries below delegate to them with any-match semantics.
+ private final List<IoTDBPipePattern> patterns;
+
+ /**
+ * Creates a union over the given patterns.
+ *
+ * <p>The list is stored by reference (no defensive copy), so later changes made to it by the
+ * caller are visible to this instance.
+ */
+ public UnionIoTDBPipePattern(final List<IoTDBPipePattern> patterns) {
+ this.patterns = patterns;
+ }
+
+ /** Creates a union that wraps exactly one pattern. */
+ public UnionIoTDBPipePattern(final IoTDBPipePattern pattern) {
+ this.patterns = Collections.singletonList(pattern);
+ }
+
+ // **********************************************************************
+ // IoTDBPipePattern-specific aggregated methods
+ // **********************************************************************
+
+ /** Returns {@code true} if any sub-pattern reports a match for the given prefix path. */
+ public boolean matchPrefixPath(final String path) {
+ return patterns.stream().anyMatch(p -> p.matchPrefixPath(path));
+ }
+
+ /** Returns {@code true} if any sub-pattern reports a match for the given device path. */
+ public boolean matchDevice(final String devicePath) {
+ return patterns.stream().anyMatch(p -> p.matchDevice(devicePath));
+ }
+
+ /** Returns {@code true} if any sub-pattern reports a match for the given tail node. */
+ public boolean matchTailNode(final String tailNode) {
+ return patterns.stream().anyMatch(p -> p.matchTailNode(tailNode));
+ }
+
+ /**
+ * Collects the intersections of {@code partialPath} with every sub-pattern.
+ *
+ * @return the distinct intersection paths, in the order they were first produced
+ */
+ public List<PartialPath> getIntersection(final PartialPath partialPath) {
+ // LinkedHashSet drops duplicates produced by different sub-patterns and keeps insertion order.
+ final Set<PartialPath> uniqueIntersections = new LinkedHashSet<>();
+ for (final IoTDBPipePattern pattern : patterns) {
+ uniqueIntersections.addAll(pattern.getIntersection(partialPath));
+ }
+ return new ArrayList<>(uniqueIntersections);
+ }
+
+ /**
+ * Builds a new tree from the path patterns of every non-empty intersection between a
+ * sub-pattern and {@code patternTree}.
+ */
+ public PathPatternTree getIntersection(final PathPatternTree patternTree) {
+ final PathPatternTree resultTree = new PathPatternTree();
+ for (final IoTDBPipePattern pattern : patterns) {
+ final PathPatternTree intersection =
pattern.getIntersection(patternTree);
+ if (intersection.isEmpty()) {
+ continue;
+ }
+ intersection.getAllPathPatterns().forEach(resultTree::appendPathPattern);
+ }
+ // constructTree() is called once, after all path patterns have been appended.
+ resultTree.constructTree();
+ return resultTree;
+ }
+
+ /**
+ * Returns {@code true} only if every sub-pattern is a prefix or a full path. This is also
+ * {@code true} when there are no sub-patterns.
+ */
+ public boolean isPrefixOrFullPath() {
+ return patterns.stream().allMatch(p -> p.isPrefix() || p.isFullPath());
+ }
+
+ /** Returns {@code true} if any sub-pattern may match multiple time series in one device. */
+ public boolean mayMatchMultipleTimeSeriesInOneDevice() {
+ return
patterns.stream().anyMatch(IoTDBPipePattern::mayMatchMultipleTimeSeriesInOneDevice);
+ }
+
+ // **********************************************************************
+ // Implementation of abstract methods from PipePattern
+ // **********************************************************************
+
+ /** Returns {@code true} if this union holds exactly one sub-pattern. */
+ @Override
+ public boolean isSingle() {
+ return patterns.size() == 1;
+ }
+
+ /** Returns the sub-pattern strings joined with {@code ","}, in list order. */
+ @Override
+ public String getPattern() {
+ return
patterns.stream().map(PipePattern::getPattern).collect(Collectors.joining(","));
+ }
+
+ /** Returns {@code true} if any sub-pattern is a root pattern. */
+ @Override
+ public boolean isRoot() {
+ return patterns.stream().anyMatch(PipePattern::isRoot);
+ }
+
+ /** Returns {@code true} only if every sub-pattern is legal; a single illegal one fails it. */
+ @Override
+ public boolean isLegal() {
+ return patterns.stream().allMatch(PipePattern::isLegal);
+ }
+
+ /** Returns {@code true} if any single sub-pattern covers the given database. */
+ @Override
+ public boolean coversDb(final String db) {
+ return patterns.stream().anyMatch(p -> p.coversDb(db));
+ }
+
+ /** Returns {@code true} if any single sub-pattern covers the given device. */
+ @Override
+ public boolean coversDevice(final String device) {
+ return patterns.stream().anyMatch(p -> p.coversDevice(device));
+ }
+
+ /** Returns {@code true} if any sub-pattern may overlap with the given database. */
+ @Override
+ public boolean mayOverlapWithDb(final String db) {
+ return patterns.stream().anyMatch(p -> p.mayOverlapWithDb(db));
+ }
+
+ /** Returns {@code true} if any sub-pattern may overlap with the given device. */
+ @Override
+ public boolean mayOverlapWithDevice(final String device) {
+ return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
+ }
+
+ /** Returns {@code true} if any sub-pattern matches the given measurement of the device. */
+ @Override
+ public boolean matchesMeasurement(final String device, final String
measurement) {
+ return patterns.stream().anyMatch(p -> p.matchesMeasurement(device,
measurement));
+ }
+
+ @Override
+ public String toString() {
+ return "UnionIoTDBPipePattern{" + "patterns=" + patterns + '}';
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
new file mode 100644
index 00000000000..678ac4ff861
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/UnionPipePattern.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.pattern;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a union of multiple {@link PipePattern}s. A path is considered to match if it matches
+ * any of the patterns in the collection.
+ *
+ * <p>The member list is snapshotted when the union is constructed, so later changes to the list
+ * that was passed to the constructor do not affect this pattern.
+ */
+public class UnionPipePattern extends PipePattern {
+
+  /** Unmodifiable snapshot of the member patterns, in the order supplied by the caller. */
+  private final List<PipePattern> patterns;
+
+  /**
+   * Creates a union over the given member patterns.
+   *
+   * @param patterns the member patterns; copied defensively so the union cannot be altered through
+   *     the caller's list afterwards
+   * @throws NullPointerException if {@code patterns} is {@code null}
+   */
+  public UnionPipePattern(final List<PipePattern> patterns) {
+    // Copy instead of aliasing: the caller keeps ownership of its (possibly mutable) list.
+    this.patterns = Collections.unmodifiableList(new ArrayList<>(patterns));
+  }
+
+  /**
+   * Tells whether this union wraps exactly one member pattern.
+   *
+   * @return {@code true} when the member list has size one, {@code false} otherwise
+   */
+  @Override
+  public boolean isSingle() {
+    return patterns.size() == 1;
+  }
+
+  /**
+   * Builds the textual form of this union.
+   *
+   * @return the pattern strings of all members, in list order, joined by a comma; an empty string
+   *     when there are no members
+   */
+  @Override
+  public String getPattern() {
+    return patterns.stream().map(PipePattern::getPattern).collect(Collectors.joining(","));
+  }
+
+  /**
+   * Reports whether at least one member pattern is a root pattern.
+   *
+   * @return {@code true} if any member answers {@code isRoot()} with {@code true}
+   */
+  @Override
+  public boolean isRoot() {
+    return patterns.stream().anyMatch(PipePattern::isRoot);
+  }
+
+  /**
+   * Reports whether every member pattern is legal.
+   *
+   * @return {@code true} if all members answer {@code isLegal()} with {@code true} (including
+   *     when there are no members), {@code false} otherwise
+   */
+  @Override
+  public boolean isLegal() {
+    return patterns.stream().allMatch(PipePattern::isLegal);
+  }
+
+  /**
+   * Reports whether at least one member pattern covers the given database.
+   *
+   * @param db the database name handed to each member's {@code coversDb}
+   * @return {@code true} if any member covers {@code db}, {@code false} otherwise
+   */
+  @Override
+  public boolean coversDb(final String db) {
+    return patterns.stream().anyMatch(p -> p.coversDb(db));
+  }
+
+  /**
+   * Reports whether at least one member pattern covers the given device.
+   *
+   * @param device the device path handed to each member's {@code coversDevice}
+   * @return {@code true} if any member covers {@code device}, {@code false} otherwise
+   */
+  @Override
+  public boolean coversDevice(final String device) {
+    return patterns.stream().anyMatch(p -> p.coversDevice(device));
+  }
+
+  /**
+   * Reports whether at least one member pattern may overlap with the given database.
+   *
+   * @param db the database name handed to each member's {@code mayOverlapWithDb}
+   * @return {@code true} if any member may overlap with {@code db}, {@code false} otherwise
+   */
+  @Override
+  public boolean mayOverlapWithDb(final String db) {
+    return patterns.stream().anyMatch(p -> p.mayOverlapWithDb(db));
+  }
+
+  /**
+   * Reports whether at least one member pattern may overlap with the given device.
+   *
+   * @param device the device path handed to each member's {@code mayOverlapWithDevice}
+   * @return {@code true} if any member may overlap with {@code device}, {@code false} otherwise
+   */
+  @Override
+  public boolean mayOverlapWithDevice(final String device) {
+    return patterns.stream().anyMatch(p -> p.mayOverlapWithDevice(device));
+  }
+
+  /**
+   * Reports whether at least one member pattern matches the given measurement of the given
+   * device.
+   *
+   * @param device the device path handed to each member's {@code matchesMeasurement}
+   * @param measurement the measurement name handed to each member's {@code matchesMeasurement}
+   * @return {@code true} if any member matches the pair, {@code false} otherwise
+   */
+  @Override
+  public boolean matchesMeasurement(final String device, final String measurement) {
+    return patterns.stream().anyMatch(p -> p.matchesMeasurement(device, measurement));
+  }
+
+  /**
+   * Renders this union for debugging as the class name followed by the member list.
+   *
+   * @return a string of the form {@code UnionPipePattern{patterns=...}}
+   */
+  @Override
+  public String toString() {
+    return "UnionPipePattern{" + "patterns=" + patterns + '}';
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
index 837ce2faed2..6a93321b8ca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/source/IoTDBNonDataRegionSource.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.commons.pipe.source;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
import
org.apache.iotdb.commons.pipe.datastructure.queue.ConcurrentIterableLinkedQueue;
import
org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class IoTDBNonDataRegionSource extends IoTDBSource {
- protected IoTDBPipePattern pipePattern;
+ protected UnionIoTDBPipePattern pipePattern;
private List<PipeSnapshotEvent> historicalEvents = new LinkedList<>();
// A fixed size initialized only when the historicalEvents are first
@@ -68,15 +68,14 @@ public abstract class IoTDBNonDataRegionSource extends
IoTDBSource {
super.customize(parameters, configuration);
final PipePattern pattern =
PipePattern.parsePipePatternFromSourceParameters(parameters);
- if (!(pattern instanceof IoTDBPipePattern
- && (((IoTDBPipePattern) pattern).isPrefix()
- || ((IoTDBPipePattern) pattern).isFullPath()))) {
+ if (!(pattern instanceof UnionIoTDBPipePattern
+ && (((UnionIoTDBPipePattern) pattern).isPrefixOrFullPath()))) {
throw new IllegalArgumentException(
String.format(
"The path pattern %s is not valid for the source. Only prefix or
full path is allowed.",
pattern.getPattern()));
}
- pipePattern = (IoTDBPipePattern) pattern;
+ pipePattern = (UnionIoTDBPipePattern) pattern;
}
@Override