This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 079fc8fd085 Subscription: basic support for table model from SQL to
IConfigTask & Pipe: fix too many warn logs from findAllStuckPipes() (#14287)
079fc8fd085 is described below
commit 079fc8fd085ce26921935656c0485858ea0959f0
Author: VGalaxies <[email protected]>
AuthorDate: Tue Dec 3 18:54:18 2024 +0800
Subscription: basic support for table model from SQL to IConfigTask & Pipe:
fix too many warn logs from findAllStuckPipes() (#14287)
---
.../iotdb/rpc/subscription/config/TopicConfig.java | 80 ++++++++++++++++----
.../rpc/subscription/config/TopicConstant.java | 8 ++
.../session/subscription/SubscriptionSession.java | 26 ++++++-
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 9 +--
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 15 ++--
.../iotdb/db/queryengine/plan/Coordinator.java | 2 +
.../execution/config/TableConfigTaskVisitor.java | 37 +++++++++
.../execution/config/TreeConfigTaskVisitor.java | 19 +++--
.../config/executor/ClusterConfigTaskExecutor.java | 4 +-
.../config/sys/subscription/CreateTopicTask.java | 12 ++-
.../config/sys/subscription/DropTopicTask.java | 11 ++-
...riptionTask.java => ShowSubscriptionsTask.java} | 17 +++--
.../config/sys/subscription/ShowTopicsTask.java | 12 ++-
.../relational/analyzer/StatementAnalyzer.java | 24 ++++++
.../plan/relational/sql/ast/AstVisitor.java | 16 ++++
.../plan/relational/sql/ast/CreateTopic.java | 87 ++++++++++++++++++++++
.../plan/relational/sql/ast/DropTopic.java | 75 +++++++++++++++++++
.../plan/relational/sql/ast/ShowSubscriptions.java | 66 ++++++++++++++++
.../plan/relational/sql/ast/ShowTopics.java | 66 ++++++++++++++++
.../relational/sql/ast/SubscriptionStatement.java | 36 +++++++++
.../plan/relational/sql/parser/AstBuilder.java | 57 +++++++++++++-
.../plan/relational/sql/util/SqlFormatter.java | 70 +++++++++++++++++
.../queryengine/plan/statement/StatementType.java | 2 +-
.../metadata/subscription/DropTopicStatement.java | 1 +
.../commons/subscription/meta/topic/TopicMeta.java | 19 +++--
.../db/relational/grammar/sql/RelationalSql.g4 | 38 +++++++++-
26 files changed, 748 insertions(+), 61 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index 46dc7601e0f..83d18d4155e 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -33,9 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.rpc.subscription.config.TopicConstant.MODE_LIVE_VALUE;
-import static
org.apache.iotdb.rpc.subscription.config.TopicConstant.MODE_SNAPSHOT_VALUE;
-
public class TopicConfig extends PipeParameters {
public TopicConfig() {
@@ -46,6 +43,12 @@ public class TopicConfig extends PipeParameters {
super(attributes);
}
+ // TODO: hide from the client
+ // refer to org.apache.iotdb.commons.pipe.config.constant.SystemConstant
+ private static final String SQL_DIALECT_KEY = "__system.sql-dialect";
+ private static final String SQL_DIALECT_TREE_VALUE = "tree";
+ private static final String SQL_DIALECT_TABLE_VALUE = "table";
+
private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
Collections.singletonMap("realtime.mode", "batch");
private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
@@ -53,11 +56,18 @@ public class TopicConfig extends PipeParameters {
private static final Map<String, String> SINK_TABLET_FORMAT_CONFIG =
Collections.singletonMap("format", "tablet");
+ private static final Map<String, String> SINK_TS_FILE_FORMAT_CONFIG =
+ Collections.singletonMap("format", "tsfile");
+ private static final Map<String, String> SINK_HYBRID_FORMAT_CONFIG =
+ Collections.singletonMap("format", "hybrid");
private static final Map<String, String> SNAPSHOT_MODE_CONFIG =
- Collections.singletonMap("mode", MODE_SNAPSHOT_VALUE);
+ Collections.singletonMap("mode", TopicConstant.MODE_SNAPSHOT_VALUE);
private static final Map<String, String> LIVE_MODE_CONFIG =
- Collections.singletonMap("mode", MODE_LIVE_VALUE);
+ Collections.singletonMap("mode", TopicConstant.MODE_LIVE_VALUE);
+
+ private static final Map<String, String> STRICT_MODE_CONFIG =
+ Collections.singletonMap("mode.strict", "true");
private static final Set<String> LOOSE_RANGE_KEY_SET;
@@ -80,7 +90,19 @@ public class TopicConfig extends PipeParameters {
/////////////////////////////// utilities ///////////////////////////////
- public Map<String, String> getAttributesWithPathOrPattern() {
+ public boolean isTableTopic() {
+ return SQL_DIALECT_TABLE_VALUE.equalsIgnoreCase(
+ attributes.getOrDefault(SQL_DIALECT_KEY, SQL_DIALECT_TREE_VALUE));
+ }
+
+ /////////////////////////////// extractor attributes mapping
///////////////////////////////
+
+ public Map<String, String> getAttributeWithSqlDialect() {
+ return Collections.singletonMap(
+ SQL_DIALECT_KEY, attributes.getOrDefault(SQL_DIALECT_KEY,
SQL_DIALECT_TREE_VALUE));
+ }
+
+ public Map<String, String> getAttributesWithSourcePathOrPattern() {
if (attributes.containsKey(TopicConstant.PATTERN_KEY)) {
return Collections.singletonMap(
TopicConstant.PATTERN_KEY,
attributes.get(TopicConstant.PATTERN_KEY));
@@ -91,7 +113,20 @@ public class TopicConfig extends PipeParameters {
attributes.getOrDefault(TopicConstant.PATH_KEY,
TopicConstant.PATH_DEFAULT_VALUE));
}
- public Map<String, String> getAttributesWithTimeRange() {
+ public Map<String, String> getAttributesWithSourceDatabaseAndTableName() {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(
+ TopicConstant.DATABASE_NAME_KEY,
+ this.attributes.getOrDefault(
+ TopicConstant.DATABASE_NAME_KEY,
TopicConstant.DATABASE_NAME_DEFAULT_VALUE));
+ attributes.put(
+ TopicConstant.TABLE_NAME_KEY,
+ this.attributes.getOrDefault(
+ TopicConstant.TABLE_NAME_KEY,
TopicConstant.TABLE_NAME_DEFAULT_VALUE));
+ return attributes;
+ }
+
+ public Map<String, String> getAttributesWithSourceTimeRange() {
final Map<String, String> attributesWithTimeRange = new HashMap<>();
// there should be no TopicConstant.NOW_TIME_VALUE here
@@ -105,25 +140,36 @@ public class TopicConfig extends PipeParameters {
return attributesWithTimeRange;
}
- public Map<String, String> getAttributesWithRealtimeMode() {
+ public Map<String, String> getAttributesWithSourceRealtimeMode() {
return REALTIME_STREAM_MODE_CONFIG; // default to stream (hybrid)
}
public Map<String, String> getAttributesWithSourceMode() {
- return MODE_SNAPSHOT_VALUE.equalsIgnoreCase(
+ return TopicConstant.MODE_SNAPSHOT_VALUE.equalsIgnoreCase(
attributes.getOrDefault(TopicConstant.MODE_KEY,
TopicConstant.MODE_DEFAULT_VALUE))
? SNAPSHOT_MODE_CONFIG
: LIVE_MODE_CONFIG;
}
- public Map<String, String> getAttributesWithSourceLooseRange() {
- final String looseRangeValue =
- attributes.getOrDefault(
- TopicConstant.LOOSE_RANGE_KEY,
TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
- return LOOSE_RANGE_KEY_SET.stream()
- .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
+ public Map<String, String> getAttributesWithSourceLooseRangeOrStrict() {
+ if (attributes.containsKey(TopicConstant.LOOSE_RANGE_KEY)
+ && !attributes.containsKey(TopicConstant.STRICT_KEY)) {
+ // for forwards compatibility
+ final String looseRangeValue =
+ attributes.getOrDefault(
+ TopicConstant.LOOSE_RANGE_KEY,
TopicConstant.LOOSE_RANGE_DEFAULT_VALUE);
+ return LOOSE_RANGE_KEY_SET.stream()
+ .collect(Collectors.toMap(key -> key, key -> looseRangeValue));
+ } else {
+ // only consider strict
+ return Collections.singletonMap(
+ TopicConstant.STRICT_KEY,
+ attributes.getOrDefault(TopicConstant.STRICT_KEY,
TopicConstant.STRICT_DEFAULT_VALUE));
+ }
}
+ /////////////////////////////// processor attributes mapping
///////////////////////////////
+
public Map<String, String> getAttributesWithProcessorPrefix() {
final Map<String, String> attributesWithProcessorPrefix = new HashMap<>();
attributes.forEach(
@@ -135,7 +181,9 @@ public class TopicConfig extends PipeParameters {
return attributesWithProcessorPrefix;
}
+ /////////////////////////////// connector attributes mapping
///////////////////////////////
+
public Map<String, String> getAttributesWithSinkFormat() {
- return Collections.emptyMap(); // default to hybrid
+ return SINK_HYBRID_FORMAT_CONFIG; // default to hybrid
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index d159df2983c..e36cb9361f5 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -26,6 +26,11 @@ public class TopicConstant {
public static final String PATTERN_KEY = "pattern";
public static final String PATTERN_DEFAULT_VALUE = "root";
+ public static final String DATABASE_NAME_KEY = "database-name";
+ public static final String TABLE_NAME_KEY = "table-name";
+ public static final String DATABASE_NAME_DEFAULT_VALUE = ".*";
+ public static final String TABLE_NAME_DEFAULT_VALUE = ".*";
+
public static final String START_TIME_KEY = "start-time";
public static final String END_TIME_KEY = "end-time";
public static final String NOW_TIME_VALUE = "now";
@@ -46,6 +51,9 @@ public class TopicConstant {
public static final String LOOSE_RANGE_ALL_VALUE = "all";
public static final String LOOSE_RANGE_DEFAULT_VALUE = "";
+ public static final String STRICT_KEY = "strict";
+ public static final String STRICT_DEFAULT_VALUE = "true";
+
private TopicConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 47862c947b5..870a91bb297 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -51,7 +51,18 @@ public class SubscriptionSession extends Session {
port,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
- SessionConfig.DEFAULT_MAX_FRAME_SIZE);
+ SessionConfig.DEFAULT_MAX_FRAME_SIZE,
+ SessionConfig.SQL_DIALECT);
+ }
+
+ public SubscriptionSession(final String host, final int port, final String
sqlDialect) {
+ this(
+ host,
+ port,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ SessionConfig.DEFAULT_MAX_FRAME_SIZE,
+ sqlDialect);
}
public SubscriptionSession(
@@ -60,6 +71,16 @@ public class SubscriptionSession extends Session {
final String username,
final String password,
final int thriftMaxFrameSize) {
+ this(host, port, username, password, thriftMaxFrameSize,
SessionConfig.SQL_DIALECT);
+ }
+
+ private SubscriptionSession(
+ final String host,
+ final int port,
+ final String username,
+ final String password,
+ final int thriftMaxFrameSize,
+ final String sqlDialect) {
// TODO: more configs control
super(
new Session.Builder()
@@ -71,7 +92,8 @@ public class SubscriptionSession extends Session {
// disable auto fetch
.enableAutoFetch(false)
// disable redirection
- .enableRedirection(false));
+ .enableRedirection(false)
+ .sqlDialect(sqlDialect));
}
@Override
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 70b7f0c4bed..0e47a2e25f0 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -56,10 +56,8 @@ ddlStatement
| createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes
// Pipe Plugin
| createPipePlugin | dropPipePlugin | showPipePlugins
- // TOPIC
- | createTopic | dropTopic | showTopics
// Subscription
- | showSubscriptions
+ | createTopic | dropTopic | showTopics | showSubscriptions
// CQ
| createContinuousQuery | dropContinuousQuery | showContinuousQueries
// Cluster
@@ -641,7 +639,8 @@ showPipePlugins
: SHOW PIPEPLUGINS
;
-// Topic
=========================================================================================
+
+// Subscription
=========================================================================================
createTopic
: CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
;
@@ -662,11 +661,11 @@ showTopics
: SHOW ((TOPIC topicName=identifier) | TOPICS )
;
-// Subscriptions
=========================================================================================
showSubscriptions
: SHOW SUBSCRIPTIONS (ON topicName=identifier)?
;
+
// AI Model
=========================================================================================
// ---- Create Model
createModel
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 518b4b20424..11288549123 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -497,8 +497,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
- LOGGER.warn(
- "All {} pipe(s) will be restarted because of forced restart
policy.", stuckPipes.size());
+ if (!stuckPipes.isEmpty()) {
+ LOGGER.warn(
+ "All {} pipe(s) will be restarted because of forced restart
policy.",
+ stuckPipes.size());
+ }
return stuckPipes;
}
@@ -507,9 +510,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
stuckPipes.add(pipeMeta);
}
- LOGGER.warn(
- "All {} pipe(s) will be restarted because linked tsfiles' resource
size exceeds memory limit.",
- stuckPipes.size());
+ if (!stuckPipes.isEmpty()) {
+ LOGGER.warn(
+ "All {} pipe(s) will be restarted because linked tsfiles' resource
size exceeds memory limit.",
+ stuckPipes.size());
+ }
return stuckPipes;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 61be1471b15..eefed656e01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -79,6 +79,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubscriptionStatement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
@@ -369,6 +370,7 @@ public class Coordinator {
|| statement instanceof ClearCache
|| statement instanceof SetConfiguration
|| statement instanceof PipeStatement
+ || statement instanceof SubscriptionStatement
|| statement instanceof ShowCurrentSqlDialect
|| statement instanceof ShowCurrentUser
|| statement instanceof ShowCurrentDatabase
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 4abaf1bbff5..9173cd74187 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -70,6 +70,10 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.DropPipeTa
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StartPipeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.StopPipeTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer;
import
org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -84,6 +88,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
@@ -92,6 +97,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
@@ -116,7 +122,9 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDataNodes;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StartPipe;
@@ -693,6 +701,35 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
return new ShowPipePluginsTask();
}
+ @Override
+ protected IConfigTask visitCreateTopic(CreateTopic node, MPPQueryContext
context) {
+ context.setQueryType(QueryType.WRITE);
+
+ // Inject table model into the topic attributes
+ node.getTopicAttributes()
+ .put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+
+ return new CreateTopicTask(node);
+ }
+
+ @Override
+ protected IConfigTask visitDropTopic(DropTopic node, MPPQueryContext
context) {
+ context.setQueryType(QueryType.WRITE);
+ return new DropTopicTask(node);
+ }
+
+ @Override
+ protected IConfigTask visitShowTopics(ShowTopics node, MPPQueryContext
context) {
+ context.setQueryType(QueryType.READ);
+ return new ShowTopicsTask(node);
+ }
+
+ @Override
+ protected IConfigTask visitShowSubscriptions(ShowSubscriptions node,
MPPQueryContext context) {
+ context.setQueryType(QueryType.READ);
+ return new ShowSubscriptionsTask(node);
+ }
+
@Override
protected IConfigTask visitShowCurrentUser(ShowCurrentUser node,
MPPQueryContext context) {
context.setQueryType(QueryType.READ);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index f1fed8f7b50..5b1e513089c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -94,7 +94,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpace
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
-import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
@@ -516,14 +516,13 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
return new StopPipeTask(stopPipeStatement);
}
- @Override
- public IConfigTask visitShowSubscriptions(
- ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext
context) {
- return new ShowSubscriptionTask(showSubscriptionsStatement);
- }
-
public IConfigTask visitCreateTopic(
CreateTopicStatement createTopicStatement, MPPQueryContext context) {
+ // Inject tree model into the topic attributes
+ createTopicStatement
+ .getTopicAttributes()
+ .put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
+
return new CreateTopicTask(createTopicStatement);
}
@@ -539,6 +538,12 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
return new ShowTopicsTask(showTopicsStatement);
}
+ @Override
+ public IConfigTask visitShowSubscriptions(
+ ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext
context) {
+ return new ShowSubscriptionsTask(showSubscriptionsStatement);
+ }
+
@Override
public IConfigTask visitDeleteTimeSeries(
DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index bf7bc065d49..7e04626ef7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -196,7 +196,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionT
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
-import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
@@ -2212,7 +2212,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
- ShowSubscriptionTask.buildTSBlock(
+ ShowSubscriptionsTask.buildTSBlock(
showSubscriptionResp.isSetSubscriptionInfoList()
? showSubscriptionResp.getSubscriptionInfoList()
: Collections.emptyList(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
index 30c5beabf6b..b3bfe6dc57f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/CreateTopicTask.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
import com.google.common.util.concurrent.ListenableFuture;
@@ -30,12 +31,19 @@ public class CreateTopicTask implements IConfigTask {
private final CreateTopicStatement createTopicStatement;
- public CreateTopicTask(CreateTopicStatement createTopicStatement) {
+ public CreateTopicTask(final CreateTopicStatement createTopicStatement) {
this.createTopicStatement = createTopicStatement;
}
+ public CreateTopicTask(final CreateTopic createTopic) {
+ this.createTopicStatement = new CreateTopicStatement();
+ this.createTopicStatement.setTopicName(createTopic.getTopicName());
+
this.createTopicStatement.setIfNotExists(createTopic.hasIfNotExistsCondition());
+
this.createTopicStatement.setTopicAttributes(createTopic.getTopicAttributes());
+ }
+
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.createTopic(createTopicStatement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
index 23b699878dd..fe28c387bf7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropTopicTask.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
import com.google.common.util.concurrent.ListenableFuture;
@@ -30,12 +31,18 @@ public class DropTopicTask implements IConfigTask {
private final DropTopicStatement dropTopicStatement;
- public DropTopicTask(DropTopicStatement dropTopicStatement) {
+ public DropTopicTask(final DropTopicStatement dropTopicStatement) {
this.dropTopicStatement = dropTopicStatement;
}
+ public DropTopicTask(final DropTopic dropTopic) {
+ this.dropTopicStatement = new DropTopicStatement();
+ this.dropTopicStatement.setTopicName(dropTopic.getTopicName());
+ this.dropTopicStatement.setIfExists(dropTopic.hasIfExistsCondition());
+ }
+
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.dropTopic(dropTopicStatement);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
similarity index 80%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
index 29fe87699f5..dc989b50f24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -38,29 +39,35 @@ import org.apache.tsfile.utils.Binary;
import java.util.List;
import java.util.stream.Collectors;
-public class ShowSubscriptionTask implements IConfigTask {
+public class ShowSubscriptionsTask implements IConfigTask {
private final ShowSubscriptionsStatement showSubscriptionsStatement;
- public ShowSubscriptionTask(ShowSubscriptionsStatement
showSubscriptionsStatement) {
+ public ShowSubscriptionsTask(final ShowSubscriptionsStatement
showSubscriptionsStatement) {
this.showSubscriptionsStatement = showSubscriptionsStatement;
}
+ public ShowSubscriptionsTask(final ShowSubscriptions showSubscriptions) {
+ this.showSubscriptionsStatement = new ShowSubscriptionsStatement();
+
this.showSubscriptionsStatement.setTopicName(showSubscriptions.getTopicName());
+ }
+
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.showSubscriptions(showSubscriptionsStatement);
}
public static void buildTSBlock(
- List<TShowSubscriptionInfo> subscriptionInfoList,
SettableFuture<ConfigTaskResult> future) {
+ final List<TShowSubscriptionInfo> subscriptionInfoList,
+ final SettableFuture<ConfigTaskResult> future) {
final TsBlockBuilder builder =
new TsBlockBuilder(
ColumnHeaderConstant.showSubscriptionColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList()));
- for (TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList) {
+ for (final TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList)
{
builder.getTimeColumnBuilder().writeLong(0L);
builder
.getColumnBuilder(0)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
index e12172034bf..658ff5035e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowTopicsTask.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -42,18 +43,23 @@ public class ShowTopicsTask implements IConfigTask {
private final ShowTopicsStatement showTopicsStatement;
- public ShowTopicsTask(ShowTopicsStatement showTopicsStatement) {
+ public ShowTopicsTask(final ShowTopicsStatement showTopicsStatement) {
this.showTopicsStatement = showTopicsStatement;
}
+ public ShowTopicsTask(final ShowTopics showTopics) {
+ this.showTopicsStatement = new ShowTopicsStatement();
+ this.showTopicsStatement.setTopicName(showTopics.getTopicName());
+ }
+
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.showTopics(showTopicsStatement);
}
public static void buildTSBlock(
- List<TShowTopicInfo> topicInfoList, SettableFuture<ConfigTaskResult>
future) {
+ final List<TShowTopicInfo> topicInfoList, final
SettableFuture<ConfigTaskResult> future) {
final TsBlockBuilder builder =
new TsBlockBuilder(
ColumnHeaderConstant.showTopicColumnHeaders.stream()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 928354dee10..ade7aacbb1e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -58,6 +58,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateOrUpdateDev
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression;
@@ -69,6 +70,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
@@ -117,7 +119,9 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleGroupBy;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem;
@@ -3101,6 +3105,26 @@ public class StatementAnalyzer {
protected Scope visitShowPipePlugins(ShowPipePlugins node, Optional<Scope>
context) {
return createAndAssignScope(node, context);
}
+
+ @Override
+ protected Scope visitCreateTopic(CreateTopic node, Optional<Scope>
context) {
+ return createAndAssignScope(node, context);
+ }
+
+ @Override
+ protected Scope visitDropTopic(DropTopic node, Optional<Scope> context) {
+ return createAndAssignScope(node, context);
+ }
+
+ @Override
+ protected Scope visitShowTopics(ShowTopics node, Optional<Scope> context) {
+ return createAndAssignScope(node, context);
+ }
+
+ @Override
+ protected Scope visitShowSubscriptions(ShowSubscriptions node,
Optional<Scope> context) {
+ return createAndAssignScope(node, context);
+ }
}
private static boolean hasScopeAsLocalParent(Scope root, Scope parent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 66bcd7a43e2..1a2e3b9f3a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -561,6 +561,22 @@ public abstract class AstVisitor<R, C> {
return visitStatement(node, context);
}
+ protected R visitCreateTopic(CreateTopic node, C context) {
+ return visitStatement(node, context);
+ }
+
+ protected R visitDropTopic(DropTopic node, C context) {
+ return visitStatement(node, context);
+ }
+
+ protected R visitShowTopics(ShowTopics node, C context) {
+ return visitStatement(node, context);
+ }
+
+ protected R visitShowSubscriptions(ShowSubscriptions node, C context) {
+ return visitStatement(node, context);
+ }
+
protected R visitShowVersion(ShowVersion node, C context) {
return visitStatement(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreateTopic.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreateTopic.java
new file mode 100644
index 00000000000..57801700291
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreateTopic.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.Map;
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CreateTopic extends SubscriptionStatement {
+
+ private final String topicName;
+ private final boolean ifNotExistsCondition;
+ private final Map<String, String> topicAttributes;
+
+ public CreateTopic(
+ final String topicName,
+ final boolean ifNotExistsCondition,
+ final Map<String, String> topicAttributes) {
+ this.topicName = requireNonNull(topicName, "topic name can not be null");
+ this.ifNotExistsCondition = ifNotExistsCondition;
+ this.topicAttributes = requireNonNull(topicAttributes, "topic attributes
can not be null");
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public boolean hasIfNotExistsCondition() {
+ return ifNotExistsCondition;
+ }
+
+ public Map<String, String> getTopicAttributes() {
+ return topicAttributes;
+ }
+
+ @Override
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+ return visitor.visitCreateTopic(this, context);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicAttributes, ifNotExistsCondition,
topicAttributes);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final CreateTopic that = (CreateTopic) obj;
+ return Objects.equals(this.topicName, that.topicName)
+ && Objects.equals(this.ifNotExistsCondition, that.ifNotExistsCondition)
+ && Objects.equals(this.topicAttributes, that.topicAttributes);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("pipeName", topicName)
+ .add("ifNotExistsCondition", ifNotExistsCondition)
+ .add("topicAttributes", topicAttributes)
+ .toString();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropTopic.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropTopic.java
new file mode 100644
index 00000000000..0ec50fde79a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DropTopic.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class DropTopic extends SubscriptionStatement {
+
+ private final String topicName;
+ private final boolean ifExistsCondition;
+
+ public DropTopic(final String topicName, final boolean ifExistsCondition) {
+ this.topicName = requireNonNull(topicName, "topic name can not be null");
+ this.ifExistsCondition = ifExistsCondition;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public boolean hasIfExistsCondition() {
+ return ifExistsCondition;
+ }
+
+ @Override
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+ return visitor.visitDropTopic(this, context);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName, ifExistsCondition);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final DropTopic that = (DropTopic) obj;
+ return Objects.equals(this.topicName, that.topicName)
+ && Objects.equals(this.ifExistsCondition, that.ifExistsCondition);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("topicName", topicName)
+ .add("ifExistsCondition", ifExistsCondition)
+ .toString();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowSubscriptions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowSubscriptions.java
new file mode 100644
index 00000000000..c16a9836d89
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowSubscriptions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ShowSubscriptions extends SubscriptionStatement {
+
+ private final String topicName;
+
+ public ShowSubscriptions(@Nullable final String topicName) {
+ this.topicName = topicName;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ @Override
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+ return visitor.visitShowSubscriptions(this, context);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final ShowSubscriptions that = (ShowSubscriptions) obj;
+ return Objects.equals(this.topicName, that.topicName);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("topicName", topicName).toString();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowTopics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowTopics.java
new file mode 100644
index 00000000000..f12e6f2a0d2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ShowTopics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ShowTopics extends SubscriptionStatement {
+
+ private final String topicName;
+
+ public ShowTopics(@Nullable final String topicName) {
+ this.topicName = topicName;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ @Override
+ public <R, C> R accept(final AstVisitor<R, C> visitor, final C context) {
+ return visitor.visitShowTopics(this, context);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final ShowTopics that = (ShowTopics) obj;
+ return Objects.equals(this.topicName, that.topicName);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("topicName", topicName).toString();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SubscriptionStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SubscriptionStatement.java
new file mode 100644
index 00000000000..077ab4be887
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SubscriptionStatement.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+public abstract class SubscriptionStatement extends Statement {
+
+ protected SubscriptionStatement() {
+ super(null);
+ }
+
+ @Override
+ public List<? extends Node> getChildren() {
+ return ImmutableList.of();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 3a11a476168..2408ff12907 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentDatabase;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentTime;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentUser;
@@ -65,6 +66,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropIndex;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
@@ -136,7 +138,9 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowIndex;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowRegions;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression;
@@ -666,13 +670,13 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
private Map<String, String> parseExtractorAttributesClause(
List<RelationalSqlParser.ExtractorAttributeClauseContext> contexts) {
- final Map<String, String> collectorMap = new HashMap<>();
+ final Map<String, String> extractorMap = new HashMap<>();
for (RelationalSqlParser.ExtractorAttributeClauseContext context :
contexts) {
- collectorMap.put(
+ extractorMap.put(
((StringLiteral) visit(context.extractorKey)).getValue(),
((StringLiteral) visit(context.extractorValue)).getValue());
}
- return collectorMap;
+ return extractorMap;
}
private Map<String, String> parseProcessorAttributesClause(
@@ -814,6 +818,53 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
return new ShowPipePlugins();
}
+ @Override
+ public Node
visitCreateTopicStatement(RelationalSqlParser.CreateTopicStatementContext ctx) {
+ final String topicName = ((Identifier) visit(ctx.identifier())).getValue();
+ final boolean hasIfNotExistsCondition =
+ ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null;
+
+ final Map<String, String> topicAttributes =
+ ctx.topicAttributesClause() != null
+ ?
parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause())
+ : new HashMap<>(); // DO NOT USE Collections.emptyMap() here
+
+ return new CreateTopic(topicName, hasIfNotExistsCondition,
topicAttributes);
+ }
+
+ private Map<String, String> parseTopicAttributesClause(
+ List<RelationalSqlParser.TopicAttributeClauseContext> contexts) {
+ final Map<String, String> tppicMap = new HashMap<>();
+ for (RelationalSqlParser.TopicAttributeClauseContext context : contexts) {
+ tppicMap.put(
+ ((StringLiteral) visit(context.topicKey)).getValue(),
+ ((StringLiteral) visit(context.topicValue)).getValue());
+ }
+ return tppicMap;
+ }
+
+ @Override
+ public Node
visitDropTopicStatement(RelationalSqlParser.DropTopicStatementContext ctx) {
+ final String topicName = ((Identifier) visit(ctx.identifier())).getValue();
+ final boolean hasIfExistsCondition = ctx.IF() != null && ctx.EXISTS() !=
null;
+ return new DropTopic(topicName, hasIfExistsCondition);
+ }
+
+ @Override
+ public Node
visitShowTopicsStatement(RelationalSqlParser.ShowTopicsStatementContext ctx) {
+ final String topicName =
+
getIdentifierIfPresent(ctx.identifier()).map(Identifier::getValue).orElse(null);
+ return new ShowTopics(topicName);
+ }
+
+ @Override
+ public Node visitShowSubscriptionsStatement(
+ RelationalSqlParser.ShowSubscriptionsStatementContext ctx) {
+ final String topicName =
+
getIdentifierIfPresent(ctx.identifier()).map(Identifier::getValue).orElse(null);
+ return new ShowSubscriptions(topicName);
+ }
+
@Override
public Node visitShowDevicesStatement(final
RelationalSqlParser.ShowDevicesStatementContext ctx) {
return new ShowDevice(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
index 4db8b41cf2e..7c0fefc8c6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
@@ -37,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipe;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropPipePlugin;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTopic;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
@@ -75,7 +77,9 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowFunctions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipePlugins;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowPipes;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowSubscriptions;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTopics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVariables;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowVersion;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SingleColumn;
@@ -96,6 +100,7 @@ import
com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
@@ -1055,6 +1060,8 @@ public final class SqlFormatter {
protected Void visitShowPipes(ShowPipes node, Integer context) {
builder.append("SHOW PIPES");
+ // TODO: consider pipeName and hasWhereClause in node
+
return null;
}
@@ -1096,6 +1103,69 @@ public final class SqlFormatter {
return null;
}
+ @Override
+ protected Void visitCreateTopic(CreateTopic node, Integer context) {
+ builder.append("CREATE TOPIC ");
+ if (node.hasIfNotExistsCondition()) {
+ builder.append("IF NOT EXISTS ");
+ }
+ builder.append(node.getTopicName());
+ builder.append(" \n");
+
+ if (!node.getTopicAttributes().isEmpty()) {
+ builder
+ .append("WITH (")
+ .append("\n")
+ .append(
+ node.getTopicAttributes().entrySet().stream()
+ .map(
+ entry ->
+ indentString(1)
+ + "\""
+ + entry.getKey()
+ + "\" = \""
+ + entry.getValue()
+ + "\"")
+ .collect(joining(", " + "\n")))
+ .append(")\n");
+ }
+
+ return null;
+ }
+
+ @Override
+ protected Void visitDropTopic(DropTopic node, Integer context) {
+ builder.append("DROP TOPIC ");
+ if (node.hasIfExistsCondition()) {
+ builder.append("IF EXISTS ");
+ }
+ builder.append(node.getTopicName());
+
+ return null;
+ }
+
+ @Override
+ protected Void visitShowTopics(ShowTopics node, Integer context) {
+ if (Objects.isNull(node.getTopicName())) {
+ builder.append("SHOW TOPICS");
+ } else {
+ builder.append("SHOW TOPIC ").append(node.getTopicName());
+ }
+
+ return null;
+ }
+
+ @Override
+ protected Void visitShowSubscriptions(ShowSubscriptions node, Integer
context) {
+ if (Objects.isNull(node.getTopicName())) {
+ builder.append("SHOW SUBSCRIPTIONS");
+ } else {
+ builder.append("SHOW SUBSCRIPTIONS ON ").append(node.getTopicName());
+ }
+
+ return null;
+ }
+
private void appendBeginLabel(Optional<Identifier> label) {
label.ifPresent(value -> builder.append(formatName(value)).append(": "));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 4f49a5909e7..296fd1137ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -182,7 +182,7 @@ public enum StatementType {
CREATE_TOPIC,
DROP_TOPIC,
SHOW_TOPICS,
-
SHOW_SUBSCRIPTIONS,
+
SET_CONFIGURATION
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
index 36525b1846e..495a0167cbb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
public class DropTopicStatement extends Statement implements IConfigStatement {
+
private String topicName;
private boolean ifExistsCondition;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 3e409a9160e..f93fbb074f4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -186,16 +186,23 @@ public class TopicMeta {
extractorAttributes.put("source", "iotdb-source");
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("inclusion.exclusion", "data.delete");
- // path
- extractorAttributes.putAll(config.getAttributesWithPathOrPattern());
+ // sql dialect
+ extractorAttributes.putAll(config.getAttributeWithSqlDialect());
+ if (config.isTableTopic()) {
+ // table model: database name and table name
+
extractorAttributes.putAll(config.getAttributesWithSourceDatabaseAndTableName());
+ } else {
+ // tree model: path or pattern
+
extractorAttributes.putAll(config.getAttributesWithSourcePathOrPattern());
+ }
// time
- extractorAttributes.putAll(config.getAttributesWithTimeRange());
+ extractorAttributes.putAll(config.getAttributesWithSourceTimeRange());
// realtime mode
- extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
+ extractorAttributes.putAll(config.getAttributesWithSourceRealtimeMode());
// source mode
extractorAttributes.putAll(config.getAttributesWithSourceMode());
- // loose range
- extractorAttributes.putAll(config.getAttributesWithSourceLooseRange());
+ // loose range or strict
+
extractorAttributes.putAll(config.getAttributesWithSourceLooseRangeOrStrict());
return extractorAttributes;
}
diff --git
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index 5aa8f80fe7a..a0011275212 100644
---
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -85,6 +85,12 @@ statement
| dropPipePluginStatement
| showPipePluginsStatement
+ // Subscription Statement
+ | createTopicStatement
+ | dropTopicStatement
+ | showTopicsStatement
+ | showSubscriptionsStatement
+
// Show Statement
| showDevicesStatement
| countDevicesStatement
@@ -366,6 +372,31 @@ showPipePluginsStatement
;
+// -------------------------------------------- Subscription Statement
---------------------------------------------------------
+createTopicStatement
+ : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
+ ;
+
+topicAttributesClause
+ : WITH '(' topicAttributeClause (',' topicAttributeClause)* ')'
+ ;
+
+topicAttributeClause
+ : topicKey=string EQ topicValue=string
+ ;
+
+dropTopicStatement
+ : DROP TOPIC (IF EXISTS)? topicName=identifier
+ ;
+
+showTopicsStatement
+ : SHOW ((TOPIC topicName=identifier) | TOPICS )
+ ;
+
+showSubscriptionsStatement
+ : SHOW SUBSCRIPTIONS (ON topicName=identifier)?
+ ;
+
// -------------------------------------------- Show Statement
---------------------------------------------------------
showDevicesStatement
@@ -914,8 +945,8 @@ nonReserved
| QUERIES | QUERY | QUOTES
| RANGE | READ | READONLY | REFRESH | REGION | REGIONID | REGIONS | RENAME
| REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN
| RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS | RUNNING
| SERIESSLOTID | SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK |
SERIALIZABLE | SESSION | SET | SETS
- | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSET | SUBSTRING
| SYSTEM
- | TABLES | TABLESAMPLE | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION
| TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TRAILING | TRANSACTION | TRUNCATE
| TRY_CAST | TYPE
+ | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTIONS |
SUBSET | SUBSTRING | SYSTEM
+ | TABLES | TABLESAMPLE | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION
| TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TOPIC | TOPICS | TRAILING |
TRANSACTION | TRUNCATE | TRY_CAST | TYPE
| UNBOUNDED | UNCOMMITTED | UNCONDITIONAL | UNIQUE | UNKNOWN | UNMATCHED |
UNTIL | UPDATE | URI | USE | USED | USER | UTF16 | UTF32 | UTF8
| VALIDATE | VALUE | VARIABLES | VARIATION | VERBOSE | VERSION | VIEW
| WEEK | WHILE | WINDOW | WITHIN | WITHOUT | WORK | WRAPPER | WRITE
@@ -1224,6 +1255,7 @@ SOURCE: 'SOURCE';
START: 'START';
STATS: 'STATS';
STOP: 'STOP';
+SUBSCRIPTIONS: 'SUBSCRIPTIONS';
SUBSET: 'SUBSET';
SUBSTRING: 'SUBSTRING';
SYSTEM: 'SYSTEM';
@@ -1242,6 +1274,8 @@ TIMESERIES: 'TIMESERIES';
TIMESLOTID: 'TIMESLOTID';
TIMESTAMP: 'TIMESTAMP';
TO: 'TO';
+TOPIC: 'TOPIC';
+TOPICS: 'TOPICS';
TRAILING: 'TRAILING';
TRANSACTION: 'TRANSACTION';
TRIM: 'TRIM';