This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-table-model-3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5a4b2839a4543db1828e97140b84327b1e8c20de
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Sep 25 18:50:19 2024 +0800

    Use session db if necessary when creating pipes
---
 .../execution/config/TableConfigTaskVisitor.java   | 24 ++++++++++++++++++++++
 .../plan/relational/sql/parser/AstBuilder.java     | 10 ++++-----
 .../config/constant/PipeExtractorConstant.java     |  7 +++++++
 .../pipe/config/constant/SystemConstant.java       |  2 ++
 4 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 39911fab191..b39597dc77f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.queryengine.plan.execution.config;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -94,9 +96,12 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowClusterStatem
 import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -122,6 +127,8 @@ import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR_CH
 
 public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryContext> {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableConfigTaskVisitor.class);
+
   private static final String DATABASE_NOT_SPECIFIED = "database is not specified";
 
   private final IClientSession clientSession;
@@ -456,6 +463,23 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
   @Override
   protected IConfigTask visitCreatePipe(CreatePipe node, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+
+    final String databaseSetInSession = clientSession.getDatabaseName();
+    if (Objects.nonNull(databaseSetInSession)) {
+      final PipeParameters parameters = new PipeParameters(node.getExtractorAttributes());
+      if (!parameters.hasAnyAttributes(
+          PipeExtractorConstant.EXTRACTOR_CAPTURE_DATA_DATABASE_KEY,
+          PipeExtractorConstant.SOURCE_CAPTURE_DATA_DATABASE_KEY)) {
+        LOGGER.info(
+            "Database name is not specified in the extractor attributes of the create pipe statement {}, "
+                + "the database name in the session will be used: {}",
+            node,
+            databaseSetInSession);
+        node.getExtractorAttributes()
+            .put(SystemConstant.SESSION_DATABASE_KEY, databaseSetInSession);
+      }
+    }
+
     return new CreatePipeTask(node);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 7f84f6c696f..611308a05db 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -570,12 +570,12 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
         ctx.extractorAttributesClause() != null
             ? parseExtractorAttributesClause(
                 ctx.extractorAttributesClause().extractorAttributeClause())
-            : Collections.emptyMap();
+            : new HashMap<>(); // DO NOT USE Collections.emptyMap() here
     final Map<String, String> processorAttributes =
         ctx.processorAttributesClause() != null
             ? parseProcessorAttributesClause(
                 ctx.processorAttributesClause().processorAttributeClause())
-            : Collections.emptyMap();
+            : new HashMap<>(); // DO NOT USE Collections.emptyMap() here
     final Map<String, String> connectorAttributes =
         ctx.connectorAttributesClause() != null
             ? parseConnectorAttributesClause(
@@ -638,7 +638,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       isReplaceAllExtractorAttributes =
           Objects.nonNull(ctx.alterExtractorAttributesClause().REPLACE());
     } else {
-      extractorAttributes = Collections.emptyMap();
+      extractorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here
       isReplaceAllExtractorAttributes = false;
     }
 
@@ -651,7 +651,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       isReplaceAllProcessorAttributes =
           Objects.nonNull(ctx.alterProcessorAttributesClause().REPLACE());
     } else {
-      processorAttributes = Collections.emptyMap();
+      processorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here
       isReplaceAllProcessorAttributes = false;
     }
 
@@ -664,7 +664,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       isReplaceAllConnectorAttributes =
           Objects.nonNull(ctx.alterConnectorAttributesClause().REPLACE());
     } else {
-      connectorAttributes = Collections.emptyMap();
+      connectorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here
       isReplaceAllConnectorAttributes = false;
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 58b8c06192c..f1b2f7c1007 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -30,6 +30,13 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_DIALECT_TABLE_VALUE = "table";
   public static final String EXTRACTOR_DIALECT_DEFAULT_VALUE = EXTRACTOR_DIALECT_TREE_VALUE;
 
+  public static final String EXTRACTOR_CAPTURE_DATA_DATABASE_KEY =
+      "extractor.capture.data.database";
+  public static final String SOURCE_CAPTURE_DATA_DATABASE_KEY = "source.capture.data.database";
+
+  public static final String EXTRACTOR_CAPTURE_DATA_SQL = "extractor.capture.data.sql";
+  public static final String SOURCE_CAPTURE_DATA_SQL = "source.capture.data.sql";
+
   public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion";
   public static final String SOURCE_INCLUSION_KEY = "source.inclusion";
   public static final String EXTRACTOR_INCLUSION_DEFAULT_VALUE = "data.insert";
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index c77c56fb9c0..923cb7fe342 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -24,6 +24,8 @@ public class SystemConstant {
   public static final String RESTART_KEY = "__system.restart";
   public static final boolean RESTART_DEFAULT_VALUE = false;
 
+  public static final String SESSION_DATABASE_KEY = "__system.session.database";
+
   private SystemConstant() {
     throw new IllegalStateException("Utility class");
   }

Reply via email to