This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 079fc8fd085 Subscription: basic support for table model from SQL to 
IConfigTask & Pipe: fix too many warn logs from findAllStuckPipes() (#14287)
079fc8fd085 is described below

commit 079fc8fd085ce26921935656c0485858ea0959f0
Author: VGalaxies <[email protected]>
AuthorDate: Tue Dec 3 18:54:18 2024 +0800

    Subscription: basic support for table model from SQL to IConfigTask & Pipe: 
fix too many warn logs from findAllStuckPipes() (#14287)
---
 .../iotdb/rpc/subscription/config/TopicConfig.java | 80 ++++++++++++++++----
 .../rpc/subscription/config/TopicConstant.java     |  8 ++
 .../session/subscription/SubscriptionSession.java  | 26 ++++++-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  9 +--
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 15 ++--
 .../iotdb/db/queryengine/plan/Coordinator.java     |  2 +
 .../execution/config/TableConfigTaskVisitor.java   | 37 +++++++++
 .../execution/config/TreeConfigTaskVisitor.java    | 19 +++--
 .../config/executor/ClusterConfigTaskExecutor.java |  4 +-
 .../config/sys/subscription/CreateTopicTask.java   | 12 ++-
 .../config/sys/subscription/DropTopicTask.java     | 11 ++-
 ...riptionTask.java => ShowSubscriptionsTask.java} | 17 +++--
 .../config/sys/subscription/ShowTopicsTask.java    | 12 ++-
 .../relational/analyzer/StatementAnalyzer.java     | 24 ++++++
 .../plan/relational/sql/ast/AstVisitor.java        | 16 ++++
 .../plan/relational/sql/ast/CreateTopic.java       | 87 ++++++++++++++++++++++
 .../plan/relational/sql/ast/DropTopic.java         | 75 +++++++++++++++++++
 .../plan/relational/sql/ast/ShowSubscriptions.java | 66 ++++++++++++++++
 .../plan/relational/sql/ast/ShowTopics.java        | 66 ++++++++++++++++
 .../relational/sql/ast/SubscriptionStatement.java  | 36 +++++++++
 .../plan/relational/sql/parser/AstBuilder.java     | 57 +++++++++++++-
 .../plan/relational/sql/util/SqlFormatter.java     | 70 +++++++++++++++++
 .../queryengine/plan/statement/StatementType.java  |  2 +-
 .../metadata/subscription/DropTopicStatement.java  |  1 +
 .../commons/subscription/meta/topic/TopicMeta.java | 19 +++--
 .../db/relational/grammar/sql/RelationalSql.g4     | 38 +++++++++-
 26 files changed, 748 insertions(+), 61 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 46dc7601e0f..83d18d4155e 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -33,9 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.rpc.subscription.config.TopicConstant.MODE_LIVE_VALUE;
-import static 
org.apache.iotdb.rpc.subscription.config.TopicConstant.MODE_SNAPSHOT_VALUE;
-
 public class TopicConfig extends PipeParameters {
 
   public TopicConfig() {
@@ -46,6 +43,12 @@ public class TopicConfig extends PipeParameters {
     super(attributes);
   }
 
+  // TODO: hide from the client
+  // refer to org.apache.iotdb.commons.pipe.config.constant.SystemConstant
+  private static final String SQL_DIALECT_KEY = "__system.sql-dialect";
+  private static final String SQL_DIALECT_TREE_VALUE = "tree";
+  private static final String SQL_DIALECT_TABLE_VALUE = "table";
+
   private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
       Collections.singletonMap("realtime.mode", "batch");
   private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
@@ -53,11 +56,18 @@ public class TopicConfig extends PipeParameters {
 
   private static final Map<String, String> SINK_TABLET_FORMAT_CONFIG =
       Collections.singletonMap("format", "tablet");
+  private static final Map<String, String> SINK_TS_FILE_FORMAT_CONFIG =
+      Collections.singletonMap("format", "tsfile");
+  private static final Map<String, String> SINK_HYBRID_FORMAT_CONFIG =
+      Collections.singletonMap("format", "hybrid");
 
   private static final Map<String, String> SNAPSHOT_MODE_CONFIG =
-      Collections.singletonMap("mode", MODE_SNAPSHOT_VALUE);
+      Collections.singletonMap("mode", TopicConstant.MODE_SNAPSHOT_VALUE);
   private static final Map<String, String> LIVE_MODE_CONFIG =
-      Collections.singletonMap("mode", MODE_LIVE_VALUE);
+      Collections.singletonMap("mode", TopicConstant.MODE_LIVE_VALUE);
+
+  private static final Map<String, String> STRICT_MODE_CONFIG =
+      Collections.singletonMap("mode.strict", "true");
 
   private static final Set<String> LOOSE_RANGE_KEY_SET;
 
@@ -80,7 +90,19 @@ public class TopicConfig extends PipeParameters {
 
   /////////////////////////////// utilities ///////////////////////////////
 
-  public Map<String, String> getAttributesWithPathOrPattern() {
+  public boolean isTableTopic() {
+    return SQL_DIALECT_TABLE_VALUE.equalsIgnoreCase(
+        attributes.getOrDefault(SQL_DIALECT_KEY, SQL_DIALECT_TREE_VALUE));
+  }
+
+  /////////////////////////////// extractor attributes mapping 
///////////////////////////////
+
+  public Map<String, String> getAttributeWithSqlDialect() {
+    return Collections.singletonMap(
+        SQL_DIALECT_KEY, attributes.getOrDefault(SQL_DIALECT_KEY, 
SQL_DIALECT_TREE_VALUE));
+  }
+
+  public Map<String, String> getAttributesWithSourcePathOrPattern() {
     if (attributes.containsKey(TopicConstant.PATTERN_KEY)) {
       return Collections.singletonMap(
           TopicConstant.PATTERN_KEY, 
attributes.get(TopicConstant.PATTERN_KEY));
@@ -91,7 +113,20 @@ public class TopicConfig extends PipeParameters {
         attributes.getOrDefault(TopicConstant.PATH_KEY, 
TopicConstant.PATH_DEFAULT_VALUE));
   }
 
-  public Map<String, String> getAttributesWithTimeRange() {
+  public Map<String, String> getAttributesWithSourceDatabaseAndTableName() {
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put(
+        TopicConstant.DATABASE_NAME_KEY,
+        this.attributes.getOrDefault(
+            TopicConstant.DATABASE_NAME_KEY, 
TopicConstant.DATABASE_NAME_DEFAULT_VALUE));
+    attributes.put(
+        TopicConstant.TABLE_NAME_KEY,
+        this.attributes.getOrDefault(
+            TopicConstant.TABLE_NAME_KEY, 
TopicConstant.TABLE_NAME_DEFAULT_VALUE));
+    return attributes;
+  }
+
+  public Map<String, String> getAttributesWithSourceTimeRange() {
     final Map<String, String> attributesWithTimeRange = new HashMap<>();
 
     // there should be no TopicConstant.NOW_TIME_VALUE here
@@ -105,25 +140,36 @@ public class TopicConfig extends PipeParameters {
     return attributesWithTimeRange;
   }
 
-  public Map<String, String> getAttributesWithRealtimeMode() {
+  public Map<String, String> getAttributesWithSourceRealtimeMode() {
     return REALTIME_STREAM_MODE_CONFIG; // default to stream (hybrid)
   }
 
   public Map<String, String> getAttributesWithSourceMode() {
-    return MODE_SNAPSHOT_VALUE.equalsIgnoreCase(
+    return TopicConstant.MODE_SNAPSHOT_VALUE.equalsIgnoreCase(
             attributes.getOrDefault(TopicConstant.MODE_KEY, 
TopicConstant.MODE_DEFAULT_VALUE))
         ? SNAPSHOT_MODE_CONFIG
         : LIVE_MODE_CONFIG;
   }
 
-  public Map<String, String> getAttributesWithSourceLooseRange() {
-    final String looseRangeValue =
-        attributes.getOrDefault(
-            TopicConstant.LOOSE_RANGE_KEY, 
TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
-    return LOOSE_RANGE_KEY_SET.stream()
-        .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
+  public Map<String, String> getAttributesWithSourceLooseRangeOrStrict() {
+    if (attributes.containsKey(TopicConstant.LOOSE_RANGE_KEY)
+        && !attributes.containsKey(TopicConstant.STRICT_KEY)) {
+      // for forwards compatibility
+      final String looseRangeValue =
+          attributes.getOrDefault(
+              TopicConstant.LOOSE_RANGE_KEY, 
TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
+      return LOOSE_RANGE_KEY_SET.stream()
+          .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
+    } else {
+      // only consider strict
+      return Collections.singletonMap(
+          TopicConstant.STRICT_KEY,
+          attributes.getOrDefault(TopicConstant.STRICT_KEY, 
TopicConstant.STRICT_DEFAULT_VALUE));
+    }
   }
 
+  /////////////////////////////// processor attributes mapping 
///////////////////////////////
+
   public Map<String, String> getAttributesWithProcessorPrefix() {
     final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
     attributes.forEach(
@@ -135,7 +181,9 @@ public class TopicConfig extends PipeParameters {
     return attributesWithProcessorPrefix;
   }
 
+  /////////////////////////////// connector attributes mapping 
///////////////////////////////
+
   public Map<String, String> getAttributesWithSinkFormat() {
-    return Collections.emptyMap(); // default to hybrid
+    return SINK_HYBRID_FORMAT_CONFIG; // default to hybrid
   }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index d159df2983c..e36cb9361f5 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -26,6 +26,11 @@ public class TopicConstant {
   public static final String PATTERN_KEY = "pattern";
   public static final String PATTERN_DEFAULT_VALUE = "root";
 
+  public static final String DATABASE_NAME_KEY = "database-name";
+  public static final String TABLE_NAME_KEY = "table-name";
+  public static final String DATABASE_NAME_DEFAULT_VALUE = ".*";
+  public static final String TABLE_NAME_DEFAULT_VALUE = ".*";
+
   public static final String START_TIME_KEY = "start-time";
   public static final String END_TIME_KEY = "end-time";
   public static final String NOW_TIME_VALUE = "now";
@@ -46,6 +51,9 @@ public class TopicConstant {
   public static final String LOOSE_RANGE_ALL_VALUE = "all";
   public static final String LOOSE_RANGE_DEFAULT_VALUE = "";
 
+  public static final String STRICT_KEY = "strict";
+  public static final String STRICT_DEFAULT_VALUE = "true";
+
   private TopicConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 47862c947b5..870a91bb297 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -51,7 +51,18 @@ public class SubscriptionSession extends Session {
         port,
         SessionConfig.DEFAULT_USER,
         SessionConfig.DEFAULT_PASSWORD,
-        SessionConfig.DEFAULT_MAX_FRAME_SIZE);
+        SessionConfig.DEFAULT_MAX_FRAME_SIZE,
+        SessionConfig.SQL_DIALECT);
+  }
+
+  public SubscriptionSession(final String host, final int port, final String 
sqlDialect) {
+    this(
+        host,
+        port,
+        SessionConfig.DEFAULT_USER,
+        SessionConfig.DEFAULT_PASSWORD,
+        SessionConfig.DEFAULT_MAX_FRAME_SIZE,
+        sqlDialect);
   }
 
   public SubscriptionSession(
@@ -60,6 +71,16 @@ public class SubscriptionSession extends Session {
       final String username,
       final String password,
       final int thriftMaxFrameSize) {
+    this(host, port, username, password, thriftMaxFrameSize, 
SessionConfig.SQL_DIALECT);
+  }
+
+  private SubscriptionSession(
+      final String host,
+      final int port,
+      final String username,
+      final String password,
+      final int thriftMaxFrameSize,
+      final String sqlDialect) {
     // TODO: more configs control
     super(
         new Session.Builder()
@@ -71,7 +92,8 @@ public class SubscriptionSession extends Session {
             // disable auto fetch
             .enableAutoFetch(false)
             // disable redirection
-            .enableRedirection(false));
+            .enableRedirection(false)
+            .sqlDialect(sqlDialect));
   }
 
   @Override
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 70b7f0c4bed..0e47a2e25f0 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -56,10 +56,8 @@ ddlStatement
     | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes
     // Pipe Plugin
     | createPipePlugin | dropPipePlugin | showPipePlugins
-    // TOPIC
-    | createTopic | dropTopic | showTopics
     // Subscription
-    | showSubscriptions
+    | createTopic | dropTopic | showTopics | showSubscriptions
     // CQ
     | createContinuousQuery | dropContinuousQuery | showContinuousQueries
     // Cluster
@@ -641,7 +639,8 @@ showPipePlugins
     : SHOW PIPEPLUGINS
     ;
 
-// Topic 
=========================================================================================
+
+// Subscription 
=========================================================================================
 createTopic
     : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
     ;
@@ -662,11 +661,11 @@ showTopics
     : SHOW ((TOPIC topicName=identifier) | TOPICS )
     ;
 
-// Subscriptions 
=========================================================================================
 showSubscriptions
     : SHOW SUBSCRIPTIONS (ON topicName=identifier)?
     ;
 
+
 // AI Model 
=========================================================================================
 // ---- Create Model
 createModel
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 518b4b20424..11288549123 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -497,8 +497,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         stuckPipes.add(pipeMeta);
       }
-      LOGGER.warn(
-          "All {} pipe(s) will be restarted because of forced restart 
policy.", stuckPipes.size());
+      if (!stuckPipes.isEmpty()) {
+        LOGGER.warn(
+            "All {} pipe(s) will be restarted because of forced restart 
policy.",
+            stuckPipes.size());
+      }
       return stuckPipes;
     }
 
@@ -507,9 +510,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         stuckPipes.add(pipeMeta);
       }
-      LOGGER.warn(
-          "All {} pipe(s) will be restarted because linked tsfiles' resource 
size exceeds memory limit.",
-          stuckPipes.size());
+      if (!stuckPipes.isEmpty()) {
+        LOGGER.warn(
+            "All {} pipe(s) will be restarted because linked tsfiles' resource 
size exceeds memory limit.",
+            stuckPipes.size());
+      }
       return stuckPipes;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 61be1471b15..eefed656e01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -79,6 +79,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubscriptionStatement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
@@ -369,6 +370,7 @@ public class Coordinator {
         || statement instanceof ClearCache
         || statement instanceof SetConfiguration
         || statement instanceof PipeStatement
+        || statement instanceof SubscriptionStatement
         || statement instanceof ShowCurrentSqlDialect
         || statement instanceof ShowCurrentUser
         || statement instanceof ShowCurrentDatabase
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 4abaf1bbff5..9173cd74187 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -70,6 +70,10 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.DropPipeTa
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StartPipeTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StopPipeTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer;
 import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -84,6 +88,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
@@ -92,6 +97,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
@@ -116,7 +122,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StartPipe;
@@ -693,6 +701,35 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
     return new ShowPipePluginsTask();
   }
 
+  @Override
+  protected IConfigTask visitCreateTopic(CreateTopic node, MPPQueryContext 
context) {
+    context.setQueryType(QueryType.WRITE);
+
+    // Inject table model into the topic attributes
+    node.getTopicAttributes()
+        .put(SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+
+    return new CreateTopicTask(node);
+  }
+
+  @Override
+  protected IConfigTask visitDropTopic(DropTopic node, MPPQueryContext 
context) {
+    context.setQueryType(QueryType.WRITE);
+    return new DropTopicTask(node);
+  }
+
+  @Override
+  protected IConfigTask visitShowTopics(ShowTopics node, MPPQueryContext 
context) {
+    context.setQueryType(QueryType.READ);
+    return new ShowTopicsTask(node);
+  }
+
+  @Override
+  protected IConfigTask visitShowSubscriptions(ShowSubscriptions node, 
MPPQueryContext context) {
+    context.setQueryType(QueryType.READ);
+    return new ShowSubscriptionsTask(node);
+  }
+
   @Override
   protected IConfigTask visitShowCurrentUser(ShowCurrentUser node, 
MPPQueryContext context) {
     context.setQueryType(QueryType.READ);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index f1fed8f7b50..5b1e513089c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -94,7 +94,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpace
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
-import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
@@ -516,14 +516,13 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     return new StopPipeTask(stopPipeStatement);
   }
 
-  @Override
-  public IConfigTask visitShowSubscriptions(
-      ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext 
context) {
-    return new ShowSubscriptionTask(showSubscriptionsStatement);
-  }
-
   public IConfigTask visitCreateTopic(
       CreateTopicStatement createTopicStatement, MPPQueryContext context) {
+    // Inject tree model into the topic attributes
+    createTopicStatement
+        .getTopicAttributes()
+        .put(SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
+
     return new CreateTopicTask(createTopicStatement);
   }
 
@@ -539,6 +538,12 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
     return new ShowTopicsTask(showTopicsStatement);
   }
 
+  @Override
+  public IConfigTask visitShowSubscriptions(
+      ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext 
context) {
+    return new ShowSubscriptionsTask(showSubscriptionsStatement);
+  }
+
   @Override
   public IConfigTask visitDeleteTimeSeries(
       DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index bf7bc065d49..7e04626ef7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -196,7 +196,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionT
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
-import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
@@ -2212,7 +2212,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         return future;
       }
 
-      ShowSubscriptionTask.buildTSBlock(
+      ShowSubscriptionsTask.buildTSBlock(
           showSubscriptionResp.isSetSubscriptionInfoList()
               ? showSubscriptionResp.getSubscriptionInfoList()
               : Collections.emptyList(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
index 30c5beabf6b..b3bfe6dc57f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -30,12 +31,19 @@ public class CreateTopicTask implements IConfigTask {
 
   private final CreateTopicStatement createTopicStatement;
 
-  public CreateTopicTask(CreateTopicStatement createTopicStatement) {
+  public CreateTopicTask(final CreateTopicStatement createTopicStatement) {
     this.createTopicStatement = createTopicStatement;
   }
 
+  public CreateTopicTask(final CreateTopic createTopic) {
+    this.createTopicStatement = new CreateTopicStatement();
+    this.createTopicStatement.setTopicName(createTopic.getTopicName());
+    
this.createTopicStatement.setIfNotExists(createTopic.hasIfNotExistsCondition());
+    
this.createTopicStatement.setTopicAttributes(createTopic.getTopicAttributes());
+  }
+
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.createTopic(createTopicStatement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
index 23b699878dd..fe28c387bf7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -30,12 +31,18 @@ public class DropTopicTask implements IConfigTask {
 
   private final DropTopicStatement dropTopicStatement;
 
-  public DropTopicTask(DropTopicStatement dropTopicStatement) {
+  public DropTopicTask(final DropTopicStatement dropTopicStatement) {
     this.dropTopicStatement = dropTopicStatement;
   }
 
+  public DropTopicTask(final DropTopic dropTopic) {
+    this.dropTopicStatement = new DropTopicStatement();
+    this.dropTopicStatement.setTopicName(dropTopic.getTopicName());
+    this.dropTopicStatement.setIfExists(dropTopic.hasIfExistsCondition());
+  }
+
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.dropTopic(dropTopicStatement);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
similarity index 80%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
index 29fe87699f5..dc989b50f24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -38,29 +39,35 @@ import org.apache.tsfile.utils.Binary;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class ShowSubscriptionTask implements IConfigTask {
+public class ShowSubscriptionsTask implements IConfigTask {
 
   private final ShowSubscriptionsStatement showSubscriptionsStatement;
 
-  public ShowSubscriptionTask(ShowSubscriptionsStatement 
showSubscriptionsStatement) {
+  public ShowSubscriptionsTask(final ShowSubscriptionsStatement 
showSubscriptionsStatement) {
     this.showSubscriptionsStatement = showSubscriptionsStatement;
   }
 
+  public ShowSubscriptionsTask(final ShowSubscriptions showSubscriptions) {
+    this.showSubscriptionsStatement = new ShowSubscriptionsStatement();
+    
this.showSubscriptionsStatement.setTopicName(showSubscriptions.getTopicName());
+  }
+
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.showSubscriptions(showSubscriptionsStatement);
   }
 
   public static void buildTSBlock(
-      List<TShowSubscriptionInfo> subscriptionInfoList, 
SettableFuture<ConfigTaskResult> future) {
+      final List<TShowSubscriptionInfo> subscriptionInfoList,
+      final SettableFuture<ConfigTaskResult> future) {
     final TsBlockBuilder builder =
         new TsBlockBuilder(
             ColumnHeaderConstant.showSubscriptionColumnHeaders.stream()
                 .map(ColumnHeader::getColumnType)
                 .collect(Collectors.toList()));
 
-    for (TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList) {
+    for (final TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList) 
{
       builder.getTimeColumnBuilder().writeLong(0L);
       builder
           .getColumnBuilder(0)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
index e12172034bf..658ff5035e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -42,18 +43,23 @@ public class ShowTopicsTask implements IConfigTask {
 
   private final ShowTopicsStatement showTopicsStatement;
 
-  public ShowTopicsTask(ShowTopicsStatement showTopicsStatement) {
+  public ShowTopicsTask(final ShowTopicsStatement showTopicsStatement) {
     this.showTopicsStatement = showTopicsStatement;
   }
 
+  public ShowTopicsTask(final ShowTopics showTopics) {
+    this.showTopicsStatement = new ShowTopicsStatement();
+    this.showTopicsStatement.setTopicName(showTopics.getTopicName());
+  }
+
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.showTopics(showTopicsStatement);
   }
 
   public static void buildTSBlock(
-      List<TShowTopicInfo> topicInfoList, SettableFuture<ConfigTaskResult> 
future) {
+      final List<TShowTopicInfo> topicInfoList, final 
SettableFuture<ConfigTaskResult> future) {
     final TsBlockBuilder builder =
         new TsBlockBuilder(
             ColumnHeaderConstant.showTopicColumnHeaders.stream()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 928354dee10..ade7aacbb1e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -58,6 +58,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateOrUpdateDev
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression;
@@ -69,6 +70,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
@@ -117,7 +119,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleGroupBy;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem;
@@ -3101,6 +3105,26 @@ public class StatementAnalyzer {
     protected Scope visitShowPipePlugins(ShowPipePlugins node, Optional<Scope> 
context) {
       return createAndAssignScope(node, context);
     }
+
+    @Override
+    protected Scope visitCreateTopic(CreateTopic node, Optional<Scope> 
context) {
+      return createAndAssignScope(node, context);
+    }
+
+    @Override
+    protected Scope visitDropTopic(DropTopic node, Optional<Scope> context) {
+      return createAndAssignScope(node, context);
+    }
+
+    @Override
+    protected Scope visitShowTopics(ShowTopics node, Optional<Scope> context) {
+      return createAndAssignScope(node, context);
+    }
+
+    @Override
+    protected Scope visitShowSubscriptions(ShowSubscriptions node, 
Optional<Scope> context) {
+      return createAndAssignScope(node, context);
+    }
   }
 
   private static boolean hasScopeAsLocalParent(Scope root, Scope parent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 66bcd7a43e2..1a2e3b9f3a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -561,6 +561,22 @@ public abstract class AstVisitor<R, C> {
     return visitStatement(node, context);
   }
 
+  protected R visitCreateTopic(CreateTopic node, C context) {
+    return visitStatement(node, context);
+  }
+
+  protected R visitDropTopic(DropTopic node, C context) {
+    return visitStatement(node, context);
+  }
+
+  protected R visitShowTopics(ShowTopics node, C context) {
+    return visitStatement(node, context);
+  }
+
+  protected R visitShowSubscriptions(ShowSubscriptions node, C context) {
+    return visitStatement(node, context);
+  }
+
   protected R visitShowVersion(ShowVersion node, C context) {
     return visitStatement(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreateTopic.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreateTopic.java
new file mode 100644
index 00000000000..57801700291
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreateTopic.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.Map;
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CreateTopic extends SubscriptionStatement {
+
+  private final String topicName;
+  private final boolean ifNotExistsCondition;
+  private final Map<String, String> topicAttributes;
+
+  public CreateTopic(
+      final String topicName,
+      final boolean ifNotExistsCondition,
+      final Map<String, String> topicAttributes) {
+    this.topicName = requireNonNull(topicName, "topic name can not be null");
+    this.ifNotExistsCondition = ifNotExistsCondition;
+    this.topicAttributes = requireNonNull(topicAttributes, "topic attributes 
can not be null");
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public boolean hasIfNotExistsCondition() {
+    return ifNotExistsCondition;
+  }
+
+  public Map<String, String> getTopicAttributes() {
+    return topicAttributes;
+  }
+
+  @Override
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+    return visitor.visitCreateTopic(this, context);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicAttributes, ifNotExistsCondition, 
topicAttributes);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final CreateTopic that = (CreateTopic) obj;
+    return Objects.equals(this.topicName, that.topicName)
+        && Objects.equals(this.ifNotExistsCondition, that.ifNotExistsCondition)
+        && Objects.equals(this.topicAttributes, that.topicAttributes);
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("pipeName", topicName)
+        .add("ifNotExistsCondition", ifNotExistsCondition)
+        .add("topicAttributes", topicAttributes)
+        .toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropTopic.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropTopic.java
new file mode 100644
index 00000000000..0ec50fde79a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropTopic.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class DropTopic extends SubscriptionStatement {
+
+  private final String topicName;
+  private final boolean ifExistsCondition;
+
+  public DropTopic(final String topicName, final boolean ifExistsCondition) {
+    this.topicName = requireNonNull(topicName, "topic name can not be null");
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
+  @Override
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+    return visitor.visitDropTopic(this, context);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicName, ifExistsCondition);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final DropTopic that = (DropTopic) obj;
+    return Objects.equals(this.topicName, that.topicName)
+        && Objects.equals(this.ifExistsCondition, that.ifExistsCondition);
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this)
+        .add("topicName", topicName)
+        .add("ifExistsCondition", ifExistsCondition)
+        .toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowSubscriptions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowSubscriptions.java
new file mode 100644
index 00000000000..c16a9836d89
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowSubscriptions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ShowSubscriptions extends SubscriptionStatement {
+
+  private final String topicName;
+
+  public ShowSubscriptions(@Nullable final String topicName) {
+    this.topicName = topicName;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  @Override
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+    return visitor.visitShowSubscriptions(this, context);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicName);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final ShowSubscriptions that = (ShowSubscriptions) obj;
+    return Objects.equals(this.topicName, that.topicName);
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this).add("topicName", topicName).toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowTopics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowTopics.java
new file mode 100644
index 00000000000..f12e6f2a0d2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowTopics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ShowTopics extends SubscriptionStatement {
+
+  private final String topicName;
+
+  public ShowTopics(@Nullable final String topicName) {
+    this.topicName = topicName;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  @Override
+  public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+    return visitor.visitShowTopics(this, context);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(topicName);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final ShowTopics that = (ShowTopics) obj;
+    return Objects.equals(this.topicName, that.topicName);
+  }
+
+  @Override
+  public String toString() {
+    return toStringHelper(this).add("topicName", topicName).toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SubscriptionStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SubscriptionStatement.java
new file mode 100644
index 00000000000..077ab4be887
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SubscriptionStatement.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+public abstract class SubscriptionStatement extends Statement {
+
+  protected SubscriptionStatement() {
+    super(null);
+  }
+
+  @Override
+  public List<? extends Node> getChildren() {
+    return ImmutableList.of();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 3a11a476168..2408ff12907 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -49,6 +49,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentDatabase;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentTime;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentUser;
@@ -65,6 +66,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
@@ -136,7 +138,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
@@ -666,13 +670,13 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
 
   private Map<String, String> parseExtractorAttributesClause(
       List<RelationalSqlParser.ExtractorAttributeClauseContext> contexts) {
-    final Map<String, String> collectorMap = new HashMap<>();
+    final Map<String, String> extractorMap = new HashMap<>();
     for (RelationalSqlParser.ExtractorAttributeClauseContext context : 
contexts) {
-      collectorMap.put(
+      extractorMap.put(
           ((StringLiteral) visit(context.extractorKey)).getValue(),
           ((StringLiteral) visit(context.extractorValue)).getValue());
     }
-    return collectorMap;
+    return extractorMap;
   }
 
   private Map<String, String> parseProcessorAttributesClause(
@@ -814,6 +818,53 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
     return new ShowPipePlugins();
   }
 
+  @Override
+  public Node 
visitCreateTopicStatement(RelationalSqlParser.CreateTopicStatementContext ctx) {
+    final String topicName = ((Identifier) visit(ctx.identifier())).getValue();
+    final boolean hasIfNotExistsCondition =
+        ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null;
+
+    final Map<String, String> topicAttributes =
+        ctx.topicAttributesClause() != null
+            ? 
parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause())
+            : new HashMap<>(); // DO NOT USE Collections.emptyMap() here
+
+    return new CreateTopic(topicName, hasIfNotExistsCondition, 
topicAttributes);
+  }
+
+  private Map<String, String> parseTopicAttributesClause(
+      List<RelationalSqlParser.TopicAttributeClauseContext> contexts) {
+    final Map<String, String> tppicMap = new HashMap<>();
+    for (RelationalSqlParser.TopicAttributeClauseContext context : contexts) {
+      tppicMap.put(
+          ((StringLiteral) visit(context.topicKey)).getValue(),
+          ((StringLiteral) visit(context.topicValue)).getValue());
+    }
+    return tppicMap;
+  }
+
+  @Override
+  public Node 
visitDropTopicStatement(RelationalSqlParser.DropTopicStatementContext ctx) {
+    final String topicName = ((Identifier) visit(ctx.identifier())).getValue();
+    final boolean hasIfExistsCondition = ctx.IF() != null && ctx.EXISTS() != 
null;
+    return new DropTopic(topicName, hasIfExistsCondition);
+  }
+
+  @Override
+  public Node 
visitShowTopicsStatement(RelationalSqlParser.ShowTopicsStatementContext ctx) {
+    final String topicName =
+        
getIdentifierIfPresent(ctx.identifier()).map(Identifier::getValue).orElse(null);
+    return new ShowTopics(topicName);
+  }
+
+  @Override
+  public Node visitShowSubscriptionsStatement(
+      RelationalSqlParser.ShowSubscriptionsStatementContext ctx) {
+    final String topicName =
+        
getIdentifierIfPresent(ctx.identifier()).map(Identifier::getValue).orElse(null);
+    return new ShowSubscriptions(topicName);
+  }
+
   @Override
   public Node visitShowDevicesStatement(final 
RelationalSqlParser.ShowDevicesStatementContext ctx) {
     return new ShowDevice(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
index 4db8b41cf2e..7c0fefc8c6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
@@ -37,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
@@ -75,7 +77,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn;
@@ -96,6 +100,7 @@ import 
com.google.errorprone.annotations.CanIgnoreReturnValue;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -1055,6 +1060,8 @@ public final class SqlFormatter {
     protected Void visitShowPipes(ShowPipes node, Integer context) {
       builder.append("SHOW PIPES");
 
+      // TODO: consider pipeName and hasWhereClause in node
+
       return null;
     }
 
@@ -1096,6 +1103,69 @@ public final class SqlFormatter {
       return null;
     }
 
+    @Override
+    protected Void visitCreateTopic(CreateTopic node, Integer context) {
+      builder.append("CREATE TOPIC ");
+      if (node.hasIfNotExistsCondition()) {
+        builder.append("IF NOT EXISTS ");
+      }
+      builder.append(node.getTopicName());
+      builder.append(" \n");
+
+      if (!node.getTopicAttributes().isEmpty()) {
+        builder
+            .append("WITH (")
+            .append("\n")
+            .append(
+                node.getTopicAttributes().entrySet().stream()
+                    .map(
+                        entry ->
+                            indentString(1)
+                                + "\""
+                                + entry.getKey()
+                                + "\" = \""
+                                + entry.getValue()
+                                + "\"")
+                    .collect(joining(", " + "\n")))
+            .append(")\n");
+      }
+
+      return null;
+    }
+
+    @Override
+    protected Void visitDropTopic(DropTopic node, Integer context) {
+      builder.append("DROP TOPIC ");
+      if (node.hasIfExistsCondition()) {
+        builder.append("IF EXISTS ");
+      }
+      builder.append(node.getTopicName());
+
+      return null;
+    }
+
+    @Override
+    protected Void visitShowTopics(ShowTopics node, Integer context) {
+      if (Objects.isNull(node.getTopicName())) {
+        builder.append("SHOW TOPICS");
+      } else {
+        builder.append("SHOW TOPIC ").append(node.getTopicName());
+      }
+
+      return null;
+    }
+
+    @Override
+    protected Void visitShowSubscriptions(ShowSubscriptions node, Integer 
context) {
+      if (Objects.isNull(node.getTopicName())) {
+        builder.append("SHOW SUBSCRIPTIONS");
+      } else {
+        builder.append("SHOW SUBSCRIPTIONS ON ").append(node.getTopicName());
+      }
+
+      return null;
+    }
+
     private void appendBeginLabel(Optional<Identifier> label) {
       label.ifPresent(value -> builder.append(formatName(value)).append(": "));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 4f49a5909e7..296fd1137ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -182,7 +182,7 @@ public enum StatementType {
   CREATE_TOPIC,
   DROP_TOPIC,
   SHOW_TOPICS,
-
   SHOW_SUBSCRIPTIONS,
+
   SET_CONFIGURATION
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
index 36525b1846e..495a0167cbb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 
 public class DropTopicStatement extends Statement implements IConfigStatement {
+
   private String topicName;
   private boolean ifExistsCondition;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 3e409a9160e..f93fbb074f4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -186,16 +186,23 @@ public class TopicMeta {
     extractorAttributes.put("source", "iotdb-source");
     extractorAttributes.put("inclusion", "data.insert");
     extractorAttributes.put("inclusion.exclusion", "data.delete");
-    // path
-    extractorAttributes.putAll(config.getAttributesWithPathOrPattern());
+    // sql dialect
+    extractorAttributes.putAll(config.getAttributeWithSqlDialect());
+    if (config.isTableTopic()) {
+      // table model: database name and table name
+      
extractorAttributes.putAll(config.getAttributesWithSourceDatabaseAndTableName());
+    } else {
+      // tree model: path or pattern
+      
extractorAttributes.putAll(config.getAttributesWithSourcePathOrPattern());
+    }
     // time
-    extractorAttributes.putAll(config.getAttributesWithTimeRange());
+    extractorAttributes.putAll(config.getAttributesWithSourceTimeRange());
     // realtime mode
-    extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
+    extractorAttributes.putAll(config.getAttributesWithSourceRealtimeMode());
     // source mode
     extractorAttributes.putAll(config.getAttributesWithSourceMode());
-    // loose range
-    extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
+    // loose range or strict
+    
extractorAttributes.putAll(config.getAttributesWithSourceLooseRangeOrStrict());
     return extractorAttributes;
   }
 
diff --git 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index 5aa8f80fe7a..a0011275212 100644
--- 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++ 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -85,6 +85,12 @@ statement
     | dropPipePluginStatement
     | showPipePluginsStatement
 
+    // Subscription Statement
+    | createTopicStatement
+    | dropTopicStatement
+    | showTopicsStatement
+    | showSubscriptionsStatement
+
     // Show Statement
     | showDevicesStatement
     | countDevicesStatement
@@ -366,6 +372,31 @@ showPipePluginsStatement
     ;
 
 
+// -------------------------------------------- Subscription Statement 
---------------------------------------------------------
+createTopicStatement
+    : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
+    ;
+
+topicAttributesClause
+    : WITH '(' topicAttributeClause (',' topicAttributeClause)* ')'
+    ;
+
+topicAttributeClause
+    : topicKey=string EQ topicValue=string
+    ;
+
+dropTopicStatement
+    : DROP TOPIC (IF EXISTS)? topicName=identifier
+    ;
+
+showTopicsStatement
+    : SHOW ((TOPIC topicName=identifier) | TOPICS )
+    ;
+
+showSubscriptionsStatement
+    : SHOW SUBSCRIPTIONS (ON topicName=identifier)?
+    ;
+
 
 // -------------------------------------------- Show Statement 
---------------------------------------------------------
 showDevicesStatement
@@ -914,8 +945,8 @@ nonReserved
     | QUERIES | QUERY | QUOTES
     | RANGE | READ | READONLY | REFRESH | REGION | REGIONID | REGIONS | RENAME 
| REPAIR | REPEAT  | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN 
| RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS | RUNNING
     | SERIESSLOTID | SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK | 
SERIALIZABLE | SESSION | SET | SETS
-    | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSET | SUBSTRING 
| SYSTEM
-    | TABLES | TABLESAMPLE | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION 
| TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TRAILING | TRANSACTION | TRUNCATE 
| TRY_CAST | TYPE
+    | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTIONS | 
SUBSET | SUBSTRING | SYSTEM
+    | TABLES | TABLESAMPLE | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION 
| TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TOPIC | TOPICS | TRAILING | 
TRANSACTION | TRUNCATE | TRY_CAST | TYPE
     | UNBOUNDED | UNCOMMITTED | UNCONDITIONAL | UNIQUE | UNKNOWN | UNMATCHED | 
UNTIL | UPDATE | URI | USE | USED | USER | UTF16 | UTF32 | UTF8
     | VALIDATE | VALUE | VARIABLES | VARIATION | VERBOSE | VERSION | VIEW
     | WEEK | WHILE | WINDOW | WITHIN | WITHOUT | WORK | WRAPPER | WRITE
@@ -1224,6 +1255,7 @@ SOURCE: 'SOURCE';
 START: 'START';
 STATS: 'STATS';
 STOP: 'STOP';
+SUBSCRIPTIONS: 'SUBSCRIPTIONS';
 SUBSET: 'SUBSET';
 SUBSTRING: 'SUBSTRING';
 SYSTEM: 'SYSTEM';
@@ -1242,6 +1274,8 @@ TIMESERIES: 'TIMESERIES';
 TIMESLOTID: 'TIMESLOTID';
 TIMESTAMP: 'TIMESTAMP';
 TO: 'TO';
+TOPIC: 'TOPIC';
+TOPICS: 'TOPICS';
 TRAILING: 'TRAILING';
 TRANSACTION: 'TRANSACTION';
 TRIM: 'TRIM';

Reply via email to