This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch sql-dialect-in-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1a7dbd468e22a15d83ee4c217dc7d0a14920dbfc
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Oct 9 19:21:26 2024 +0800

    fix
---
 .../dataregion/IoTDBDataRegionExtractor.java       | 48 ++++++++++++++++++++++
 .../config/constant/PipeExtractorConstant.java     |  9 ++--
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 66d3349d661..d7e33ee0a49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
@@ -107,6 +109,52 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     super.validate(validator);
 
+    // Validate whether the pipe needs to extract table model data or tree 
model data
+    final boolean isTreeDialect =
+        validator
+            .getParameters()
+            .getStringOrDefault(
+                SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE)
+            .equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
+    final boolean isTreeModelDataAllowedToBeCaptured =
+        validator
+            .getParameters()
+            .getBooleanOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY,
+                    PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY),
+                isTreeDialect);
+    final boolean isTableModelDataAllowedToBeCaptured =
+        validator
+            .getParameters()
+            .getBooleanOrDefault(
+                Arrays.asList(
+                    PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY,
+                    PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY),
+                !isTreeDialect);
+    if (!isTreeModelDataAllowedToBeCaptured
+        && validator
+            .getParameters()
+            .hasAnyAttributes(
+                PipeExtractorConstant.EXTRACTOR_PATH_KEY,
+                PipeExtractorConstant.SOURCE_PATH_KEY,
+                PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
+                PipeExtractorConstant.SOURCE_PATTERN_KEY)) {
+      throw new PipeException(
+          "The pipe cannot extract tree model data when sql dialect is set to 
table.");
+    }
+    if (!isTableModelDataAllowedToBeCaptured
+        && validator
+            .getParameters()
+            .hasAnyAttributes(
+                PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY,
+                PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY,
+                PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY,
+                PipeExtractorConstant.SOURCE_TABLE_NAME_KEY)) {
+      throw new PipeException(
+          "The pipe cannot extract table model data when sql dialect is set to 
tree.");
+    }
+
     final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
         DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
             validator.getParameters());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index db5cded0688..1ddca0c3edf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -24,11 +24,10 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_KEY = "extractor";
   public static final String SOURCE_KEY = "source";
 
-  public static final String EXTRACTOR_DIALECT_KEY = "extractor.dialect";
-  public static final String SOURCE_DIALECT_KEY = "source.dialect";
-  public static final String EXTRACTOR_DIALECT_TREE_VALUE = "tree";
-  public static final String EXTRACTOR_DIALECT_TABLE_VALUE = "table";
-  public static final String EXTRACTOR_DIALECT_DEFAULT_VALUE = 
EXTRACTOR_DIALECT_TREE_VALUE;
+  public static final String EXTRACTOR_CAPTURE_TREE_KEY = 
"extractor.capture.tree";
+  public static final String SOURCE_CAPTURE_TREE_KEY = "source.capture.tree";
+  public static final String EXTRACTOR_CAPTURE_TABLE_KEY = 
"extractor.capture.table";
+  public static final String SOURCE_CAPTURE_TABLE_KEY = "source.capture.table";
 
   public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion";
   public static final String SOURCE_INCLUSION_KEY = "source.inclusion";

Reply via email to