This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/autoCreateTable in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6983cae5420209c7451d70729b1b907b0d0499c1 Author: JackieTien97 <[email protected]> AuthorDate: Wed Jul 3 10:43:48 2024 +0800 Support auto create table --- .../execution/config/TableConfigTaskVisitor.java | 35 +--------- .../fetcher/TableHeaderSchemaValidator.java | 78 +++++++++++++++++++++- 2 files changed, 78 insertions(+), 35 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index f1dc7ba7403..39cebe3007f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -20,9 +20,6 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; import org.apache.iotdb.commons.schema.table.TsTable; -import org.apache.iotdb.commons.schema.table.column.AttributeColumnSchema; -import org.apache.iotdb.commons.schema.table.column.IdColumnSchema; -import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.IClientSession; @@ -36,6 +33,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableHeaderSchemaValidator; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; @@ -54,7 +52,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException; -import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import java.util.HashMap; @@ -66,7 +63,6 @@ import java.util.Set; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toTypeSignature; -import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryContext> { @@ -150,34 +146,7 @@ public class TableConfigTaskVisitor extends AstVisitor<IConfigTask, MPPQueryCont String.format("Columns in table shall not share the same name %s.", columnName)); } TSDataType dataType = getDataType(columnDefinition.getType()); - switch (category) { - case ID: - if (!TSDataType.STRING.equals(dataType)) { - throw new SemanticException( - "DataType of ID Column should only be STRING, current is " + dataType); - } - table.addColumnSchema(new IdColumnSchema(columnName, dataType)); - break; - case ATTRIBUTE: - if (!TSDataType.STRING.equals(dataType)) { - throw new SemanticException( - "DataType of ATTRIBUTE Column should only be STRING, current is " + dataType); - } - table.addColumnSchema(new AttributeColumnSchema(columnName, dataType)); - break; - case TIME: - break; - case MEASUREMENT: - table.addColumnSchema( - new MeasurementColumnSchema( - columnName, - dataType, - getDefaultEncoding(dataType), - TSFileDescriptor.getInstance().getConfig().getCompressor())); - break; - default: - throw new IllegalArgumentException(); - } + TableHeaderSchemaValidator.generateColumnSchema(table, category, columnName, dataType); } return new CreateTableTask(table, database, node.isIfNotExists()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java index 4a2e93384a6..3588d670298 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java @@ -21,6 +21,10 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.AttributeColumnSchema; +import org.apache.iotdb.commons.schema.table.column.IdColumnSchema; +import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.db.exception.metadata.table.TableNotExistsException; import org.apache.iotdb.db.exception.sql.SemanticException; @@ -28,12 +32,15 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.AlterTableAddColumnTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateTableTask; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.read.common.type.UnknownType; import org.slf4j.Logger; @@ -43,6 +50,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + public class TableHeaderSchemaValidator { private static final Logger LOGGER = LoggerFactory.getLogger(TableHeaderSchemaValidator.class); @@ -125,8 +135,12 @@ public class TableHeaderSchemaValidator { // auto create missing table or columns if (table == null) { - autoCreateTable(database, tableSchema, context); + autoCreateTable(database, tableSchema); table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); + if (table == null) { + throw new IllegalStateException( + "auto create table succeed, but cannot get table schema in current node's DataNodeTableCache, may be caused by concurrently auto creating table"); + } } else if (inputColumnList == null) { // do nothing } else { @@ -147,10 +161,70 @@ public class TableHeaderSchemaValidator { return new TableSchema(tableSchema.getTableName(), resultColumnList); } - private void autoCreateTable(String database, TableSchema tableSchema, MPPQueryContext context) { + private void autoCreateTable(String database, TableSchema tableSchema) { + TsTable tsTable = new TsTable(tableSchema.getTableName()); + addColumnSchema(tableSchema.getColumns(), tsTable); + CreateTableTask createTableTask = new CreateTableTask(tsTable, database, true); + try { + ListenableFuture<ConfigTaskResult> future = createTableTask.execute(configTaskExecutor); + ConfigTaskResult result = future.get(); + if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new RuntimeException( + new IoTDBException( + "Auto create table column failed.", result.getStatusCode().getStatusCode())); + } + } catch (ExecutionException | InterruptedException e) { + LOGGER.warn("Auto create table column failed.", e); + throw new RuntimeException(e); + } throw new SemanticException(new TableNotExistsException(database, tableSchema.getTableName())); } + private void addColumnSchema(List<ColumnSchema> columnSchemas, TsTable tsTable) { + for (ColumnSchema columnSchema : columnSchemas) { + TsTableColumnCategory category = columnSchema.getColumnCategory(); + String columnName = columnSchema.getName(); + if (tsTable.getColumnSchema(columnName) != null) { + throw new SemanticException( + String.format("Columns in table shall not share the same name %s.", columnName)); + } + TSDataType dataType = getTSDataType(columnSchema.getType()); + generateColumnSchema(tsTable, category, columnName, dataType); + } + } + + public static void generateColumnSchema( + TsTable tsTable, TsTableColumnCategory category, String columnName, TSDataType dataType) { + switch (category) { + case ID: + if (!TSDataType.STRING.equals(dataType)) { + throw new SemanticException( + "DataType of ID Column should only be STRING, current is " + dataType); + } + tsTable.addColumnSchema(new IdColumnSchema(columnName, dataType)); + break; + case ATTRIBUTE: + if (!TSDataType.STRING.equals(dataType)) { + throw new SemanticException( + "DataType of ATTRIBUTE Column should only be STRING, current is " + dataType); + } + tsTable.addColumnSchema(new AttributeColumnSchema(columnName, dataType)); + break; + case TIME: + break; + case MEASUREMENT: + tsTable.addColumnSchema( + new MeasurementColumnSchema( + columnName, + dataType, + getDefaultEncoding(dataType), + TSFileDescriptor.getInstance().getConfig().getCompressor())); + break; + default: + throw new IllegalArgumentException(); + } + } + private void autoCreateColumn( String database, String tableName,
