This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch sql-dialect-in-pipe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1a7dbd468e22a15d83ee4c217dc7d0a14920dbfc Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Oct 9 19:21:26 2024 +0800 fix --- .../dataregion/IoTDBDataRegionExtractor.java | 48 ++++++++++++++++++++++ .../config/constant/PipeExtractorConstant.java | 9 ++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index 66d3349d661..d7e33ee0a49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.dataregion; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; @@ -107,6 +109,52 @@ public class IoTDBDataRegionExtractor extends IoTDBExtractor { public void validate(final PipeParameterValidator validator) throws Exception { super.validate(validator); + // Validate whether the pipe needs to extract table model data or tree model data + final boolean isTreeDialect = + validator + .getParameters() + .getStringOrDefault( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE) + .equals(SystemConstant.SQL_DIALECT_TREE_VALUE); + final boolean isTreeModelDataAllowedToBeCaptured = + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY, + PipeExtractorConstant.SOURCE_CAPTURE_TREE_KEY), + isTreeDialect); + final boolean isTableModelDataAllowedToBeCaptured = + validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY, + PipeExtractorConstant.SOURCE_CAPTURE_TABLE_KEY), + !isTreeDialect); + if (!isTreeModelDataAllowedToBeCaptured + && validator + .getParameters() + .hasAnyAttributes( + PipeExtractorConstant.EXTRACTOR_PATH_KEY, + PipeExtractorConstant.SOURCE_PATH_KEY, + PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, + PipeExtractorConstant.SOURCE_PATTERN_KEY)) { + throw new PipeException( + "The pipe cannot extract tree model data when sql dialect is set to table."); + } + if (!isTableModelDataAllowedToBeCaptured + && validator + .getParameters() + .hasAnyAttributes( + PipeExtractorConstant.EXTRACTOR_DATABASE_NAME_KEY, + PipeExtractorConstant.SOURCE_DATABASE_NAME_KEY, + PipeExtractorConstant.EXTRACTOR_TABLE_NAME_KEY, + PipeExtractorConstant.SOURCE_TABLE_NAME_KEY)) { + throw new PipeException( + "The pipe cannot extract table model data when sql dialect is set to tree."); + } + final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair = DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( validator.getParameters()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index db5cded0688..1ddca0c3edf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -24,11 +24,10 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_KEY = "extractor"; public static final String SOURCE_KEY = "source"; - public static final String EXTRACTOR_DIALECT_KEY = "extractor.dialect"; - public static final String SOURCE_DIALECT_KEY = "source.dialect"; - public static final String EXTRACTOR_DIALECT_TREE_VALUE = "tree"; - public static final String EXTRACTOR_DIALECT_TABLE_VALUE = "table"; - public static final String EXTRACTOR_DIALECT_DEFAULT_VALUE = EXTRACTOR_DIALECT_TREE_VALUE; + public static final String EXTRACTOR_CAPTURE_TREE_KEY = "extractor.capture.tree"; + public static final String SOURCE_CAPTURE_TREE_KEY = "source.capture.tree"; + public static final String EXTRACTOR_CAPTURE_TABLE_KEY = "extractor.capture.table"; + public static final String SOURCE_CAPTURE_TABLE_KEY = "source.capture.table"; public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion"; public static final String SOURCE_INCLUSION_KEY = "source.inclusion";
