This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/FixDropDB in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b4ae33a9b846cedbc7e8226d20018eae9c0d6b0c Author: JackieTien97 <[email protected]> AuthorDate: Thu Jul 4 16:39:34 2024 +0800 Change bug of validateTableHeaderSchema --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../manager/schema/ClusterSchemaManager.java | 4 +- .../impl/schema/table/AddTableColumnProcedure.java | 13 ++- .../state/schema/AddTableColumnState.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 7 ++ .../cache/schema/dualkeycache/IDualKeyCache.java | 13 +++ .../schema/dualkeycache/impl/DualKeyCacheImpl.java | 37 +++++++ .../relational/AlterTableAddColumnTask.java | 3 + .../plan/relational/metadata/Metadata.java | 21 +++- .../relational/metadata/TableMetadataImpl.java | 2 +- .../metadata/fetcher/TableDeviceSchemaFetcher.java | 2 +- .../fetcher/TableDeviceSchemaValidator.java | 9 +- .../fetcher/TableHeaderSchemaValidator.java | 113 +++++++++------------ .../metadata/fetcher/cache/TableDeviceId.java | 12 +-- .../fetcher/cache/TableDeviceSchemaCache.java | 11 +- .../relational/metadata/fetcher/cache/TableId.java | 16 +-- .../db/schemaengine/table/DataNodeTableCache.java | 13 +++ .../iotdb/db/schemaengine/table/ITableCache.java | 5 + .../plan/relational/analyzer/TestMatadata.java | 2 +- .../apache/iotdb/commons/schema/table/TsTable.java | 90 ++++++++++++---- .../table/column/TsTableColumnSchemaUtil.java | 2 +- 21 files changed, 261 insertions(+), 120 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index e57365b33ea..95123228234 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -102,6 +102,8 @@ public enum TSStatusCode { DISK_SPACE_INSUFFICIENT(611), OVERSIZE_TTL(612), TTL_CONFIG_ERROR(613), + COLUMN_TYPE_MISMATCH(614), + COLUMN_CATEGORY_MISMATCH(615), // Query Engine SQL_PARSE_ERROR(700), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index e7787c8ad80..7b65d63896c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -1139,7 +1139,9 @@ public class ClusterSchemaManager { List<TsTableColumnSchema> copiedList = new ArrayList<>(); for (TsTableColumnSchema columnSchema : columnSchemaList) { - if (targetTable.getColumnSchema(columnSchema.getColumnName()) == null) { + TsTableColumnSchema existingColumnSchema = + targetTable.getColumnSchema(columnSchema.getColumnName()); + if (existingColumnSchema == null) { copiedList.add(columnSchema); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java index 2bc01bd842b..fc7c18c6561 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AddTableColumnProcedure.java @@ -94,14 +94,15 @@ public class AddTableColumnProcedure LOGGER.info("Pre release info of table {}.{} when adding column", database, tableName); preRelease(env); break; - case COMMIT_RELEASE: - LOGGER.info("Commit release info of table {}.{} when adding column", database, tableName); - commitRelease(env); - break; case ADD_COLUMN: LOGGER.info("Add column to table {}.{}", database, tableName); addColumn(env); + break; + case COMMIT_RELEASE: + LOGGER.info("Commit release info of table {}.{} when adding column", database, tableName); + commitRelease(env); return Flow.NO_MORE_STATE; + default: setFailure(new ProcedureException("Unrecognized AddTableColumnState " + state)); return Flow.NO_MORE_STATE; @@ -154,7 +155,7 @@ public class AddTableColumnProcedure return; } } - setNextState(AddTableColumnState.COMMIT_RELEASE); + setNextState(AddTableColumnState.ADD_COLUMN); } private void commitRelease(ConfigNodeProcedureEnv env) { @@ -187,6 +188,8 @@ public class AddTableColumnProcedure .addTableColumn(database, tableName, inputColumnList); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setFailure(new ProcedureException(new IoTDBException(status.getMessage(), status.getCode()))); + } else { + setNextState(AddTableColumnState.ADD_COLUMN); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java index 3d4456e2de2..cdcea88aa3c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/AddTableColumnState.java @@ -22,6 +22,6 @@ package org.apache.iotdb.confignode.procedure.state.schema; public enum AddTableColumnState { COLUMN_CHECK, PRE_RELEASE, - COMMIT_RELEASE, - ADD_COLUMN + ADD_COLUMN, + COMMIT_RELEASE } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 1fa74be7a23..eeca1e43d5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -136,6 +136,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; @@ -509,14 +510,20 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) { + DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION); DataNodeSchemaCache.getInstance().takeWriteLock(); try { // req.getFullPath() is a database path DataNodeSchemaCache.getInstance().invalidate(req.getFullPath()); ClusterTemplateManager.getInstance().invalid(req.getFullPath()); + // clear table related cache + String database = req.getFullPath().substring(5); + DataNodeTableCache.getInstance().invalid(database); + TableDeviceSchemaFetcher.getInstance().getTableDeviceCache().invalidate(database); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } finally { DataNodeSchemaCache.getInstance().releaseWriteLock(); + DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java index 95f0b0e8f00..83c38617461 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java @@ -95,4 +95,17 @@ public interface IDualKeyCache<FK, SK, V> { @TestOnly void evictOneEntry(); + + /** remove all entries for firstKey */ + @GuardedBy("DataNodeSchemaCache#writeLock") + void invalidate(FK firstKey); + + /** + * remove all entries of specified database, and for the reason that table model's first key is + * different from tree model, we add this new method which only be used for table model. + * + * <p>FK must be TableId in such case + */ + @GuardedBy("DataNodeSchemaCache#writeLock") + void invalidateForTable(String database); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java index d3a385467f8..b462b1fbe0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.ID import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheStats; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCacheUpdating; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.DataNodeLastCacheManager; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId; import java.util.ArrayList; import java.util.Arrays; @@ -386,6 +387,42 @@ class DualKeyCacheImpl<FK, SK, V, T extends ICacheEntry<SK, V>> cacheStats.decreaseMemoryUsage(evictOneCacheEntry()); } + @Override + public void invalidate(FK firstKey) { + int estimateSize = 0; + ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.remove(firstKey); + if (cacheEntryGroup != null) { + estimateSize += sizeComputer.computeFirstKeySize(firstKey); + for (Iterator<Map.Entry<SK, T>> it = cacheEntryGroup.getAllCacheEntries(); it.hasNext(); ) { + Map.Entry<SK, T> entry = it.next(); + estimateSize += sizeComputer.computeSecondKeySize(entry.getKey()); + estimateSize += sizeComputer.computeValueSize(entry.getValue().getValue()); + cacheEntryManager.invalid(entry.getValue()); + } + cacheStats.decreaseMemoryUsage(estimateSize); + } + } + + @Override + public void invalidateForTable(String database) { + int estimateSize = 0; + for (FK firstKey : firstKeyMap.getAllKeys()) { + TableId tableId = (TableId) firstKey; + if (tableId.belongTo(database)) { + estimateSize += sizeComputer.computeFirstKeySize(firstKey); + ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(firstKey); + for (Iterator<Map.Entry<SK, T>> it = entryGroup.getAllCacheEntries(); it.hasNext(); ) { + Map.Entry<SK, T> entry = it.next(); + estimateSize += sizeComputer.computeSecondKeySize(entry.getKey()); + estimateSize += sizeComputer.computeValueSize(entry.getValue().getValue()); + cacheEntryManager.invalid(entry.getValue()); + } + firstKeyMap.remove(firstKey); + } + } + cacheStats.decreaseMemoryUsage(estimateSize); + } + /** * Since the capacity of one instance of ConcurrentHashMap is about 4 million, a number of * instances are united for more capacity. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java index 17058e6f6c0..726bd0c75db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/relational/AlterTableAddColumnTask.java @@ -89,6 +89,9 @@ public class AlterTableAddColumnTask implements IConfigTask { getDefaultEncoding(dataType), TSFileDescriptor.getInstance().getConfig().getCompressor())); break; + default: + throw new IllegalStateException( + "Unknown ColumnCategory for adding column: " + inputColumn.getColumnCategory()); } } return columnSchemaList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java index 1acfdff1dbe..6eba3f8a196 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.partition.SchemaPartition; +import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; @@ -81,16 +82,26 @@ public interface Metadata { * * <p>This method return all the existing column schemas in the target table. * - * <p>When table or column is missing, this method will execute auto creation. + * <p>The reason that we need to return all the existing column schemas is that the caller need to + * know all id columns to construct IDeviceID + * + * <p>When table or column is missing, this method will execute auto creation if the user have + * corresponding authority. * * <p>When using SQL, the columnSchemaList could be null and there won't be any validation. * - * <p>When the input dataType or category of one column is null, the column cannot be auto - * created. + * <p>When the input dataType or category of one column is null, the column won't be auto created. * - * <p>If validation failed, a SemanticException will be thrown. + * <p>The caller need to recheck the dataType of measurement columns to decide whether to do + * partial insert + * + * @return If table doesn't exist and the user have no authority to create table, Optional.empty() + * will be returned. The returned table may not include all the columns + * in @param{tableSchema}, if the user have no authority to alter table. + * @throws SemanticException if column category mismatch or data types of id or attribute column + * are not STRING or Category, Type of any missing ColumnSchema is null */ - TableSchema validateTableHeaderSchema( + Optional<TableSchema> validateTableHeaderSchema( String database, TableSchema tableSchema, MPPQueryContext context); /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index b017f8d3d72..45f5708e8ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -312,7 +312,7 @@ public class TableMetadataImpl implements Metadata { } @Override - public TableSchema validateTableHeaderSchema( + public Optional<TableSchema> validateTableHeaderSchema( String database, TableSchema tableSchema, MPPQueryContext context) { return TableHeaderSchemaValidator.getInstance() .validateTableHeaderSchema(database, tableSchema, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java index 75704422210..5ce6001c618 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java @@ -85,7 +85,7 @@ public class TableDeviceSchemaFetcher { return TableDeviceSchemaFetcherHolder.INSTANCE; } - TableDeviceSchemaCache getTableDeviceCache() { + public TableDeviceSchemaCache getTableDeviceCache() { return cache; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java index a5ed52036c3..dd9ccbe4765 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java @@ -99,16 +99,13 @@ public class TableDeviceSchemaValidator { Object[] deviceAttributeValueList = attributeValueList.get(i); for (int j = 0; j < attributeKeyList.size(); j++) { String value = attributeMap.get(attributeKeyList.get(j)); + if (deviceAttributeValueList[j] == null) { + continue; + } if (value == null) { - if (deviceAttributeValueList[j] == null) { - continue; - } result.attributeMissingInCacheDeviceIndexList.add(i); break; } else { - if (deviceAttributeValueList[j] == null) { - continue; - } if (!value.equals(String.valueOf(deviceAttributeValueList[j]))) { result.attributeUpdateDeviceIndexList.add(i); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java index 3588d670298..a941dc2a272 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java @@ -26,9 +26,10 @@ import org.apache.iotdb.commons.schema.table.column.IdColumnSchema; import org.apache.iotdb.commons.schema.table.column.MeasurementColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; -import org.apache.iotdb.db.exception.metadata.table.TableNotExistsException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager; +import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.AlterTableAddColumnTask; @@ -42,12 +43,12 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.type.TypeFactory; -import org.apache.tsfile.read.common.type.UnknownType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; @@ -72,82 +73,66 @@ public class TableHeaderSchemaValidator { return TableHeaderSchemaValidatorHolder.INSTANCE; } - // This method return all the existing column schemas in the target table. - // When table or column is missing, this method will execute auto creation. - // When using SQL, the columnSchemaList could be null and there won't be any validation. - // All input column schemas will be validated and auto created when necessary. - // When the input dataType or category of one column is null, the column cannot be auto created. - public TableSchema validateTableHeaderSchema( + public Optional<TableSchema> validateTableHeaderSchema( String database, TableSchema tableSchema, MPPQueryContext context) { + // The schema cache R/W and fetch operation must be locked together thus the cache clean + // operation executed by delete timeseries will be effective. + DataNodeSchemaLockManager.getInstance().takeReadLock(SchemaLockType.VALIDATE_VS_DELETION); + context.addAcquiredLockNum(SchemaLockType.VALIDATE_VS_DELETION); + List<ColumnSchema> inputColumnList = tableSchema.getColumns(); + if (inputColumnList == null || inputColumnList.isEmpty()) { + throw new IllegalArgumentException( + "Column List in TableSchema should never be null or empty."); + } TsTable table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); List<ColumnSchema> missingColumnList = new ArrayList<>(); List<ColumnSchema> resultColumnList = new ArrayList<>(); // first round validate, check existing schema if (table == null) { - if (inputColumnList == null) { - throw new SemanticException("Unknown column names. Cannot auto create table."); + // TODO table metadata: authority check for table create + // auto create missing table + // it's ok that many write requests concurrently auto create same table, the thread safety + // will be guaranteed by ProcedureManager.createTable in CN + autoCreateTable(database, tableSchema); + table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); + if (table == null) { + throw new IllegalStateException( + "auto create table succeed, but cannot get table schema in current node's DataNodeTableCache, may be caused by concurrently auto creating table"); } - // check arguments for table auto creation - for (ColumnSchema columnSchema : inputColumnList) { + } + + for (ColumnSchema columnSchema : inputColumnList) { + TsTableColumnSchema existingColumn = table.getColumnSchema(columnSchema.getName()); + if (existingColumn == null) { + // check arguments for column auto creation if (columnSchema.getColumnCategory() == null) { - throw new SemanticException("Unknown column category. Cannot auto create table."); + throw new SemanticException("Unknown column category. Cannot auto create column."); } if (columnSchema.getType() == null) { - throw new IllegalArgumentException("Unknown column data type. Cannot auto create table."); + throw new SemanticException("Unknown column data type. Cannot auto create column."); } missingColumnList.add(columnSchema); - } - } else if (inputColumnList == null) { - // SQL insert without columnName, nothing to check - } else { - for (int i = 0; i < inputColumnList.size(); i++) { - ColumnSchema columnSchema = inputColumnList.get(i); - TsTableColumnSchema existingColumn = table.getColumnSchema(columnSchema.getName()); - if (existingColumn == null) { - // check arguments for column auto creation - if (columnSchema.getColumnCategory() == null) { - throw new IllegalArgumentException( - "Unknown column category. Cannot auto create column."); - } - if (columnSchema.getType() == null) { - throw new IllegalArgumentException( - "Unknown column data type. Cannot auto create column."); - } - missingColumnList.add(columnSchema); - } else { - // check and validate column data type and category - if (!columnSchema.getType().equals(UnknownType.UNKNOWN) - && !TypeFactory.getType(existingColumn.getDataType()) - .equals(columnSchema.getType())) { - throw new SemanticException( - String.format("Wrong data type at column %s.", columnSchema.getName())); - } - if (columnSchema.getColumnCategory() != null - && !existingColumn.getColumnCategory().equals(columnSchema.getColumnCategory())) { - throw new SemanticException( - String.format("Wrong category at column %s.", columnSchema.getName())); - } + } else { + // leave measurement columns' dataType checking to the caller, then the caller can decide + // whether to do partial insert + + // only check column category + if (columnSchema.getColumnCategory() != null + && !existingColumn.getColumnCategory().equals(columnSchema.getColumnCategory())) { + throw new SemanticException( + String.format("Wrong category at column %s.", columnSchema.getName())); } } } - // auto create missing table or columns - if (table == null) { - autoCreateTable(database, tableSchema); - table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); - if (table == null) { - throw new IllegalStateException( - "auto create table succeed, but cannot get table schema in current node's DataNodeTableCache, may be caused by concurrently auto creating table"); - } - } else if (inputColumnList == null) { - // do nothing - } else { - if (!missingColumnList.isEmpty()) { - autoCreateColumn(database, tableSchema.getTableName(), missingColumnList, context); - } + if (!missingColumnList.isEmpty()) { + // TODO table metadata: authority check for table alter + // check id or attribute column data type in this method + autoCreateColumn(database, tableSchema.getTableName(), missingColumnList, context); } + table .getColumnList() .forEach( @@ -158,7 +143,7 @@ public class TableHeaderSchemaValidator { TypeFactory.getType(o.getDataType()), false, o.getColumnCategory()))); - return new TableSchema(tableSchema.getTableName(), resultColumnList); + return Optional.of(new TableSchema(tableSchema.getTableName(), resultColumnList)); } private void autoCreateTable(String database, TableSchema tableSchema) { @@ -173,11 +158,13 @@ public class TableHeaderSchemaValidator { new IoTDBException( "Auto create table column failed.", result.getStatusCode().getStatusCode())); } - } catch (ExecutionException | InterruptedException e) { - LOGGER.warn("Auto create table column failed.", e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + /* Clean up whatever needs to be handled before interrupting */ + Thread.currentThread().interrupt(); throw new RuntimeException(e); } - throw new SemanticException(new TableNotExistsException(database, tableSchema.getTableName())); } private void addColumnSchema(List<ColumnSchema> columnSchemas, TsTable tsTable) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceId.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceId.java index 210e22ad672..66787bd7492 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceId.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceId.java @@ -19,12 +19,16 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache; -import org.apache.iotdb.commons.schema.MemUsageUtil; +import org.apache.tsfile.utils.RamUsageEstimator; import java.util.Arrays; +// The reason that we don't use IDeviceID is TableDeviceId doesn't contain table name in segment 0. public class TableDeviceId { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableDeviceId.class); + private final String[] idValues; public TableDeviceId(String[] idValues) { @@ -40,11 +44,7 @@ public class TableDeviceId { } public int estimateSize() { - int size = 8 + 8 + 8 + 4; // object header + reference + String[] header + String.length - for (String node : idValues) { - size += (int) MemUsageUtil.computeStringMemUsage(node); - } - return size; + return (int) (INSTANCE_SIZE + RamUsageEstimator.sizeOf(idValues)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index f7c09001c08..390c9aa533c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -75,10 +75,19 @@ public class TableDeviceSchemaCache { } } + public void invalidate(String database) { + readWriteLock.writeLock().lock(); + try { + dualKeyCache.invalidateForTable(database); + } finally { + readWriteLock.writeLock().unlock(); + } + } + public void invalidate(String database, String tableName) { readWriteLock.writeLock().lock(); try { - dualKeyCache.invalidateAll(); + dualKeyCache.invalidate(new TableId(database, tableName)); } finally { readWriteLock.writeLock().unlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java index 288ecbe57ac..98e9317efdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableId.java @@ -19,12 +19,14 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache; -import org.apache.iotdb.commons.schema.MemUsageUtil; +import org.apache.tsfile.utils.RamUsageEstimator; import java.util.Objects; public class TableId { + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableId.class); + private final String database; private final String tableName; @@ -42,13 +44,13 @@ public class TableId { return tableName; } + public boolean belongTo(String database) { + return Objects.equals(this.database, database); + } + public int estimateSize() { - return 8 - + 8 - + 8 - + (int) - (MemUsageUtil.computeStringMemUsage(database) - + MemUsageUtil.computeStringMemUsage(tableName)); + return (int) + (INSTANCE_SIZE + RamUsageEstimator.sizeOf(database) + RamUsageEstimator.sizeOf(tableName)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java index ffe32fd0870..3aa287c8f5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java @@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT; import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; +/** It contains all tables' latest column schema */ public class DataNodeTableCache implements ITableCache { private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeTableCache.class); @@ -210,6 +211,18 @@ public class DataNodeTableCache implements ITableCache { } } + @Override + public void invalid(String database) { + readWriteLock.writeLock().lock(); + try { + databaseTableMap.remove(database); + preCreateTableMap.remove(database); + preAddColumnMap.remove(database); + } finally { + readWriteLock.writeLock().unlock(); + } + } + public TsTable getTable(String database, String tableName) { readWriteLock.readLock().lock(); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java index fdc1e38fa88..79d28783efc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/ITableCache.java @@ -42,4 +42,9 @@ public interface ITableCache { void rollbackAddColumn( String database, String tableName, List<TsTableColumnSchema> columnSchemaList); + + /** + * @param database shouldn't start with `root.` + */ + void invalid(String database); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java index ae4d4a7315d..f771114ac76 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java @@ -202,7 +202,7 @@ public class TestMatadata implements Metadata { } @Override - public TableSchema validateTableHeaderSchema( + public Optional<TableSchema> validateTableHeaderSchema( String database, TableSchema tableSchema, MPPQueryContext context) { throw new UnsupportedOperationException(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 919f96e1708..d8ee689051d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -27,19 +27,23 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchemaUtil; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteIOUtils; +import javax.annotation.concurrent.ThreadSafe; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +@ThreadSafe public class TsTable { private static final String TIME_COLUMN_NAME = "Time"; @@ -48,8 +52,9 @@ public class TsTable { private final String tableName; - private final Map<String, TsTableColumnSchema> columnSchemaMap = - Collections.synchronizedMap(new LinkedHashMap<>()); + private final Map<String, TsTableColumnSchema> columnSchemaMap = new LinkedHashMap<>(); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Map<String, String> props = null; @@ -65,45 +70,85 @@ public class TsTable { } public TsTableColumnSchema getColumnSchema(String columnName) { - return columnSchemaMap.get(columnName); + readWriteLock.readLock().lock(); + try { + return columnSchemaMap.get(columnName); + } finally { + readWriteLock.readLock().unlock(); + } } public void addColumnSchema(TsTableColumnSchema columnSchema) { - columnSchemaMap.put(columnSchema.getColumnName(), columnSchema); - if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.ID)) { - idNums++; + readWriteLock.writeLock().lock(); + try { + columnSchemaMap.put(columnSchema.getColumnName(), columnSchema); + if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.ID)) { + idNums++; + } + } finally { + readWriteLock.writeLock().unlock(); } } public void removeColumnSchema(String columnName) { - columnSchemaMap.remove(columnName); + readWriteLock.writeLock().lock(); + try { + TsTableColumnSchema columnSchema = columnSchemaMap.remove(columnName); + if (columnSchema != null + && columnSchema.getColumnCategory().equals(TsTableColumnCategory.ID)) { + idNums--; + } + } finally { + readWriteLock.writeLock().unlock(); + } } public int getColumnNum() { - return columnSchemaMap.size(); + readWriteLock.readLock().lock(); + try { + return columnSchemaMap.size(); + } finally { + readWriteLock.readLock().unlock(); + } } public int getIdNums() { - return idNums; + readWriteLock.readLock().lock(); + try { + return idNums; + } finally { + readWriteLock.readLock().unlock(); + } } public List<TsTableColumnSchema> getColumnList() { - return new ArrayList<>(columnSchemaMap.values()); + readWriteLock.readLock().lock(); + try { + return new ArrayList<>(columnSchemaMap.values()); + } finally { + readWriteLock.readLock().unlock(); + } } public String getPropValue(String propKey) { - return props == null ? null : props.get(propKey); + readWriteLock.readLock().lock(); + try { + return props == null ? null : props.get(propKey); + } finally { + readWriteLock.readLock().unlock(); + } } public void addProp(String key, String value) { - if (props == null) { - synchronized (this) { - if (props == null) { - props = new ConcurrentHashMap<>(); - } + readWriteLock.writeLock().lock(); + try { + if (props == null) { + props = new HashMap<>(); } + props.put(key, value); + } finally { + readWriteLock.writeLock().unlock(); } - props.put(key, value); } public byte[] serialize() { @@ -148,7 +193,12 @@ public class TsTable { } public void setProps(Map<String, String> props) { - this.props = props; + readWriteLock.writeLock().lock(); + try { + this.props = props; + } finally { + readWriteLock.writeLock().unlock(); + } } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java index eac67b2b3b2..30b006a3fbd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchemaUtil.java @@ -106,7 +106,7 @@ public class TsTableColumnSchemaUtil { public static List<TsTableColumnSchema> deserializeColumnSchemaList(ByteBuffer buffer) { int size = ReadWriteIOUtils.readInt(buffer); if (size == -1) { - return null; + throw new IllegalArgumentException("size should not be -1"); } List<TsTableColumnSchema> columnSchemaList = new ArrayList<>(size); for (int i = 0; i < size; i++) {
