This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new c149aad8acb [Table-Model] Implement db management
c149aad8acb is described below
commit c149aad8acb463d8544dba927466c24ab72a31c7
Author: Marcos_Zyk <[email protected]>
AuthorDate: Mon Apr 8 15:37:05 2024 +0800
[Table-Model] Implement db management
---
.../common/header/ColumnHeaderConstant.java | 7 ++
.../common/header/DatasetHeaderFactory.java | 4 +
.../iotdb/db/queryengine/plan/Coordinator.java | 2 -
.../execution/config/TableConfigTaskVisitor.java | 14 +++-
.../config/executor/ClusterConfigTaskExecutor.java | 97 ++++++++++++++++++++--
.../config/metadata/relational/CreateDBTask.java | 42 ++++++++++
.../config/metadata/relational/DropDBTask.java | 42 ++++++++++
.../config/metadata/relational/ShowDBTask.java | 82 ++++++++++++++++++
8 files changed, 279 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
index 6a98af5281d..059a4a218b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
@@ -489,4 +489,11 @@ public class ColumnHeaderConstant {
public static final List<ColumnHeader> showCurrentTimestampColumnHeaders =
ImmutableList.of(new ColumnHeader(CURRENT_TIMESTAMP, TSDataType.INT64));
+
+ public static final List<ColumnHeader> showDBColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(DATABASE, TSDataType.TEXT),
+ new ColumnHeader(SCHEMA_REPLICATION_FACTOR, TSDataType.INT32),
+ new ColumnHeader(DATA_REPLICATION_FACTOR, TSDataType.INT32),
+ new ColumnHeader(TIME_PARTITION_INTERVAL, TSDataType.INT64));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
index 78250b05bcb..ae684cec09e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java
@@ -204,4 +204,8 @@ public class DatasetHeaderFactory {
public static DatasetHeader getShowCurrentTimestampHeader() {
return new
DatasetHeader(ColumnHeaderConstant.showCurrentTimestampColumnHeaders, true);
}
+
+ public static DatasetHeader getShowDBHeader() {
+ return new DatasetHeader(ColumnHeaderConstant.showDBColumnHeaders, true);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 6ab15a5a3a7..8501ee60a6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
@@ -267,7 +266,6 @@ public class Coordinator {
|| statement instanceof DescribeTable
|| statement instanceof ShowTables
|| statement instanceof DropTable) {
- queryContext.setQueryType(QueryType.WRITE);
return new ConfigExecution(
queryContext,
null,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 6061bb63d87..65c045b5dd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -21,7 +21,11 @@ package
org.apache.iotdb.db.queryengine.plan.execution.config;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateTableTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.DropDBTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowDBTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask;
import org.apache.iotdb.db.relational.sql.tree.AstVisitor;
import org.apache.iotdb.db.relational.sql.tree.CreateDB;
@@ -51,22 +55,26 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
@Override
protected IConfigTask visitCreateDB(CreateDB node, MPPQueryContext context) {
- return super.visitCreateDB(node, context);
+ context.setQueryType(QueryType.WRITE);
+ return new CreateDBTask(node);
}
@Override
protected IConfigTask visitUse(Use node, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
return new UseDBTask(node, clientSession);
}
@Override
protected IConfigTask visitDropDB(DropDB node, MPPQueryContext context) {
- return super.visitDropDB(node, context);
+ context.setQueryType(QueryType.WRITE);
+ return new DropDBTask(node);
}
@Override
protected IConfigTask visitShowDB(ShowDB node, MPPQueryContext context) {
- return super.visitShowDB(node, context);
+ context.setQueryType(QueryType.READ);
+ return new ShowDBTask(node);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 24fc52de6bd..aa2da4fce12 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -142,6 +142,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowRegion
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowTTLTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowTriggersTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowVariablesTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowDBTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.template.ShowNodesInSchemaTemplateTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
@@ -253,7 +254,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE;
+import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_RESULT_NODES;
import static
org.apache.iotdb.db.protocol.client.ConfigNodeClient.MSG_RECONNECTION_FAIL;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT;
+import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR_CHAR;
public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
@@ -2677,26 +2682,106 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showDatabases(ShowDB showDB) {
- return null;
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ // Construct request using statement
+ List<String> databasePathPattern = Arrays.asList(ALL_RESULT_NODES);
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // Send request to some API server
+ TGetDatabaseReq req = new TGetDatabaseReq(databasePathPattern,
ALL_MATCH_SCOPE.serialize());
+ TShowDatabaseResp resp = client.showDatabase(req);
+ // build TSBlock
+ ShowDBTask.buildTSBlock(resp.getDatabaseInfoMap(), future);
+ } catch (IOException | ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
}
@Override
public SettableFuture<ConfigTaskResult> useDatabase(Use useDB,
IClientSession clientSession) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- // TODO check whether the database exists
- clientSession.setDatabaseName(useDB.getDatabase().getValue());
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ // Construct request using statement
+ List<String> databasePathPattern = Arrays.asList(ROOT,
useDB.getDatabase().getValue());
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // Send request to some API server
+ TGetDatabaseReq req = new TGetDatabaseReq(databasePathPattern,
ALL_MATCH_SCOPE.serialize());
+ TShowDatabaseResp resp = client.showDatabase(req);
+ if (!resp.getDatabaseInfoMap().isEmpty()) {
+ clientSession.setDatabaseName(useDB.getDatabase().getValue());
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(
+ new IoTDBException(
+ String.format("Database %s doesn't exists.",
useDB.getDatabase().getValue()),
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()));
+ }
+ } catch (IOException | ClientManagerException | TException e) {
+ future.setException(e);
+ }
return future;
}
+ private String transformDBName(String dbName) {
+ return ROOT + PATH_SEPARATOR_CHAR + dbName;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> dropDatabase(DropDB dropDB) {
- return null;
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TDeleteDatabasesReq req =
+ new TDeleteDatabasesReq(
+
Collections.singletonList(transformDBName(dropDB.getDbName().getValue())));
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TSStatus tsStatus = client.deleteDatabases(req);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ LOGGER.warn(
+ "Failed to execute delete database {} in config node, status is
{}.",
+ dropDB.getDbName().getValue(),
+ tsStatus);
+ future.setException(new IoTDBException(tsStatus.message,
tsStatus.getCode()));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
}
@Override
public SettableFuture<ConfigTaskResult> createDatabase(CreateDB createDB) {
- return null;
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ // Construct request using statement
+ TDatabaseSchema databaseSchema = new TDatabaseSchema();
+ databaseSchema.setName(ROOT + PATH_SEPARATOR_CHAR + createDB.getDbName());
+ try (ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // Send request to some API server
+ TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema);
+ // Get response or throw exception
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+ // If database already exists when loading, we do not throw exceptions
to avoid printing too
+ // many logs
+ if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()
+ && createDB.isSetIfNotExists()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ LOGGER.warn(
+ "Failed to execute create database {} in config node, status is
{}.",
+ createDB.getDbName(),
+ tsStatus);
+ future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
+ }
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/CreateDBTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/CreateDBTask.java
new file mode 100644
index 00000000000..f835d9496d1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/CreateDBTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational;
+
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.relational.sql.tree.CreateDB;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class CreateDBTask implements IConfigTask {
+
+ private final CreateDB node;
+
+ public CreateDBTask(CreateDB node) {
+ this.node = node;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.createDatabase(node);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DropDBTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DropDBTask.java
new file mode 100644
index 00000000000..05345cbc106
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/DropDBTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational;
+
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.relational.sql.tree.DropDB;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DropDBTask implements IConfigTask {
+
+ private final DropDB node;
+
+ public DropDBTask(DropDB node) {
+ this.node = node;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.dropDatabase(node);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/ShowDBTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/ShowDBTask.java
new file mode 100644
index 00000000000..38afd62b15c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/ShowDBTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational;
+
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseInfo;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.relational.sql.tree.ShowDB;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ShowDBTask implements IConfigTask {
+
+ private final ShowDB node;
+
+ public ShowDBTask(ShowDB node) {
+ this.node = node;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.showDatabases(node);
+ }
+
+ public static void buildTSBlock(
+ Map<String, TDatabaseInfo> storageGroupInfoMap,
SettableFuture<ConfigTaskResult> future) {
+
+ List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.showDBColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+
+ TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ for (Map.Entry<String, TDatabaseInfo> entry :
storageGroupInfoMap.entrySet()) {
+ String dbName = entry.getKey().substring(5);
+ TDatabaseInfo storageGroupInfo = entry.getValue();
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getColumnBuilder(0).writeBinary(new Binary(dbName,
TSFileConfig.STRING_CHARSET));
+
+
builder.getColumnBuilder(1).writeInt(storageGroupInfo.getSchemaReplicationFactor());
+
builder.getColumnBuilder(2).writeInt(storageGroupInfo.getDataReplicationFactor());
+
builder.getColumnBuilder(3).writeLong(storageGroupInfo.getTimePartitionInterval());
+ builder.declarePosition();
+ }
+
+ DatasetHeader datasetHeader = DatasetHeaderFactory.getShowDBHeader();
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS,
builder.build(), datasetHeader));
+ }
+}