This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch sql-dialect-in-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1aa2250279b0e396542e68405890698add0564fb
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Oct 9 18:58:16 2024 +0800

    Inject sql dialect param into pipe sql
---
 .../response/pipe/task/PipeTableResp.java          |  4 +++-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  3 +--
 .../execution/config/TableConfigTaskVisitor.java   |  6 +++++
 .../execution/config/TreeConfigTaskVisitor.java    |  6 +++++
 .../plan/relational/sql/parser/AstBuilder.java     | 10 ++++----
 .../pipe/config/constant/SystemConstant.java       | 28 ++++++++++++++++++++++
 6 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index bb7cf2137d3..f2cbe60cb97 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
@@ -169,7 +170,8 @@ public class PipeTableResp implements DataSet {
               staticMeta.getPipeName(),
               staticMeta.getCreationTime(),
               runtimeMeta.getStatus().get().name(),
-              staticMeta.getExtractorParameters().toString(),
+              SystemConstant.addSystemKeysIfNecessary(staticMeta.getExtractorParameters())
+                  .toString(),
               staticMeta.getProcessorParameters().toString(),
               staticMeta.getConnectorParameters().toString(),
               exceptionMessageBuilder.toString());
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index b8c712fdc16..b744ef50d6f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -196,8 +196,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
       // config region
       consensusGroupIdToTaskMetaMap.put(
           // 0 is the consensus group id of the config region, but data region id and schema region
-          // id
-          // also start from 0, so we use Integer.MIN_VALUE to represent the config region
+          // id also start from 0, so we use Integer.MIN_VALUE to represent the config region
           Integer.MIN_VALUE,
           new PipeTaskMeta(
               MinimumProgressIndex.INSTANCE,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 432a5111097..3a177923f3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.execution.config;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -466,6 +467,11 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont
   @Override
   protected IConfigTask visitCreatePipe(CreatePipe node, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+
+    // Inject table model into the extractor attributes
+    node.getExtractorAttributes()
+        .put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
+
     return new CreatePipeTask(node);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 4f959422d19..80677d5adec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.execution.config;
 
+import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask;
 import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountTimeSlotListTask;
@@ -461,6 +462,11 @@ public class TreeConfigTaskVisitor extends StatementVisitor<IConfigTask, MPPQuer
   @Override
   public IConfigTask visitCreatePipe(
       CreatePipeStatement createPipeStatement, MPPQueryContext context) {
+    // Inject tree model into the extractor attributes
+    createPipeStatement
+        .getExtractorAttributes()
+        .put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE);
+
     return new CreatePipeTask(createPipeStatement);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index c2262423b4a..b4ce587db00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -571,12 +571,12 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
         ctx.extractorAttributesClause() != null
             ? parseExtractorAttributesClause(
                 ctx.extractorAttributesClause().extractorAttributeClause())
-            : Collections.emptyMap();
+            : new HashMap<>(); // DO NOT USE Collections.emptyMap() here
     final Map<String, String> processorAttributes =
         ctx.processorAttributesClause() != null
             ? parseProcessorAttributesClause(
                 ctx.processorAttributesClause().processorAttributeClause())
-            : Collections.emptyMap();
+            : new HashMap<>(); // DO NOT USE Collections.emptyMap() here
     final Map<String, String> connectorAttributes =
         ctx.connectorAttributesClause() != null
             ? parseConnectorAttributesClause(
@@ -639,7 +639,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       isReplaceAllExtractorAttributes =
           Objects.nonNull(ctx.alterExtractorAttributesClause().REPLACE());
     } else {
-      extractorAttributes = Collections.emptyMap();
+      extractorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here
       isReplaceAllExtractorAttributes = false;
     }
 
@@ -652,7 +652,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       isReplaceAllProcessorAttributes =
           Objects.nonNull(ctx.alterProcessorAttributesClause().REPLACE());
     } else {
-      processorAttributes = Collections.emptyMap();
+      processorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here
       isReplaceAllProcessorAttributes = false;
     }
 
@@ -665,7 +665,7 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
       isReplaceAllConnectorAttributes =
           Objects.nonNull(ctx.alterConnectorAttributesClause().REPLACE());
     } else {
-      connectorAttributes = Collections.emptyMap();
+      connectorAttributes = new HashMap<>(); // DO NOT USE Collections.emptyMap() here
       isReplaceAllConnectorAttributes = false;
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index c77c56fb9c0..852bc7cc1be 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -19,11 +19,39 @@
 
 package org.apache.iotdb.commons.pipe.config.constant;
 
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 public class SystemConstant {
 
   public static final String RESTART_KEY = "__system.restart";
   public static final boolean RESTART_DEFAULT_VALUE = false;
 
+  public static final String SQL_DIALECT_KEY = "__system.sql-dialect";
+  public static final String SQL_DIALECT_TREE_VALUE = "tree";
+  public static final String SQL_DIALECT_TABLE_VALUE = "table";
+
+  /////////////////////////////////// Utility ///////////////////////////////////
+
+  public static final Set<String> SYSTEM_KEYS = new HashSet<>();
+
+  static {
+    SYSTEM_KEYS.add(RESTART_KEY);
+    SYSTEM_KEYS.add(SQL_DIALECT_KEY);
+  }
+
+  public static PipeParameters addSystemKeysIfNecessary(final PipeParameters givenPipeParameters) {
+    final Map<String, String> attributes = new HashMap<>(givenPipeParameters.getAttribute());
+    attributes.putIfAbsent(SQL_DIALECT_KEY, SQL_DIALECT_TREE_VALUE);
+    return new PipeParameters(attributes);
+  }
+
+  /////////////////////////////////// Private Constructor ///////////////////////////////////
+
   private SystemConstant() {
     throw new IllegalStateException("Utility class");
   }

Reply via email to