This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 917fd4e9b0c Pipe: Added output.measurements and output.database format check in AggregateProcessor (#12300)
917fd4e9b0c is described below

commit 917fd4e9b0c5480934acaefe1c6484e4c4c2e16d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 8 17:32:15 2024 +0800

    Pipe: Added output.measurements and output.database format check in AggregateProcessor (#12300)
---
 .../processor/aggregate/AggregateProcessor.java    | 36 ++++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 7b855df1b43..9aa7eef8722 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
 import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
@@ -131,15 +133,43 @@ public class AggregateProcessor implements PipeProcessor {
     final PipeParameters parameters = validator.getParameters();
     validator
         .validate(
-            args -> !((String) args).isEmpty(),
+            arg -> !((String) arg).isEmpty(),
             String.format("The parameter %s must not be empty.", 
PROCESSOR_OPERATORS_KEY),
             parameters.getStringOrDefault(
                 PROCESSOR_OPERATORS_KEY, PROCESSOR_OPERATORS_DEFAULT_VALUE))
         .validate(
-            args -> !((String) args).isEmpty(),
+            arg -> !((String) arg).isEmpty(),
             String.format("The parameter %s must not be empty.", 
PROCESSOR_WINDOWING_STRATEGY_KEY),
             parameters.getStringOrDefault(
-                PROCESSOR_WINDOWING_STRATEGY_KEY, 
PROCESSOR_WINDOWING_STRATEGY_DEFAULT_VALUE));
+                PROCESSOR_WINDOWING_STRATEGY_KEY, 
PROCESSOR_WINDOWING_STRATEGY_DEFAULT_VALUE))
+        .validate(
+            arg -> ((String) arg).isEmpty() || ((String) 
arg).startsWith("root."),
+            String.format(
+                "The output database %s shall start with root.",
+                parameters.getStringOrDefault(
+                    PROCESSOR_OUTPUT_DATABASE_KEY, 
PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE)),
+            parameters.getStringOrDefault(
+                PROCESSOR_OUTPUT_DATABASE_KEY, 
PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE))
+        .validate(
+            arg ->
+                Arrays.stream(((String) arg).replace(" ", "").split(","))
+                    .allMatch(this::isLegalMeasurement),
+            String.format(
+                "The output measurements %s contains illegal measurements, the 
measurements must be the last level of a legal path",
+                parameters.getStringOrDefault(
+                    PROCESSOR_OUTPUT_MEASUREMENTS_KEY,
+                    PROCESSOR_OUTPUT_MEASUREMENTS_DEFAULT_VALUE)),
+            parameters.getStringOrDefault(
+                PROCESSOR_OUTPUT_MEASUREMENTS_KEY, 
PROCESSOR_OUTPUT_MEASUREMENTS_DEFAULT_VALUE));
+  }
+
+  private boolean isLegalMeasurement(String measurement) {
+    try {
+      PathUtils.isLegalPath("root." + measurement);
+    } catch (IllegalPathException e) {
+      return false;
+    }
+    return measurement.startsWith("`") && measurement.endsWith("`") || 
!measurement.contains(".");
   }
 
   @Override

Reply via email to