This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 917fd4e9b0c Pipe: Added output.measurements and output.database format check in AggregateProcessor (#12300)
917fd4e9b0c is described below
commit 917fd4e9b0c5480934acaefe1c6484e4c4c2e16d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 8 17:32:15 2024 +0800
Pipe: Added output.measurements and output.database format check in AggregateProcessor (#12300)
---
.../processor/aggregate/AggregateProcessor.java | 36 ++++++++++++++++++++--
1 file changed, 33 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 7b855df1b43..9aa7eef8722 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
@@ -131,15 +133,43 @@ public class AggregateProcessor implements PipeProcessor {
final PipeParameters parameters = validator.getParameters();
validator
.validate(
- args -> !((String) args).isEmpty(),
+ arg -> !((String) arg).isEmpty(),
String.format("The parameter %s must not be empty.",
PROCESSOR_OPERATORS_KEY),
parameters.getStringOrDefault(
PROCESSOR_OPERATORS_KEY, PROCESSOR_OPERATORS_DEFAULT_VALUE))
.validate(
- args -> !((String) args).isEmpty(),
+ arg -> !((String) arg).isEmpty(),
String.format("The parameter %s must not be empty.",
PROCESSOR_WINDOWING_STRATEGY_KEY),
parameters.getStringOrDefault(
- PROCESSOR_WINDOWING_STRATEGY_KEY, PROCESSOR_WINDOWING_STRATEGY_DEFAULT_VALUE));
+ PROCESSOR_WINDOWING_STRATEGY_KEY, PROCESSOR_WINDOWING_STRATEGY_DEFAULT_VALUE))
+ .validate(
+ arg -> ((String) arg).isEmpty() || ((String)
arg).startsWith("root."),
+ String.format(
+ "The output database %s shall start with root.",
+ parameters.getStringOrDefault(
+ PROCESSOR_OUTPUT_DATABASE_KEY,
PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE)),
+ parameters.getStringOrDefault(
+ PROCESSOR_OUTPUT_DATABASE_KEY,
PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE))
+ .validate(
+ arg ->
+ Arrays.stream(((String) arg).replace(" ", "").split(","))
+ .allMatch(this::isLegalMeasurement),
+ String.format(
+ "The output measurements %s contains illegal measurements, the measurements must be the last level of a legal path",
+ parameters.getStringOrDefault(
+ PROCESSOR_OUTPUT_MEASUREMENTS_KEY,
+ PROCESSOR_OUTPUT_MEASUREMENTS_DEFAULT_VALUE)),
+ parameters.getStringOrDefault(
+ PROCESSOR_OUTPUT_MEASUREMENTS_KEY,
PROCESSOR_OUTPUT_MEASUREMENTS_DEFAULT_VALUE));
+ }
+
+ private boolean isLegalMeasurement(String measurement) {
+ try {
+ PathUtils.isLegalPath("root." + measurement);
+ } catch (IllegalPathException e) {
+ return false;
+ }
+ return measurement.startsWith("`") && measurement.endsWith("`") || !measurement.contains(".");
}
@Override