This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-table-model
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 133b28060a41300827f0cc098131adaa1a606aab
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Sep 24 17:12:40 2024 +0800

    pipe plugin sql
---
 .../plan/relational/sql/ast/CreatePipe.java        | 11 ++--
 .../ast/{CreatePipe.java => CreatePipePlugin.java} | 65 +++++++++-------------
 .../plan/relational/sql/ast/DropPipe.java          |  2 +-
 .../sql/ast/{DropPipe.java => DropPipePlugin.java} | 20 +++----
 .../ast/{StopPipe.java => ShowPipePlugins.java}    | 26 ++-------
 .../plan/relational/sql/ast/StartPipe.java         |  2 +-
 .../plan/relational/sql/ast/StopPipe.java          |  2 +-
 .../plan/relational/sql/parser/AstBuilder.java     | 31 ++++++++++-
 8 files changed, 78 insertions(+), 81 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
index 2e2917ed04b..e997c02a239 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
@@ -43,11 +43,14 @@ public class CreatePipe extends Statement {
       final Map<String, String> processorAttributes,
       final Map<String, String> connectorAttributes) {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
+    this.pipeName = requireNonNull(pipeName, "pipe name can not be null");
     this.ifNotExistsCondition = ifNotExistsCondition;
-    this.extractorAttributes = requireNonNull(extractorAttributes);
-    this.processorAttributes = requireNonNull(processorAttributes);
-    this.connectorAttributes = requireNonNull(connectorAttributes);
+    this.extractorAttributes =
+        requireNonNull(extractorAttributes, "extractor/source attributes can 
not be null");
+    this.processorAttributes =
+        requireNonNull(processorAttributes, "processor attributes can not be 
null");
+    this.connectorAttributes =
+        requireNonNull(connectorAttributes, "connector attributes can not be 
null");
   }
 
   public String getPipeName() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipePlugin.java
similarity index 54%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipePlugin.java
index 2e2917ed04b..cc541ece980 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipePlugin.java
@@ -22,52 +22,44 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public class CreatePipe extends Statement {
+public class CreatePipePlugin extends Statement {
 
-  private final String pipeName;
+  private final String pluginName;
   private final boolean ifNotExistsCondition;
-  private final Map<String, String> extractorAttributes;
-  private final Map<String, String> processorAttributes;
-  private final Map<String, String> connectorAttributes;
+  private final String className;
+  private final String uriString;
 
-  public CreatePipe(
-      final String pipeName,
+  public CreatePipePlugin(
+      final String pluginName,
       final boolean ifNotExistsCondition,
-      final Map<String, String> extractorAttributes,
-      final Map<String, String> processorAttributes,
-      final Map<String, String> connectorAttributes) {
+      final String className,
+      final String uriString) {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
+    this.pluginName = requireNonNull(pluginName, "plugin name can not be 
null");
     this.ifNotExistsCondition = ifNotExistsCondition;
-    this.extractorAttributes = requireNonNull(extractorAttributes);
-    this.processorAttributes = requireNonNull(processorAttributes);
-    this.connectorAttributes = requireNonNull(connectorAttributes);
+    this.className = requireNonNull(className, "class name can not be null");
+    this.uriString = requireNonNull(uriString, "uri can not be null");
   }
 
-  public String getPipeName() {
-    return pipeName;
+  public String getPluginName() {
+    return pluginName;
   }
 
   public boolean hasIfNotExistsCondition() {
     return ifNotExistsCondition;
   }
 
-  public Map<String, String> getExtractorAttributes() {
-    return extractorAttributes;
+  public String getClassName() {
+    return className;
   }
 
-  public Map<String, String> getProcessorAttributes() {
-    return processorAttributes;
-  }
-
-  public Map<String, String> getConnectorAttributes() {
-    return connectorAttributes;
+  public String getUriString() {
+    return uriString;
   }
 
   @Override
@@ -83,12 +75,7 @@ public class CreatePipe extends Statement {
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        pipeName,
-        ifNotExistsCondition,
-        extractorAttributes,
-        processorAttributes,
-        connectorAttributes);
+    return Objects.hash(pluginName, ifNotExistsCondition, className, 
uriString);
   }
 
   @Override
@@ -99,22 +86,20 @@ public class CreatePipe extends Statement {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    CreatePipe other = (CreatePipe) obj;
-    return Objects.equals(pipeName, other.pipeName)
+    CreatePipePlugin other = (CreatePipePlugin) obj;
+    return Objects.equals(pluginName, other.pluginName)
         && Objects.equals(ifNotExistsCondition, other.ifNotExistsCondition)
-        && Objects.equals(extractorAttributes, other.extractorAttributes)
-        && Objects.equals(processorAttributes, other.processorAttributes)
-        && Objects.equals(connectorAttributes, other.connectorAttributes);
+        && Objects.equals(className, other.className)
+        && Objects.equals(uriString, other.uriString);
   }
 
   @Override
   public String toString() {
     return toStringHelper(this)
-        .add("pipeName", pipeName)
+        .add("pluginName", pluginName)
         .add("ifNotExistsCondition", ifNotExistsCondition)
-        .add("extractorAttributes", extractorAttributes)
-        .add("processorAttributes", processorAttributes)
-        .add("connectorAttributes", connectorAttributes)
+        .add("className", className)
+        .add("uriString", uriString)
         .toString();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
index 08889b512ea..053431c1f19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
@@ -34,7 +34,7 @@ public class DropPipe extends Statement {
 
   public DropPipe(final String pipeName, final boolean ifExistsCondition) {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
+    this.pipeName = requireNonNull(pipeName, "pipe name can not be null");
     this.ifExistsCondition = ifExistsCondition;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipePlugin.java
similarity index 79%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipePlugin.java
index 08889b512ea..81cb32d4244 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropPipePlugin.java
@@ -27,19 +27,19 @@ import java.util.Objects;
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public class DropPipe extends Statement {
+public class DropPipePlugin extends Statement {
 
-  private final String pipeName;
+  private final String pluginName;
   private final boolean ifExistsCondition;
 
-  public DropPipe(final String pipeName, final boolean ifExistsCondition) {
+  public DropPipePlugin(final String pluginName, final boolean 
ifExistsCondition) {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
+    this.pluginName = requireNonNull(pluginName, "plugin name can not be 
null");
     this.ifExistsCondition = ifExistsCondition;
   }
 
-  public String getPipeName() {
-    return pipeName;
+  public String getPluginName() {
+    return pluginName;
   }
 
   public boolean hasIfExistsCondition() {
@@ -59,7 +59,7 @@ public class DropPipe extends Statement {
 
   @Override
   public int hashCode() {
-    return Objects.hash(pipeName, ifExistsCondition);
+    return Objects.hash(pluginName, ifExistsCondition);
   }
 
   @Override
@@ -70,15 +70,15 @@ public class DropPipe extends Statement {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    DropPipe other = (DropPipe) obj;
-    return Objects.equals(pipeName, other.pipeName)
+    DropPipePlugin other = (DropPipePlugin) obj;
+    return Objects.equals(pluginName, other.pluginName)
         && Objects.equals(ifExistsCondition, other.ifExistsCondition);
   }
 
   @Override
   public String toString() {
     return toStringHelper(this)
-        .add("pipeName", pipeName)
+        .add("pluginName", pluginName)
         .add("ifExistsCondition", ifExistsCondition)
         .toString();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipePlugins.java
similarity index 69%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipePlugins.java
index be6ac6d80f8..de290de3cdf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowPipePlugins.java
@@ -22,22 +22,13 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
-import java.util.Objects;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
 
-public class StopPipe extends Statement {
+public class ShowPipePlugins extends Statement {
 
-  private final String pipeName;
-
-  public StopPipe(final String pipeName) {
+  public ShowPipePlugins() {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
-  }
-
-  public String getPipeName() {
-    return pipeName;
   }
 
   @Override
@@ -53,23 +44,16 @@ public class StopPipe extends Statement {
 
   @Override
   public int hashCode() {
-    return Objects.hash(pipeName);
+    return 0;
   }
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    StopPipe other = (StopPipe) obj;
-    return Objects.equals(pipeName, other.pipeName);
+    return obj instanceof ShowPipePlugins;
   }
 
   @Override
   public String toString() {
-    return toStringHelper(this).add("pipeName", pipeName).toString();
+    return toStringHelper(this).toString();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
index 46a3395030f..5f6cbffee38 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StartPipe.java
@@ -33,7 +33,7 @@ public class StartPipe extends Statement {
 
   public StartPipe(final String pipeName) {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
+    this.pipeName = requireNonNull(pipeName, "pipe name can not be null");
   }
 
   public String getPipeName() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
index be6ac6d80f8..1e1eb242d0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/StopPipe.java
@@ -33,7 +33,7 @@ public class StopPipe extends Statement {
 
   public StopPipe(final String pipeName) {
     super(null);
-    this.pipeName = requireNonNull(pipeName);
+    this.pipeName = requireNonNull(pipeName, "pipe name can not be null");
   }
 
   public String getPipeName() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 0e306c0d684..52c3c770224 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentDatabase;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentTime;
@@ -58,6 +59,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate;
@@ -118,6 +120,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
@@ -165,6 +168,8 @@ import org.apache.tsfile.utils.TimeDuration;
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -700,18 +705,38 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
   @Override
   public Node visitCreatePipePluginStatement(
       RelationalSqlParser.CreatePipePluginStatementContext ctx) {
-    return super.visitCreatePipePluginStatement(ctx);
+    final String pluginName = ((Identifier) 
visit(ctx.identifier())).getValue();
+    final boolean hasIfNotExistsCondition =
+        ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null;
+    final String className = ((StringLiteral) visit(ctx.className)).getValue();
+    final String uriString = parseAndValidateURI(ctx.uriClause());
+    return new CreatePipePlugin(pluginName, hasIfNotExistsCondition, 
className, uriString);
+  }
+
+  private String parseAndValidateURI(RelationalSqlParser.UriClauseContext ctx) 
{
+    final String uriString =
+        ctx.uri.identifier() != null
+            ? ((Identifier) visit(ctx.uri.identifier())).getValue()
+            : ((StringLiteral) visit(ctx.uri.string())).getValue();
+    try {
+      new URI(uriString);
+    } catch (URISyntaxException e) {
+      throw new SemanticException(String.format("Invalid URI: %s", uriString));
+    }
+    return uriString;
   }
 
   @Override
   public Node 
visitDropPipePluginStatement(RelationalSqlParser.DropPipePluginStatementContext 
ctx) {
-    return super.visitDropPipePluginStatement(ctx);
+    final String pluginName = ((Identifier) 
visit(ctx.identifier())).getValue();
+    final boolean hasIfExistsCondition = ctx.IF() != null && ctx.EXISTS() != 
null;
+    return new DropPipePlugin(pluginName, hasIfExistsCondition);
   }
 
   @Override
   public Node visitShowPipePluginsStatement(
       RelationalSqlParser.ShowPipePluginsStatementContext ctx) {
-    return super.visitShowPipePluginsStatement(ctx);
+    return new ShowPipePlugins();
   }
 
   @Override

Reply via email to