This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 617c0a82e12 Support unified pipe pattern inclusion keys (#18017)
617c0a82e12 is described below
commit 617c0a82e12f7a66c32fbf1bf546e8e625336a59
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 16:13:40 2026 +0800
Support unified pipe pattern inclusion keys (#18017)
---
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 25 ++++++-
.../db/pipe/pattern/PipePatternPruningTest.java | 56 ++++++++++++++-
.../pipe/config/constant/PipeSourceConstant.java | 4 ++
.../pipe/datastructure/pattern/PipePattern.java | 82 ++++++++++++++-------
.../pattern/PipePatternParseTest.java | 83 ++++++++++++++++++++++
5 files changed, 220 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 09d3aef90c8..fba4b7d3ccf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -113,7 +113,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
@@ -123,7 +127,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY;
@@ -915,10 +923,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source has pattern or path, we need to allocate memory
isTSFileParser =
isTSFileParser
- || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+ || sourceParameters.hasAnyAttributes(
+ EXTRACTOR_PATTERN_KEY,
+ SOURCE_PATTERN_KEY,
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ EXTRACTOR_PATTERN_EXCLUSION_KEY,
+ SOURCE_PATTERN_EXCLUSION_KEY);
isTSFileParser =
- isTSFileParser ||
sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
+ isTSFileParser
+ || sourceParameters.hasAnyAttributes(
+ EXTRACTOR_PATH_KEY,
+ SOURCE_PATH_KEY,
+ EXTRACTOR_PATH_INCLUSION_KEY,
+ SOURCE_PATH_INCLUSION_KEY,
+ EXTRACTOR_PATH_EXCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY);
// If the source is not hybrid, we do need to allocate memory
isTSFileParser =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PipePatternPruningTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PipePatternPruningTest.java
index 4b0c5b1ef2f..bbac90845bd 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PipePatternPruningTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PipePatternPruningTest.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionPipePattern;
+import
org.apache.iotdb.commons.pipe.datastructure.pattern.WithExclusionIoTDBPipePattern;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -148,9 +150,61 @@ public class PipePatternPruningTest {
}
});
- final PipePattern result =
PipePattern.parsePipePatternFromSourceParametersInternal(params);
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
Assert.assertTrue(result instanceof UnionIoTDBPipePattern);
Assert.assertEquals("root.sg.d1,root.sg.d2", result.getPattern());
}
+
+ @Test
+ public void testPatternAndPatternInclusionPreserved() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATTERN_KEY, "root.sg.A");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.sg.B");
+ }
+ });
+
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
+
+ Assert.assertTrue(result instanceof UnionPipePattern);
+ Assert.assertEquals("root.sg.A,root.sg.B", result.getPattern());
+ }
+
+ @Test
+ public void testPathAndPathInclusionPreserved() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1");
+ put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY,
"root.sg.d2,root.sg.d3");
+ }
+ });
+
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
+
+ Assert.assertTrue(result instanceof UnionIoTDBPipePattern);
+ Assert.assertEquals("root.sg.d1,root.sg.d2,root.sg.d3",
result.getPattern());
+ }
+
+ @Test
+ public void testPathInclusionWithPathExclusionPreserved() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY,
"root.sg.**");
+ put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY,
"root.sg.d1,root.sg.d2");
+ }
+ });
+
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
+
+ Assert.assertTrue(result instanceof WithExclusionIoTDBPipePattern);
+ Assert.assertEquals(
+ "INCLUSION(root.sg.**), EXCLUSION(root.sg.d1,root.sg.d2)",
result.getPattern());
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index aba780fb0d1..5fe2f53b7f5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -49,8 +49,12 @@ public class PipeSourceConstant {
public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
public static final String SOURCE_PATTERN_KEY = "source.pattern";
+ public static final String EXTRACTOR_PATTERN_INCLUSION_KEY =
"extractor.pattern.inclusion";
+ public static final String SOURCE_PATTERN_INCLUSION_KEY =
"source.pattern.inclusion";
public static final String EXTRACTOR_PATH_KEY = "extractor.path";
public static final String SOURCE_PATH_KEY = "source.path";
+ public static final String EXTRACTOR_PATH_INCLUSION_KEY =
"extractor.path.inclusion";
+ public static final String SOURCE_PATH_INCLUSION_KEY =
"source.path.inclusion";
public static final String EXTRACTOR_PATTERN_FORMAT_KEY =
"extractor.pattern.format";
public static final String SOURCE_PATTERN_FORMAT_KEY =
"source.pattern.format";
public static final String EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE = "prefix";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
index c0118397b3c..4cba6816cc8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePattern.java
@@ -39,16 +39,20 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
public abstract class PipePattern {
@@ -139,30 +143,13 @@ public abstract class PipePattern {
*/
public static PipePattern parsePipePatternFromSourceParameters(
final PipeParameters sourceParameters) {
- final PipePattern pipePattern =
parsePipePatternFromSourceParametersInternal(sourceParameters);
- if (!pipePattern.isSingle()) {
- final String msg =
- String.format(
- "Pipe: The provided pattern should be single now. Inclusion: %s,
Exclusion: %s",
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY),
- sourceParameters.getStringByKeys(
- EXTRACTOR_PATTERN_EXCLUSION_KEY,
SOURCE_PATTERN_EXCLUSION_KEY));
- LOGGER.warn(msg);
- throw new PipeException(msg);
- }
- return pipePattern;
+ return parsePipePatternFromSourceParametersInternal(sourceParameters);
}
public static PipePattern parsePipePatternFromSourceParametersInternal(
final PipeParameters sourceParameters) {
// 1. Parse INCLUSION patterns into a list
- List<PipePattern> inclusionPatterns =
- parsePatternList(
- sourceParameters,
- EXTRACTOR_PATH_KEY,
- SOURCE_PATH_KEY,
- EXTRACTOR_PATTERN_KEY,
- SOURCE_PATTERN_KEY);
+ List<PipePattern> inclusionPatterns =
parseInclusionPatternList(sourceParameters);
// If no inclusion patterns are specified, use default "root.**"
if (inclusionPatterns.isEmpty()) {
@@ -192,9 +179,20 @@ public abstract class PipePattern {
"Pipe: The provided exclusion pattern fully covers the inclusion
pattern. "
+ "This pipe pattern will match nothing. "
+ "Inclusion: %s, Exclusion: %s",
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY),
sourceParameters.getStringByKeys(
- EXTRACTOR_PATTERN_EXCLUSION_KEY,
SOURCE_PATTERN_EXCLUSION_KEY));
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ EXTRACTOR_PATH_INCLUSION_KEY,
+ SOURCE_PATH_INCLUSION_KEY,
+ EXTRACTOR_PATH_KEY,
+ SOURCE_PATH_KEY,
+ EXTRACTOR_PATTERN_KEY,
+ SOURCE_PATTERN_KEY),
+ sourceParameters.getStringByKeys(
+ EXTRACTOR_PATH_EXCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY,
+ EXTRACTOR_PATTERN_EXCLUSION_KEY,
+ SOURCE_PATTERN_EXCLUSION_KEY));
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -296,20 +294,50 @@ public abstract class PipePattern {
final String extractorPatternKey,
final String sourcePatternKey) {
- final String path = sourceParameters.getStringByKeys(extractorPathKey,
sourcePathKey);
- final String pattern =
sourceParameters.getStringByKeys(extractorPatternKey, sourcePatternKey);
+ final List<PipePattern> result = new ArrayList<>();
+
+ addIoTDBPatternsIfPresent(result, sourceParameters, extractorPathKey,
sourcePathKey);
+ addPatternParameterPatternsIfPresent(
+ result, sourceParameters, extractorPatternKey, sourcePatternKey);
+
+ return result;
+ }
+ private static List<PipePattern> parseInclusionPatternList(
+ final PipeParameters sourceParameters) {
final List<PipePattern> result = new ArrayList<>();
- if (path != null) {
- result.addAll(parseMultiplePatterns(path, IoTDBPipePattern::new));
+ addPatternParameterPatternsIfPresent(
+ result, sourceParameters, EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY);
+ addIoTDBPatternsIfPresent(
+ result, sourceParameters, EXTRACTOR_PATTERN_INCLUSION_KEY,
SOURCE_PATTERN_INCLUSION_KEY);
+ addIoTDBPatternsIfPresent(result, sourceParameters, EXTRACTOR_PATH_KEY,
SOURCE_PATH_KEY);
+ addIoTDBPatternsIfPresent(
+ result, sourceParameters, EXTRACTOR_PATH_INCLUSION_KEY,
SOURCE_PATH_INCLUSION_KEY);
+
+ return result;
+ }
+
+ private static void addIoTDBPatternsIfPresent(
+ final List<PipePattern> result,
+ final PipeParameters sourceParameters,
+ final String extractorKey,
+ final String sourceKey) {
+ final String pattern = sourceParameters.getStringByKeys(extractorKey,
sourceKey);
+ if (pattern != null) {
+ result.addAll(parseMultiplePatterns(pattern, IoTDBPipePattern::new));
}
+ }
+ private static void addPatternParameterPatternsIfPresent(
+ final List<PipePattern> result,
+ final PipeParameters sourceParameters,
+ final String extractorPatternKey,
+ final String sourcePatternKey) {
+ final String pattern =
sourceParameters.getStringByKeys(extractorPatternKey, sourcePatternKey);
if (pattern != null) {
result.addAll(parsePatternsFromPatternParameter(pattern,
sourceParameters));
}
-
- return result;
}
/**
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePatternParseTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePatternParseTest.java
new file mode 100644
index 00000000000..77e61c7b1ea
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/pattern/PipePatternParseTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.datastructure.pattern;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class PipePatternParseTest {
+
+ @Test
+ public void testPatternAndPatternInclusionPreserved() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATTERN_KEY, "root.sg.A");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.sg.B");
+ }
+ });
+
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
+
+ Assert.assertTrue(result instanceof UnionPipePattern);
+ Assert.assertEquals("root.sg.A,root.sg.B", result.getPattern());
+ }
+
+ @Test
+ public void testPathAndPathInclusionPreserved() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1");
+ put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY,
"root.sg.d2,root.sg.d3");
+ }
+ });
+
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
+
+ Assert.assertTrue(result instanceof UnionIoTDBPipePattern);
+ Assert.assertEquals("root.sg.d1,root.sg.d2,root.sg.d3",
result.getPattern());
+ }
+
+ @Test
+ public void testPathInclusionWithPathExclusionPreserved() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY,
"root.sg.**");
+ put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY,
"root.sg.d1,root.sg.d2");
+ }
+ });
+
+ final PipePattern result =
PipePattern.parsePipePatternFromSourceParameters(params);
+
+ Assert.assertTrue(result instanceof WithExclusionIoTDBPipePattern);
+ Assert.assertEquals(
+ "INCLUSION(root.sg.**), EXCLUSION(root.sg.d1,root.sg.d2)",
result.getPattern());
+ }
+}