This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a18f763267caa1059e68055d195a49161620216e Author: shuwenwei <[email protected]> AuthorDate: Tue Feb 3 16:47:57 2026 +0800 add statistics --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++ .../execution/fragment/DataNodeQueryContext.java | 30 +++++ .../InformationSchemaContentSupplierFactory.java | 140 +++++++++++++++++---- .../plan/planner/TableOperatorGenerator.java | 10 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 15 +++ .../plan/planner/plan/node/PlanNodeType.java | 4 + .../distribute/TableDistributedPlanGenerator.java | 62 ++++++++- .../node/InformationSchemaTableScanNode.java | 11 +- ...leDiskUsageInformationSchemaTableScanNode.java} | 92 +++++++------- .../DataNodeLocationSupplierFactory.java | 33 +++++ .../dataregion/utils/DiskUsageStatisticUtil.java | 25 +++- .../tableDiskUsageCache/TableDiskUsageCache.java | 26 ++++ .../planner/node/source/SourceNodeSerdeTest.java | 54 ++++++++ .../commons/schema/table/InformationSchema.java | 3 +- 14 files changed, 426 insertions(+), 89 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c78ae21c066..f6661deb8b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1219,6 +1219,8 @@ public class IoTDBConfig { private long maxObjectSizeInByte = 4 * 1024 * 1024 * 1024L; + private int maxSubTaskNumForInformationTableScan = 4; + IoTDBConfig() {} public int getMaxLogEntriesNumPerBatch() { @@ -4367,4 +4369,12 @@ public class IoTDBConfig { public void setMaxObjectSizeInByte(long maxObjectSizeInByte) { this.maxObjectSizeInByte = maxObjectSizeInByte; } + + public int getMaxSubTaskNumForInformationTableScan() { + return maxSubTaskNumForInformationTableScan; + } + + public void setMaxSubTaskNumForInformationTableScan(int maxSubTaskNumForInformationTableScan) { + this.maxSubTaskNumForInformationTableScan = maxSubTaskNumForInformationTableScan; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java index 814fcc7df63..259afd8814e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java @@ -19,17 +19,24 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; +import org.apache.thrift.TException; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Pair; import javax.annotation.concurrent.GuardedBy; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -59,6 +66,10 @@ public class DataNodeQueryContext { private final AtomicInteger dataNodeFINum; + // Used for TableModel information table_disk_usage scan + @GuardedBy("lock") + private Map<String, List<TTableInfo>> databaseTableInfoMap; + // TODO consider more fine-grained locks, now the AtomicInteger in uncachedPathToSeriesScanInfo is // unnecessary private final ReentrantLock lock = new ReentrantLock(); @@ -77,6 +88,25 @@ public class DataNodeQueryContext { uncachedPathToSeriesScanInfo.put(path, new Pair<>(dataNodeSeriesScanNum, null)); } + public Map<String, List<TTableInfo>> getDatabaseTableInfoMap() + throws ClientManagerException, TException { + if (databaseTableInfoMap != null) { + return databaseTableInfoMap; + } + lock.lock(); + if (databaseTableInfoMap != null) { + lock.unlock(); + return databaseTableInfoMap; + } + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + this.databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap(); + } finally { + lock.unlock(); + } + return databaseTableInfoMap; + } + public void decreaseDeviceAndMayUpdateLastCache( QualifiedObjectName tableName, DeviceEntry deviceEntry, Integer initialCount) { checkArgument(initialCount != null, "initialCount shouldn't be null here"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index 5b486713e8f..d39143c173c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -44,7 +44,6 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; -import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo4InformationSchema; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo4InformationSchema; @@ -78,7 +77,10 @@ import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter; import org.apache.iotdb.db.queryengine.plan.relational.function.TableBuiltinTableFunction; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; @@ -93,6 +95,7 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.Ta import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext; import org.apache.iotdb.db.utils.MathUtils; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -113,6 +116,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -152,12 +156,11 @@ public class InformationSchemaContentSupplierFactory { public static IInformationSchemaContentSupplier getSupplier( final OperatorContext context, - final String tableName, final List<TSDataType> dataTypes, - final Expression predicate, final UserEntity userEntity, - final Filter pushDownFilter, - final PaginationController paginationController) { + final InformationSchemaTableScanNode node, + final Filter pushDownFilter) { + String tableName = node.getQualifiedObjectName().getObjectName(); try { switch (tableName) { case InformationSchema.QUERIES: @@ -194,11 +197,16 @@ public class InformationSchemaContentSupplierFactory { return new DataNodesSupplier(dataTypes, userEntity); case InformationSchema.TABLE_DISK_USAGE: return new TableDiskUsageSupplier( - dataTypes, userEntity, pushDownFilter, paginationController, context); + dataTypes, + userEntity, + pushDownFilter, + new PaginationController(node.getPushDownLimit(), node.getPushDownOffset()), + context, + ((TableDiskUsageInformationSchemaTableScanNode) node).getRegions()); case InformationSchema.CONNECTIONS: return new ConnectionsSupplier(dataTypes, userEntity); case InformationSchema.CURRENT_QUERIES: - return new CurrentQueriesSupplier(dataTypes, predicate, userEntity); + return new CurrentQueriesSupplier(dataTypes, node.getPushDownPredicate(), userEntity); case InformationSchema.QUERIES_COSTS_HISTOGRAM: return new QueriesCostsHistogramSupplier(dataTypes, userEntity); case InformationSchema.SERVICES: @@ -1192,6 +1200,7 @@ public class InformationSchemaContentSupplierFactory { private final Map<String, List<TTableInfo>> databaseTableInfoMap; private final Filter pushDownFilter; private final PaginationController paginationController; + private final OperatorContext operatorContext; private DataRegion currentDataRegion; private boolean currentDatabaseOnlyHasOneTable; @@ -1201,24 +1210,30 @@ public class InformationSchemaContentSupplierFactory { private final StorageEngineTimePartitionIterator dataRegionIterator; + private long prepareCacheReaderCostInNS = 0; + private long loadObjectFileCostInNS = 0; + private long prepareCachedTsFileIDCostInNS = 0; + private long checkAllFilesInTsFileManagerCostInNS = 0; + private long readTsFileCacheValueFilesCostInNS = 0; + private TableDiskUsageSupplier( final List<TSDataType> dataTypes, final UserEntity userEntity, final Filter pushDownFilter, final PaginationController paginationController, - final OperatorContext operatorContext) + final OperatorContext operatorContext, + final List<Integer> regionsForCurrentSubTask) throws TException, ClientManagerException { this.dataTypes = dataTypes; this.pushDownFilter = pushDownFilter; this.paginationController = paginationController; + this.operatorContext = operatorContext; AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity); - try (final ConfigNodeClient client = - ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - // It is better to use an async ConfigNode client here. - // Using a synchronous client may block the calling thread when the ConfigNode response is - // slow or temporarily unavailable, which can cause the operator to exceed its maxRunTime - this.databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap(); - } + this.databaseTableInfoMap = + operatorContext.getInstanceContext().getDataNodeQueryContext().getDatabaseTableInfoMap(); + Set<Integer> regions = new HashSet<>(regionsForCurrentSubTask); + operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.REGIONS_OF_CURRENT_SUB_TASK, regionsForCurrentSubTask.toString()); this.dataRegionIterator = new StorageEngineTimePartitionIterator( Optional.of( @@ -1228,16 +1243,27 @@ public class InformationSchemaContentSupplierFactory { if (tTableInfos == null || tTableInfos.isEmpty()) { return false; } + if (!regions.contains(dataRegion.getDataRegionId().getId())) { + return false; + } currentDataRegionTableSizeQueryContext = new DataRegionTableSizeQueryContext( false, operatorContext.getInstanceContext()); - return PathUtils.isTableModelDatabase(dataRegion.getDatabaseName()); + return true; }), Optional.empty()); } @Override public boolean hasNext() { + boolean result = hasNextInternal(); + if (!result) { + updateSpecifiedInfo(); + } + return result; + } + + public boolean hasNextInternal() { if (currentDataRegionCacheReader != null) { return true; } @@ -1272,6 +1298,39 @@ public class InformationSchemaContentSupplierFactory { return false; } + private void updateSpecifiedInfo() { + if (operatorContext + .getSpecifiedInfo() + .containsKey(PlanGraphPrinter.PREPARE_CACHE_READER_COST)) { + return; + } + operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.PREPARE_CACHE_READER_COST, + TimeUnit.NANOSECONDS.toMillis(prepareCacheReaderCostInNS) + + IoTDBConstant.SPACE + + RpcUtils.MILLISECOND); + operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.LOAD_OBJECT_FILE_COST, + TimeUnit.NANOSECONDS.toMillis(loadObjectFileCostInNS) + + IoTDBConstant.SPACE + + RpcUtils.MILLISECOND); + operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.PREPARE_CACHED_TSFILE_ID_COST, + TimeUnit.NANOSECONDS.toMillis(prepareCachedTsFileIDCostInNS) + + IoTDBConstant.SPACE + + RpcUtils.MILLISECOND); + operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.CHECK_ALL_FILES_IN_TSFILE_MANAGER_COST, + TimeUnit.NANOSECONDS.toMillis(checkAllFilesInTsFileManagerCostInNS) + + IoTDBConstant.SPACE + + RpcUtils.MILLISECOND); + operatorContext.recordSpecifiedInfo( + PlanGraphPrinter.READ_TSFILE_CACHE_VALUE_FILES_COST, + TimeUnit.NANOSECONDS.toMillis(readTsFileCacheValueFilesCostInNS) + + IoTDBConstant.SPACE + + RpcUtils.MILLISECOND); + } + private Map<String, Long> getTablesToScan(DataRegion dataRegion, long timePartition) { String databaseName = dataRegion.getDatabaseName(); List<TTableInfo> tTableInfos = databaseTableInfoMap.get(databaseName); @@ -1319,28 +1378,55 @@ public class InformationSchemaContentSupplierFactory { long maxRuntime = OperatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long start = System.nanoTime(); + long prevStageEndTime; try { - if (!currentDataRegionCacheReader.prepareCacheReader(start, maxRuntime)) { - return null; + try { + if (!currentDataRegionCacheReader.prepareCacheReader(start, maxRuntime)) { + return null; + } + } finally { + prevStageEndTime = System.nanoTime(); + prepareCacheReaderCostInNS += prevStageEndTime - start; } - if (!currentDataRegionCacheReader.loadObjectFileTableSizeCache(start, maxRuntime)) { - return null; + try { + if (!currentDataRegionCacheReader.loadObjectFileTableSizeCache(start, maxRuntime)) { + return null; + } + } finally { + prevStageEndTime = System.nanoTime(); + loadObjectFileCostInNS += prevStageEndTime - start; } - if (!currentDataRegionCacheReader.prepareCachedTsFileIDKeys(start, maxRuntime)) { - return null; + try { + if (!currentDataRegionCacheReader.prepareCachedTsFileIDKeys(start, maxRuntime)) { + return null; + } + } finally { + prevStageEndTime = System.nanoTime(); + prepareCachedTsFileIDCostInNS += prevStageEndTime - start; } - if (!currentDataRegionCacheReader.checkAllFilesInTsFileManager(start, maxRuntime)) { - return null; + try { + if (!currentDataRegionCacheReader.checkAllFilesInTsFileManager(start, maxRuntime)) { + return null; + } + } finally { + prevStageEndTime = System.nanoTime(); + checkAllFilesInTsFileManagerCostInNS += prevStageEndTime - start; } - if (!currentDataRegionCacheReader.readCacheValueFilesAndUpdateResultMap( - start, maxRuntime)) { - return null; + try { + if (!currentDataRegionCacheReader.readCacheValueFilesAndUpdateResultMap( + start, maxRuntime)) { + return null; + } + } finally { + prevStageEndTime = System.nanoTime(); + readTsFileCacheValueFilesCostInNS += prevStageEndTime - start; } + return buildTsBlock(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 021c123ff66..a04dc8cfcdb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -274,7 +274,6 @@ import org.apache.tsfile.read.common.type.ObjectType; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.read.filter.basic.Filter; -import org.apache.tsfile.read.reader.series.PaginationController; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; @@ -1320,9 +1319,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution context.getZoneId(), TimestampPrecisionUtils.currPrecision); } - PaginationController paginationController = - new PaginationController(node.getPushDownLimit(), node.getPushDownOffset()); - final List<TSDataType> dataTypes = node.getOutputSymbols().stream() .map(symbol -> getTSDataType(context.getTypeProvider().getTableModelType(symbol))) @@ -1333,16 +1329,14 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution node.getPlanNodeId(), getSupplier( operatorContext, - node.getQualifiedObjectName().getObjectName(), dataTypes, - node.getPushDownPredicate(), context .getDriverContext() .getFragmentInstanceContext() .getSessionInfo() .getUserEntity(), - pushDownFilter, - paginationController)); + node, + pushDownFilter)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 2a3e3a6769c..a402da55d75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -85,6 +85,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinct import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; @@ -128,6 +129,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter public static final String CURRENT_USED_MEMORY = "CurrentUsedMemory"; public static final String MAX_USED_MEMORY = "MaxUsedMemory"; public static final String MAX_RESERVED_MEMORY = "MaxReservedMemory"; + public static final String REGIONS_OF_CURRENT_SUB_TASK = "RegionsOfCurrentSubTask"; + public static final String PREPARE_CACHE_READER_COST = "PrepareCacheReaderCost"; + public static final String LOAD_OBJECT_FILE_COST = "LoadObjectFileCost"; + public static final String PREPARE_CACHED_TSFILE_ID_COST = "PrepareCachedTsFileIdCost"; + public static final String CHECK_ALL_FILES_IN_TSFILE_MANAGER_COST = + "CheckAllFilesInTSFileManagerCost"; + public static final String READ_TSFILE_CACHE_VALUE_FILES_COST = "ReadTsFileCacheValueFilesCost"; @Override public List<String> visitPlan(PlanNode node, GraphContext context) { @@ -690,6 +698,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter String.format( "MeasurementToColumnName: %s", treeDeviceViewScanNode.getMeasurementColumnNameMap())); } + if (node instanceof TableDiskUsageInformationSchemaTableScanNode) { + boxValue.add( + String.format( + "%s: %s", + REGIONS_OF_CURRENT_SUB_TASK, + ((TableDiskUsageInformationSchemaTableScanNode) node).getRegions())); + } return render(node, boxValue, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 5cdf679da68..fa1d02f2d3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -132,6 +132,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinct import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; @@ -321,6 +322,7 @@ public enum PlanNodeType { TABLE_UNION_NODE((short) 1034), TABLE_INTERSECT_NODE((short) 1035), TABLE_EXCEPT_NODE((short) 1036), + TABLE_DISK_USAGE_INFORMATION_SCHEMA_TABLE_SCAN_NODE((short) 1037), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -723,6 +725,8 @@ public enum PlanNodeType { return IntersectNode.deserialize(buffer); case 1036: return ExceptNode.deserialize(buffer); + case 1037: + return TableDiskUsageInformationSchemaTableScanNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index e13d09de4a5..4b7de7c50c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -26,9 +26,12 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; +import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -79,6 +82,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; @@ -1003,15 +1007,23 @@ public class TableDistributedPlanGenerator @Override public List<PlanNode> visitInformationSchemaTableScan( InformationSchemaTableScanNode node, PlanContext context) { + final String tableName = node.getQualifiedObjectName().getObjectName(); List<TDataNodeLocation> dataNodeLocations = - dataNodeLocationSupplier.getDataNodeLocations( - node.getQualifiedObjectName().getObjectName()); + dataNodeLocationSupplier.getDataNodeLocations(tableName); if (dataNodeLocations.isEmpty()) { throw new IoTDBRuntimeException( "No available dataNodes, may be the cluster is closing", TSStatusCode.NO_AVAILABLE_REPLICA.getStatusCode()); } + if (InformationSchema.TABLE_DISK_USAGE.equals(tableName)) { + return buildTableDiskUsageScanNodes(node, dataNodeLocations); + } + return buildNormalInformationSchemaScanNodes(node, dataNodeLocations); + } + + private List<PlanNode> buildNormalInformationSchemaScanNodes( + InformationSchemaTableScanNode node, List<TDataNodeLocation> dataNodeLocations) { List<PlanNode> resultTableScanNodeList = new ArrayList<>(); dataNodeLocations.forEach( dataNodeLocation -> @@ -1028,6 +1040,52 @@ public class TableDistributedPlanGenerator return resultTableScanNodeList; } + private List<PlanNode> buildTableDiskUsageScanNodes( + InformationSchemaTableScanNode node, List<TDataNodeLocation> dataNodeLocations) { + final int maxSubTaskNum = + Math.max( + 1, IoTDBDescriptor.getInstance().getConfig().getMaxSubTaskNumForInformationTableScan()); + final Map<Integer, List<TRegionInfo>> regionsByDataNode = + DataNodeLocationSupplierFactory.getReadableRegionsForTableDiskUsageInformationSchemaTable(); + final List<PlanNode> result = new ArrayList<>(dataNodeLocations.size() * maxSubTaskNum); + for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { + final List<TRegionInfo> regionInfos = regionsByDataNode.get(dataNodeLocation.getDataNodeId()); + if (regionInfos == null || regionInfos.isEmpty()) { + continue; + } + List<List<Integer>> subTaskRegions = splitRegionsBySubTask(regionInfos, maxSubTaskNum); + for (List<Integer> regionIds : subTaskRegions) { + result.add( + new TableDiskUsageInformationSchemaTableScanNode( + queryId.genPlanNodeId(), + node.getQualifiedObjectName(), + node.getOutputSymbols(), + node.getAssignments(), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + new TRegionReplicaSet(null, ImmutableList.of(dataNodeLocation)), + regionIds)); + } + } + return result; + } + + private List<List<Integer>> splitRegionsBySubTask( + List<TRegionInfo> regionInfos, int maxSubTaskNum) { + int subTaskNum = Math.min(maxSubTaskNum, regionInfos.size()); + final List<List<Integer>> result = new ArrayList<>(subTaskNum); + for (int i = 0; i < subTaskNum; i++) { + result.add(new ArrayList<>()); + } + for (int i = 0; i < regionInfos.size(); i++) { + int groupIndex = i % subTaskNum; + result.get(groupIndex).add(regionInfos.get(i).getConsensusGroupId().getId()); + } + + return result; + } + @Override public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context) { List<Symbol> preGroupedSymbols = node.getPreGroupedSymbols(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java index 08a11adbbdb..cca7dd4df0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java @@ -81,7 +81,7 @@ public class InformationSchemaTableScanNode extends TableScanNode { this.regionReplicaSet = regionReplicaSet; } - private InformationSchemaTableScanNode() {} + protected InformationSchemaTableScanNode() {} @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { @@ -101,16 +101,21 @@ public class InformationSchemaTableScanNode extends TableScanNode { regionReplicaSet); } + @Override + public PlanNodeType getType() { + return PlanNodeType.INFORMATION_SCHEMA_TABLE_SCAN_NODE; + } + @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.INFORMATION_SCHEMA_TABLE_SCAN_NODE.serialize(byteBuffer); + getType().serialize(byteBuffer); TableScanNode.serializeMemberVariables(this, byteBuffer, true); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.INFORMATION_SCHEMA_TABLE_SCAN_NODE.serialize(stream); + getType().serialize(stream); TableScanNode.serializeMemberVariables(this, stream, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableDiskUsageInformationSchemaTableScanNode.java similarity index 61% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableDiskUsageInformationSchemaTableScanNode.java index 08a11adbbdb..a135da76c62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/InformationSchemaTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableDiskUsageInformationSchemaTableScanNode.java @@ -16,52 +16,32 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; -public class InformationSchemaTableScanNode extends TableScanNode { - public InformationSchemaTableScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments) { - super(id, qualifiedObjectName, outputSymbols, assignments); - } +public class TableDiskUsageInformationSchemaTableScanNode extends InformationSchemaTableScanNode { - public InformationSchemaTableScanNode( - PlanNodeId id, - QualifiedObjectName qualifiedObjectName, - List<Symbol> outputSymbols, - Map<Symbol, ColumnSchema> assignments, - Expression pushDownPredicate, - long pushDownLimit, - long pushDownOffset) { - super( - id, - qualifiedObjectName, - outputSymbols, - assignments, - pushDownPredicate, - pushDownLimit, - pushDownOffset); - } + private List<Integer> regions; - public InformationSchemaTableScanNode( + public TableDiskUsageInformationSchemaTableScanNode( PlanNodeId id, QualifiedObjectName qualifiedObjectName, List<Symbol> outputSymbols, @@ -69,7 +49,8 @@ public class InformationSchemaTableScanNode extends TableScanNode { Expression pushDownPredicate, long pushDownLimit, long pushDownOffset, - TRegionReplicaSet regionReplicaSet) { + TRegionReplicaSet regionReplicaSet, + List<Integer> regions) { super( id, qualifiedObjectName, @@ -77,20 +58,31 @@ public class InformationSchemaTableScanNode extends TableScanNode { assignments, pushDownPredicate, pushDownLimit, - pushDownOffset); - this.regionReplicaSet = regionReplicaSet; + pushDownOffset, + regionReplicaSet); + this.regions = regions; } - private InformationSchemaTableScanNode() {} + private TableDiskUsageInformationSchemaTableScanNode() { + super(); + } @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitInformationSchemaTableScan(this, context); + public PlanNodeType getType() { + return PlanNodeType.TABLE_DISK_USAGE_INFORMATION_SCHEMA_TABLE_SCAN_NODE; + } + + public void setRegions(List<Integer> regions) { + this.regions = regions; + } + + public List<Integer> getRegions() { + return regions; } @Override public PlanNode clone() { - return new InformationSchemaTableScanNode( + return new TableDiskUsageInformationSchemaTableScanNode( id, qualifiedObjectName, outputSymbols, @@ -98,32 +90,40 @@ public class InformationSchemaTableScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset, - regionReplicaSet); + regionReplicaSet, + regions); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.INFORMATION_SCHEMA_TABLE_SCAN_NODE.serialize(byteBuffer); - - TableScanNode.serializeMemberVariables(this, byteBuffer, true); + super.serializeAttributes(byteBuffer); + ReadWriteIOUtils.write(regions.size(), byteBuffer); + for (Integer region : regions) { + ReadWriteIOUtils.write(region, byteBuffer); + } } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.INFORMATION_SCHEMA_TABLE_SCAN_NODE.serialize(stream); - - TableScanNode.serializeMemberVariables(this, stream, true); + super.serializeAttributes(stream); + ReadWriteIOUtils.write(regions.size(), stream); + for (Integer region : regions) { + ReadWriteIOUtils.write(region, stream); + } } public static InformationSchemaTableScanNode deserialize(ByteBuffer byteBuffer) { - InformationSchemaTableScanNode node = new InformationSchemaTableScanNode(); + TableDiskUsageInformationSchemaTableScanNode node = + new TableDiskUsageInformationSchemaTableScanNode(); TableScanNode.deserializeMemberVariables(byteBuffer, node, true); + int length = ReadWriteIOUtils.readInt(byteBuffer); + List<Integer> regions = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + regions.add(ReadWriteIOUtils.readInt(byteBuffer)); + } node.setPlanNodeId(PlanNodeId.deserialize(byteBuffer)); + node.regions = regions; return node; } - - public String toString() { - return "InformationSchemaTableScanNode-" + this.getPlanNodeId(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java index 722d430444c..a0632017091 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java @@ -19,11 +19,15 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -32,8 +36,11 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.iotdb.rpc.TSStatusCode.QUERY_PROCESS_ERROR; @@ -49,6 +56,32 @@ public class DataNodeLocationSupplierFactory { List<TDataNodeLocation> getDataNodeLocations(String table); } + public static Map<Integer, List<TRegionInfo>> + getReadableRegionsForTableDiskUsageInformationSchemaTable() { + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TShowRegionReq req = new TShowRegionReq(); + req.setIsTableModel(true); + req.setConsensusGroupType(TConsensusGroupType.DataRegion); + TShowRegionResp tShowRegionResp = client.showRegion(req); + if (tShowRegionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException( + "An error occurred when executing getReadableDataRegions():" + + tShowRegionResp.getStatus().getMessage(), + QUERY_PROCESS_ERROR.getStatusCode()); + } + Map<Integer, List<TRegionInfo>> map = new HashMap<>(); + for (TRegionInfo tRegionInfo : tShowRegionResp.getRegionInfoList()) { + map.computeIfAbsent(tRegionInfo.getDataNodeId(), k -> new ArrayList<>()).add(tRegionInfo); + } + return map; + } catch (final ClientManagerException | TException e) { + throw new IoTDBRuntimeException( + "An error occurred when executing getReadableDataNodeLocations():" + e.getMessage(), + QUERY_PROCESS_ERROR.getStatusCode()); + } + } + /** DataNode in these states is readable: Running, ReadOnly, Removing */ public static List<TDataNodeLocation> getReadableDataNodeLocations() { try (final ConfigNodeClient client = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java index 02f7339d56b..4f8073941cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java @@ -56,18 +56,26 @@ public abstract class DiskUsageStatisticUtil implements Closeable { protected final long timePartition; protected final Iterator<TsFileResource> iterator; protected final LongConsumer timeSeriesMetadataIoSizeRecorder; + protected final LongConsumer timeSeriesMetadataCountRecorder; public DiskUsageStatisticUtil( TsFileManager tsFileManager, long timePartition, - Optional<FragmentInstanceContext> operatorContext) { + Optional<FragmentInstanceContext> fragmentInstanceContext) { this.timePartition = timePartition; this.timeSeriesMetadataIoSizeRecorder = - operatorContext + fragmentInstanceContext .<LongConsumer>map( context -> context.getQueryStatistics().getLoadTimeSeriesMetadataActualIOSize()::addAndGet) .orElse(null); + this.timeSeriesMetadataCountRecorder = + fragmentInstanceContext + .<LongConsumer>map( + context -> + context.getQueryStatistics().getLoadTimeSeriesMetadataFromDiskCount() + ::addAndGet) + .orElse(null); List<TsFileResource> seqResources = tsFileManager.getTsFileListSnapshot(timePartition, true); List<TsFileResource> unseqResources = tsFileManager.getTsFileListSnapshot(timePartition, false); List<TsFileResource> resources = @@ -88,6 +96,9 @@ public abstract class DiskUsageStatisticUtil implements Closeable { this.resourcesWithReadLock = new LinkedList<>(); try { for (TsFileResource resource : resources) { + if (!resource.isClosed()) { + continue; + } resource.readLock(); if (resource.isDeleted() || !resource.isClosed()) { resource.readUnlock(); @@ -159,6 +170,11 @@ public abstract class DiskUsageStatisticUtil implements Closeable { .getChunkMetadataListByTimeseriesMetadataOffset( timeseriesMetadataOffsetPair.getLeft(), timeseriesMetadataOffsetPair.getRight()) .get(0); + if (timeSeriesMetadataCountRecorder != null) { + timeSeriesMetadataIoSizeRecorder.accept( + timeseriesMetadataOffsetPair.getRight() - timeseriesMetadataOffsetPair.getLeft()); + timeSeriesMetadataCountRecorder.accept(1); + } return new Offsets( firstChunkMetadata.getOffsetOfChunkHeader() - chunkGroupHeaderSize, timeseriesMetadataOffsetPair.getLeft(), @@ -175,6 +191,11 @@ public abstract class DiskUsageStatisticUtil implements Closeable { minTimeseriesMetadataOffset == 0 ? entry.getValue().getRight().getLeft() : minTimeseriesMetadataOffset; + if (timeSeriesMetadataIoSizeRecorder != null) { + timeSeriesMetadataIoSizeRecorder.accept( + entry.getValue().getRight().getRight() - entry.getValue().getRight().getLeft()); + timeSeriesMetadataCountRecorder.accept(1); + } for (IChunkMetadata chunkMetadata : entry.getValue().getLeft()) { minChunkOffset = Math.min(minChunkOffset, chunkMetadata.getOffsetOfChunkHeader()); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java index 9e8795b1594..000a3f0c355 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java @@ -19,10 +19,15 @@ package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader; @@ -30,12 +35,14 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.ob import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableDiskUsageCacheWriter; import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader; +import org.apache.thrift.TException; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.concurrent.BlockingQueue; @@ -45,6 +52,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class TableDiskUsageCache { + protected static Map<String, List<TTableInfo>> databaseTableInfoMap; protected static final Logger LOGGER = LoggerFactory.getLogger(TableDiskUsageCache.class); protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>(1000); // regionId -> writer mapping @@ -136,8 +144,26 @@ public class TableDiskUsageCache { } } + public static Map<String, List<TTableInfo>> getDatabaseTableInfoMap() + throws TException, ClientManagerException { + Map<String, List<TTableInfo>> result = databaseTableInfoMap; + if (result == null) { + synchronized (TableDiskUsageCache.class) { + if (databaseTableInfoMap == null) { + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap(); + } + result = databaseTableInfoMap; + } + } + } + return result; + } + public void write(String database, TsFileID tsFileID, Map<String, Long> tableSizeMap) { if (tableSizeMap == null || tableSizeMap.isEmpty()) { + // tree model return; } addOperationToQueue(new WriteOperation(database, tsFileID, tableSizeMap)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java index b61f4c24cd8..6093184331d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java @@ -19,20 +19,32 @@ package org.apache.iotdb.db.queryengine.plan.planner.node.source; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelper; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowDiskUsageNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableDiskUsageInformationSchemaTableScanNode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.type.IntType; +import org.apache.tsfile.read.common.type.LongType; +import org.apache.tsfile.read.common.type.StringType; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -102,4 +114,46 @@ public class SourceNodeSerdeTest { byteBuffer.flip(); assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); } + + @Test + public void testTableDiskUsageInformationTableScanNode() throws IllegalPathException { + List<Symbol> symbols = Arrays.asList(new Symbol("database"), new Symbol("size_in_bytes")); + Map<Symbol, ColumnSchema> assignments = new HashMap<>(); + assignments.put( + new Symbol("database"), + new ColumnSchema("database", StringType.getInstance(), false, TsTableColumnCategory.FIELD)); + assignments.put( + new Symbol("table_name"), + new ColumnSchema( + "table_name", StringType.getInstance(), false, TsTableColumnCategory.FIELD)); + assignments.put( + new Symbol("datanode_id"), + new ColumnSchema("datanode_id", IntType.getInstance(), false, TsTableColumnCategory.FIELD)); + assignments.put( + new Symbol("region_id"), + new ColumnSchema("region_id", IntType.getInstance(), false, TsTableColumnCategory.FIELD)); + assignments.put( + new Symbol("time_partition"), + new ColumnSchema( + "time_partition", LongType.getInstance(), false, TsTableColumnCategory.FIELD)); + assignments.put( + new Symbol("size_in_bytes"), + new ColumnSchema( + "size_in_bytes", LongType.getInstance(), false, TsTableColumnCategory.FIELD)); + TableDiskUsageInformationSchemaTableScanNode node = + new TableDiskUsageInformationSchemaTableScanNode( + new PlanNodeId("test"), + new QualifiedObjectName("test", "table1"), + symbols, + assignments, + null, + 1, + 1, + new TRegionReplicaSet(), + Arrays.asList(1)); + ByteBuffer byteBuffer = ByteBuffer.allocate(2048); + node.serialize(byteBuffer); + byteBuffer.flip(); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index 5957ea888fa..34675b628ce 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -358,7 +358,8 @@ public class InformationSchema { final TsTable tableDiskUsageTable = new TsTable(TABLE_DISK_USAGE); tableDiskUsageTable.addColumnSchema( - new FieldColumnSchema(ColumnHeaderConstant.DATABASE, TSDataType.STRING)); + new FieldColumnSchema( + ColumnHeaderConstant.DATABASE.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); tableDiskUsageTable.addColumnSchema( new FieldColumnSchema(ColumnHeaderConstant.TABLE_NAME_TABLE_MODEL, TSDataType.STRING)); tableDiskUsageTable.addColumnSchema(
