This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-table-model-3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a4b2839a4543db1828e97140b84327b1e8c20de Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Sep 25 18:50:19 2024 +0800 Use session db if necessary when creating pipes --- .../execution/config/TableConfigTaskVisitor.java | 24 ++++++++++++++++++++++ .../plan/relational/sql/parser/AstBuilder.java | 10 ++++----- .../config/constant/PipeExtractorConstant.java | 7 +++++++ .../pipe/config/constant/SystemConstant.java | 2 ++ 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 39911fab191..b39597dc77f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; @@ -94,9 +96,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatem import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -122,6 +127,8 @@ import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR_CH public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryContext> { + private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigTaskVisitor.class); + private static final String DATABASE_NOT_SPECIFIED = "database is not specified"; private final IClientSession clientSession; @@ -456,6 +463,23 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont @Override protected IConfigTask visitCreatePipe(CreatePipe node, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); + + final String databaseSetInSession = clientSession.getDatabaseName(); + if (Objects.nonNull(databaseSetInSession)) { + final PipeParameters parameters = new PipeParameters(node.getExtractorAttributes()); + if (!parameters.hasAnyAttributes( + PipeExtractorConstant.EXTRACTOR_CAPTURE_DATA_DATABASE_KEY, + PipeExtractorConstant.SOURCE_CAPTURE_DATA_DATABASE_KEY)) { + LOGGER.info( + "Database name is not specified in the extractor attributes of the create pipe statement {}, " + + "the database name in the session will be used: {}", + node, + databaseSetInSession); + node.getExtractorAttributes() + .put(SystemConstant.SESSION_DATABASE_KEY, databaseSetInSession); + } + } + return new CreatePipeTask(node); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 7f84f6c696f..611308a05db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -570,12 +570,12 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { ctx.extractorAttributesClause() != null ? parseExtractorAttributesClause( ctx.extractorAttributesClause().extractorAttributeClause()) - : Collections.emptyMap(); + : new HashMap<>(); // DO NOT USE Collections.emptyMap() here final Map<String, String> processorAttributes = ctx.processorAttributesClause() != null ? parseProcessorAttributesClause( ctx.processorAttributesClause().processorAttributeClause()) - : Collections.emptyMap(); + : new HashMap<>(); // DO NOT USE Collections.emptyMap() here final Map<String, String> connectorAttributes = ctx.connectorAttributesClause() != null ? parseConnectorAttributesClause( @@ -638,7 +638,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { isReplaceAllExtractorAttributes = Objects.nonNull(ctx.alterExtractorAttributesClause().REPLACE()); } else { - extractorAttributes = Collections.emptyMap(); + extractorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here isReplaceAllExtractorAttributes = false; } @@ -651,7 +651,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { isReplaceAllProcessorAttributes = Objects.nonNull(ctx.alterProcessorAttributesClause().REPLACE()); } else { - processorAttributes = Collections.emptyMap(); + processorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here isReplaceAllProcessorAttributes = false; } @@ -664,7 +664,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { isReplaceAllConnectorAttributes = Objects.nonNull(ctx.alterConnectorAttributesClause().REPLACE()); } else { - connectorAttributes = Collections.emptyMap(); + connectorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here isReplaceAllConnectorAttributes = false; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 58b8c06192c..f1b2f7c1007 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -30,6 +30,13 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_DIALECT_TABLE_VALUE = "table"; public static final String EXTRACTOR_DIALECT_DEFAULT_VALUE = EXTRACTOR_DIALECT_TREE_VALUE; + public static final String EXTRACTOR_CAPTURE_DATA_DATABASE_KEY = + "extractor.capture.data.database"; + public static final String SOURCE_CAPTURE_DATA_DATABASE_KEY = "source.capture.data.database"; + + public static final String EXTRACTOR_CAPTURE_DATA_SQL = "extractor.capture.data.sql"; + public static final String SOURCE_CAPTURE_DATA_SQL = "source.capture.data.sql"; + public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion"; public static final String SOURCE_INCLUSION_KEY = "source.inclusion"; public static final String EXTRACTOR_INCLUSION_DEFAULT_VALUE = "data.insert"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java index c77c56fb9c0..923cb7fe342 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java @@ -24,6 +24,8 @@ public class SystemConstant { public static final String RESTART_KEY = "__system.restart"; public static final boolean RESTART_DEFAULT_VALUE = false; + public static final String SESSION_DATABASE_KEY = "__system.session.database"; + private SystemConstant() { throw new IllegalStateException("Utility class"); }
