This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch sql-dialect-in-pipe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1aa2250279b0e396542e68405890698add0564fb Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Oct 9 18:58:16 2024 +0800 Inject sql dialect param into pipe sql --- .../response/pipe/task/PipeTableResp.java | 4 +++- .../impl/pipe/task/CreatePipeProcedureV2.java | 3 +-- .../execution/config/TableConfigTaskVisitor.java | 6 +++++ .../execution/config/TreeConfigTaskVisitor.java | 6 +++++ .../plan/relational/sql/parser/AstBuilder.java | 10 ++++---- .../pipe/config/constant/SystemConstant.java | 28 ++++++++++++++++++++++ 6 files changed, 49 insertions(+), 8 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index bb7cf2137d3..f2cbe60cb97 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; @@ -169,7 +170,8 @@ public class PipeTableResp implements DataSet { staticMeta.getPipeName(), staticMeta.getCreationTime(), runtimeMeta.getStatus().get().name(), - staticMeta.getExtractorParameters().toString(), + SystemConstant.addSystemKeysIfNecessary(staticMeta.getExtractorParameters()) + .toString(), staticMeta.getProcessorParameters().toString(), staticMeta.getConnectorParameters().toString(), exceptionMessageBuilder.toString()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index b8c712fdc16..b744ef50d6f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -196,8 +196,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { // config region consensusGroupIdToTaskMetaMap.put( // 0 is the consensus group id of the config region, but data region id and schema region - // id - // also start from 0, so we use Integer.MIN_VALUE to represent the config region + // id also start from 0, so we use Integer.MIN_VALUE to represent the config region Integer.MIN_VALUE, new PipeTaskMeta( MinimumProgressIndex.INSTANCE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 432a5111097..3a177923f3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; @@ -466,6 +467,11 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont @Override protected IConfigTask visitCreatePipe(CreatePipe node, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); + + // Inject table model into the extractor attributes + node.getExtractorAttributes() + .put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + return new CreatePipeTask(node); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 4f959422d19..80677d5adec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountTimeSlotListTask; @@ -461,6 +462,11 @@ public class TreeConfigTaskVisitor extends StatementVisitor<IConfigTask, MPPQuer @Override public IConfigTask visitCreatePipe( CreatePipeStatement createPipeStatement, MPPQueryContext context) { + // Inject tree model into the extractor attributes + createPipeStatement + .getExtractorAttributes() + .put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + return new CreatePipeTask(createPipeStatement); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index c2262423b4a..b4ce587db00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -571,12 +571,12 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { ctx.extractorAttributesClause() != null ? parseExtractorAttributesClause( ctx.extractorAttributesClause().extractorAttributeClause()) - : Collections.emptyMap(); + : new HashMap<>(); // DO NOT USE Collections.emptyMap() here final Map<String, String> processorAttributes = ctx.processorAttributesClause() != null ? parseProcessorAttributesClause( ctx.processorAttributesClause().processorAttributeClause()) - : Collections.emptyMap(); + : new HashMap<>(); // DO NOT USE Collections.emptyMap() here final Map<String, String> connectorAttributes = ctx.connectorAttributesClause() != null ? parseConnectorAttributesClause( @@ -639,7 +639,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { isReplaceAllExtractorAttributes = Objects.nonNull(ctx.alterExtractorAttributesClause().REPLACE()); } else { - extractorAttributes = Collections.emptyMap(); + extractorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here isReplaceAllExtractorAttributes = false; } @@ -652,7 +652,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { isReplaceAllProcessorAttributes = Objects.nonNull(ctx.alterProcessorAttributesClause().REPLACE()); } else { - processorAttributes = Collections.emptyMap(); + processorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here isReplaceAllProcessorAttributes = false; } @@ -665,7 +665,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { isReplaceAllConnectorAttributes = Objects.nonNull(ctx.alterConnectorAttributesClause().REPLACE()); } else { - connectorAttributes = Collections.emptyMap(); + connectorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here isReplaceAllConnectorAttributes = false; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java index c77c56fb9c0..852bc7cc1be 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java @@ -19,11 +19,39 @@ package org.apache.iotdb.commons.pipe.config.constant; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + public class SystemConstant { public static final String RESTART_KEY = "__system.restart"; public static final boolean RESTART_DEFAULT_VALUE = false; + public static final String SQL_DIALECT_KEY = "__system.sql-dialect"; + public static final String SQL_DIALECT_TREE_VALUE = "tree"; + public static final String SQL_DIALECT_TABLE_VALUE = "table"; + + /////////////////////////////////// Utility /////////////////////////////////// + + public static final Set<String> SYSTEM_KEYS = new HashSet<>(); + + static { + SYSTEM_KEYS.add(RESTART_KEY); + SYSTEM_KEYS.add(SQL_DIALECT_KEY); + } + + public static PipeParameters addSystemKeysIfNecessary(final PipeParameters givenPipeParameters) { + final Map<String, String> attributes = new HashMap<>(givenPipeParameters.getAttribute()); + attributes.putIfAbsent(SQL_DIALECT_KEY, SQL_DIALECT_TREE_VALUE); + return new PipeParameters(attributes); + } + + /////////////////////////////////// Private Constructor /////////////////////////////////// + private SystemConstant() { throw new IllegalStateException("Utility class"); }
