This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 524c727c238ebd57140f4cab8f571f2f89ea140d
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jan 15 17:21:52 2026 +0800

    add cache
---
 .../operator/source/ShowDiskUsageOperator.java     |   2 +-
 .../InformationSchemaContentSupplierFactory.java   | 138 +++++---
 .../optimizations/PushPredicateIntoTableScan.java  |   1 +
 .../dataregion/utils/DiskUsageStatisticUtil.java   |   8 +-
 .../utils/StorageEngineTimePartitionIterator.java  |   4 +-
 .../utils/TableDiskUsageStatisticUtil.java         | 111 +++---
 .../utils/TreeDiskUsageStatisticUtil.java          |   1 -
 .../tableDiskUsageCache/TableDiskUsageCache.java   | 188 +++++++++++
 .../TableDiskUsageCacheReader.java                 | 143 ++++++++
 .../TableDiskUsageCacheWriter.java                 | 295 ++++++++++++++++
 .../TimePartitionTableSizeQueryContext.java        |  68 ++++
 .../TsFileTableSizeCacheReader.java                | 373 +++++++++++++++++++++
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 13 files changed, 1226 insertions(+), 107 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
index 79b01cbcd3d..4b82557f06c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
@@ -135,7 +135,7 @@ public class ShowDiskUsageOperator implements SourceOperator {
         paginationController.consumeLimit();
         statisticUtil.close();
       }
-      if (paginationController.hasCurLimit() && timePartitionIterator.next()) {
+      if (paginationController.hasCurLimit() && timePartitionIterator.nextTimePartition()) {
        DataRegion dataRegion = timePartitionIterator.currentDataRegion();
        long timePartition = timePartitionIterator.currentTimePartition();
        statisticUtil =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index cbb50c6b293..63edf01ccb9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -86,7 +86,8 @@ import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlKeywords;
 import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.utils.StorageEngineTimePartitionIterator;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCacheReader;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext;
 import org.apache.iotdb.db.utils.MathUtils;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -106,17 +107,18 @@ import org.apache.tsfile.utils.Pair;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import
java.util.Collections; import java.util.Comparator; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -1156,12 +1158,13 @@ public class InformationSchemaContentSupplierFactory { private final OperatorContext operatorContext; private DataRegion currentDataRegion; - private long currentTimePartition; - private List<String> currentTablesToScan; private boolean currentDatabaseOnlyHasOneTable; - private TableDiskUsageStatisticUtil statisticUtil; - private final StorageEngineTimePartitionIterator timePartitionIterator; + private TableDiskUsageCacheReader currentDataRegionCacheReader; + private final Map<Long, TimePartitionTableSizeQueryContext> timePartitionsContextMap = + new LinkedHashMap<>(); + + private final StorageEngineTimePartitionIterator dataRegionIterator; private TableDiskUsageSupplier( final List<TSDataType> dataTypes, @@ -1179,7 +1182,7 @@ public class InformationSchemaContentSupplierFactory { ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { this.databaseTableInfoMap = client.showTables4InformationSchema().getDatabaseTableInfoMap(); } - this.timePartitionIterator = + this.dataRegionIterator = new StorageEngineTimePartitionIterator( Optional.of( dataRegion -> { @@ -1188,51 +1191,57 @@ public class InformationSchemaContentSupplierFactory { if (tTableInfos == null || tTableInfos.isEmpty()) { return false; } + timePartitionsContextMap.clear(); return PathUtils.isTableModelDatabase(dataRegion.getDatabaseName()); }), - Optional.of( - (dataRegion, timePartition) -> { - currentTablesToScan = getTablesToScan(dataRegion, timePartition); - return !currentTablesToScan.isEmpty(); - })); + Optional.empty()); } @Override public boolean hasNext() { - if (statisticUtil != null) { + if (currentDataRegionCacheReader != null) { return true; } if (!paginationController.hasCurLimit()) { return false; } try { - if (timePartitionIterator.next()) { - currentDataRegion = timePartitionIterator.currentDataRegion(); - currentTimePartition = timePartitionIterator.currentTimePartition(); - statisticUtil = - new TableDiskUsageStatisticUtil( - currentDataRegion.getTsFileManager(), - currentTimePartition, - currentTablesToScan, + while (dataRegionIterator.nextDataRegion()) { + currentDataRegion = dataRegionIterator.currentDataRegion(); + for (Long timePartition : currentDataRegion.getTsFileManager().getTimePartitions()) { + Map<String, Long> tablesToScan = getTablesToScan(currentDataRegion, timePartition); + if (!tablesToScan.isEmpty()) { + timePartitionsContextMap.put( + timePartition, new TimePartitionTableSizeQueryContext(tablesToScan)); + } + } + if (timePartitionsContextMap.isEmpty()) { + continue; + } + currentDataRegionCacheReader = + new TableDiskUsageCacheReader( + currentDataRegion, + timePartitionsContextMap, currentDatabaseOnlyHasOneTable, Optional.ofNullable(operatorContext.getInstanceContext())); return true; } - return false; } catch (Exception e) { - closeStatisticUtil(); - throw new RuntimeException(e.getMessage(), e); + closeDataRegionReader(); + throw new IoTDBRuntimeException( + e.getMessage(), e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } + return false; } - private List<String> getTablesToScan(DataRegion dataRegion, long timePartition) { + private Map<String, Long> 
getTablesToScan(DataRegion dataRegion, long timePartition) { String databaseName = dataRegion.getDatabaseName(); List<TTableInfo> tTableInfos = databaseTableInfoMap.get(databaseName); if (tTableInfos == null || tTableInfos.isEmpty()) { - return Collections.emptyList(); + return Collections.emptyMap(); } - List<String> tablesToScan = new ArrayList<>(tTableInfos.size()); + Map<String, Long> tablesToScan = new TreeMap<>(); int totalValidTableCount = 0; for (TTableInfo tTableInfo : tTableInfos) { if (tTableInfo.getType() != TableType.BASE_TABLE.ordinal()) { @@ -1258,7 +1267,7 @@ public class InformationSchemaContentSupplierFactory { continue; } paginationController.consumeLimit(); - tablesToScan.add(tTableInfo.getTableName()); + tablesToScan.put(tTableInfo.getTableName(), 0L); } currentDatabaseOnlyHasOneTable = totalValidTableCount == 1; return tablesToScan; @@ -1273,47 +1282,72 @@ public class InformationSchemaContentSupplierFactory { long maxRuntime = OperatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long start = System.nanoTime(); - if (statisticUtil.hasNextFile()) { + try { + if (!currentDataRegionCacheReader.prepareCachedTsFileIDKeys(start, maxRuntime)) { + return null; + } + + boolean finished = false; do { - statisticUtil.calculateNextFile(); - } while (System.nanoTime() - start < maxRuntime && statisticUtil.hasNextFile()); - if (statisticUtil.hasNextFile()) { + if (!currentDataRegionCacheReader.calculateNextFile()) { + finished = true; + break; + } + } while (System.nanoTime() - start < maxRuntime); + + if (!finished) { return null; } + + if (!currentDataRegionCacheReader.readCacheValueFilesAndUpdateResultMap( + start, maxRuntime)) { + return null; + } + return buildTsBlock(); + } catch (Exception e) { + throw new IoTDBRuntimeException( + e.getMessage(), e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } + } + private TsBlock buildTsBlock() { TsBlockBuilder builder = new TsBlockBuilder(dataTypes); - long[] resultArr = statisticUtil.getResult(); - - for (int i = 0; i < currentTablesToScan.size(); i++) { - builder.getTimeColumnBuilder().writeLong(0); - ColumnBuilder[] columns = builder.getValueColumnBuilders(); - - columns[0].writeBinary( - new Binary(currentDataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET)); - columns[1].writeBinary(new Binary(currentTablesToScan.get(i), TSFileConfig.STRING_CHARSET)); - columns[2].writeInt(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); - columns[3].writeInt(Integer.parseInt(currentDataRegion.getDataRegionIdString())); - columns[4].writeLong(currentTimePartition); - columns[5].writeLong(resultArr[i]); - builder.declarePosition(); + for (Map.Entry<Long, TimePartitionTableSizeQueryContext> entry : + timePartitionsContextMap.entrySet()) { + long timePartition = entry.getKey(); + for (Map.Entry<String, Long> tableSizeEntry : + entry.getValue().getTableSizeResultMap().entrySet()) { + String tableName = tableSizeEntry.getKey(); + long size = tableSizeEntry.getValue(); + builder.getTimeColumnBuilder().writeLong(0); + ColumnBuilder[] columns = builder.getValueColumnBuilders(); + + columns[0].writeBinary( + new Binary(currentDataRegion.getDatabaseName(), TSFileConfig.STRING_CHARSET)); + columns[1].writeBinary(new Binary(tableName, TSFileConfig.STRING_CHARSET)); + columns[2].writeInt(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()); + columns[3].writeInt(Integer.parseInt(currentDataRegion.getDataRegionIdString())); + columns[4].writeLong(timePartition); + columns[5].writeLong(size); + builder.declarePosition(); + 
} } - closeStatisticUtil(); + closeDataRegionReader(); return builder.build(); } @Override public void close() throws IOException { - closeStatisticUtil(); + closeDataRegionReader(); } - private void closeStatisticUtil() { - if (statisticUtil == null) { + private void closeDataRegionReader() { + if (currentDataRegionCacheReader == null) { return; } try { - statisticUtil.close(); - statisticUtil = null; + currentDataRegionCacheReader.close(); + currentDataRegionCacheReader = null; } catch (IOException ignored) { } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index fc6c47090fb..6c1a4af4a41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -475,6 +475,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } return node; } + // push down for other information schema tables return combineFilterAndScan(node, context.inheritedPredicate); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java index 07296d217d2..c8298a7aa21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java @@ -53,6 +53,7 @@ public abstract class DiskUsageStatisticUtil implements Closeable { protected static final Logger logger = LoggerFactory.getLogger(DiskUsageStatisticUtil.class); protected Queue<TsFileResource> resourcesWithReadLock; + protected final long timePartition; protected final Iterator<TsFileResource> iterator; protected final LongConsumer timeSeriesMetadataIoSizeRecorder; @@ -60,6 +61,7 @@ public abstract class DiskUsageStatisticUtil implements Closeable { TsFileManager tsFileManager, long timePartition, Optional<FragmentInstanceContext> operatorContext) { + this.timePartition = timePartition; this.timeSeriesMetadataIoSizeRecorder = operatorContext .<LongConsumer>map( @@ -74,12 +76,14 @@ public abstract class DiskUsageStatisticUtil implements Closeable { iterator = resourcesWithReadLock.iterator(); } + public long getTimePartition() { + return timePartition; + } + public boolean hasNextFile() { return iterator.hasNext(); } - public abstract long[] getResult(); - protected void acquireReadLocks(List<TsFileResource> resources) { this.resourcesWithReadLock = new LinkedList<>(); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java index 8cae9ad0e2d..a7a0096f2bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/StorageEngineTimePartitionIterator.java @@ -47,7 
+47,7 @@ public class StorageEngineTimePartitionIterator implements Accountable { this.timePartitionFilter = timePartitionFilter; } - public boolean next() throws Exception { + public boolean nextTimePartition() throws Exception { while (true) { if (timePartitionIterator != null && timePartitionIterator.hasNext()) { currentTimePartition = timePartitionIterator.next(); @@ -62,7 +62,7 @@ public class StorageEngineTimePartitionIterator implements Accountable { } } - private boolean nextDataRegion() throws Exception { + public boolean nextDataRegion() throws Exception { while (dataRegionIterator.hasNext()) { currentDataRegion = dataRegionIterator.next(); if (currentDataRegion == null || currentDataRegion.isDeleted()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java index f1236c1673e..2f0e5b48045 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java @@ -20,8 +20,11 @@ package org.apache.iotdb.db.storageengine.dataregion.utils; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.MetadataIndexNode; @@ -32,6 +35,7 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -41,39 +45,51 @@ import java.util.Optional; public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableDiskUsageStatisticUtil.class); - private final Map<String, Integer> tableIndexMap; + private final String database; + private final List<Pair<TsFileID, Long>> tsFilesToQueryInCache; + private final TimePartitionTableSizeQueryContext tableSizeQueryContext; private final boolean databaseHasOnlyOneTable; - private final long[] resultArr; public TableDiskUsageStatisticUtil( - TsFileManager tsFileManager, + DataRegion dataRegion, long timePartition, - List<String> tableNames, + TimePartitionTableSizeQueryContext tableSizeQueryContext, boolean databaseHasOnlyOneTable, + List<Pair<TsFileID, Long>> tsFilesToQueryInCache, Optional<FragmentInstanceContext> context) { - super(tsFileManager, timePartition, context); - this.tableIndexMap = new HashMap<>(); - for (int i = 0; i < tableNames.size(); i++) { - tableIndexMap.put(tableNames.get(i), i); - } + super(dataRegion.getTsFileManager(), timePartition, context); + this.database = dataRegion.getDatabaseName(); + this.tableSizeQueryContext = tableSizeQueryContext; + this.tsFilesToQueryInCache = tsFilesToQueryInCache; 
this.databaseHasOnlyOneTable = databaseHasOnlyOneTable; - this.resultArr = new long[tableNames.size()]; - } - - @Override - public long[] getResult() { - return resultArr; } @Override protected boolean calculateWithoutOpenFile(TsFileResource tsFileResource) { + TsFileID tsFileID = tsFileResource.getTsFileID(); + Long cachedValueOffset = tableSizeQueryContext.getCachedTsFileIdOffset(tsFileID); + if (cachedValueOffset != null) { + tsFilesToQueryInCache.add(new Pair<>(tsFileID, cachedValueOffset)); + return true; + } + if (!databaseHasOnlyOneTable || tsFileResource.anyModFileExists()) { return false; } - resultArr[0] += tsFileResource.getTsFileSize(); + String table = tableSizeQueryContext.getTableSizeResultMap().keySet().iterator().next(); + tableSizeQueryContext.updateResult(table, tsFileResource.getTsFileSize()); + TableDiskUsageCache.getInstance() + .write( + database, + tsFileResource.getTsFileID(), + Collections.singletonMap(table, tsFileResource.getTsFileSize())); return true; } + public List<Pair<TsFileID, Long>> getTsFilesToQueryInCache() { + return tsFilesToQueryInCache; + } + @Override protected void calculateNextFile(TsFileResource tsFileResource, TsFileSequenceReader reader) throws IOException { @@ -81,53 +97,50 @@ public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil { if (!hasSatisfiedData(tsFileMetadata)) { return; } - Pair<Integer, Boolean> allSatisfiedTableIndexPair = getAllSatisfiedTableIndex(tsFileMetadata); - int allSatisfiedTableIndex = allSatisfiedTableIndexPair.getLeft(); - // the only one table in this tsfile might be deleted by mods, and it is not the table we - // queried - boolean mayContainSearchedTable = allSatisfiedTableIndexPair.getRight(); - if (allSatisfiedTableIndex != -1) { - resultArr[allSatisfiedTableIndex] += - mayContainSearchedTable ? 
tsFileResource.getTsFileSize() : 0; + + if (tsFileMetadata.getTableMetadataIndexNodeMap().size() == 1) { + String satisfiedTable = + tsFileMetadata.getTableMetadataIndexNodeMap().keySet().iterator().next(); + tableSizeQueryContext.updateResult(satisfiedTable, tsFileResource.getTsFileSize()); + TableDiskUsageCache.getInstance() + .write( + database, + tsFileResource.getTsFileID(), + Collections.singletonMap( + tsFileMetadata.getTableMetadataIndexNodeMap().keySet().iterator().next(), + tsFileResource.getTsFileSize())); return; } - calculateDiskUsageInBytesByOffset(reader); + + calculateDiskUsageInBytesByOffset(tsFileResource, reader); } private boolean hasSatisfiedData(TsFileMetadata tsFileMetadata) { Map<String, MetadataIndexNode> tableMetadataIndexNodeMap = tsFileMetadata.getTableMetadataIndexNodeMap(); - return tableIndexMap.keySet().stream().anyMatch(tableMetadataIndexNodeMap::containsKey); + return tableSizeQueryContext.getTableSizeResultMap().keySet().stream() + .anyMatch(tableMetadataIndexNodeMap::containsKey); } - private Pair<Integer, Boolean> getAllSatisfiedTableIndex(TsFileMetadata tsFileMetadata) { - if (tsFileMetadata.getTableMetadataIndexNodeMap().size() != 1) { - return new Pair<>(-1, true); - } - String satisfiedTableName = - tsFileMetadata.getTableMetadataIndexNodeMap().keySet().iterator().next(); - String searchedTableName = tableIndexMap.keySet().iterator().next(); - return new Pair<>( - tableIndexMap.get(satisfiedTableName), satisfiedTableName.equals(searchedTableName)); - } - - private void calculateDiskUsageInBytesByOffset(TsFileSequenceReader reader) throws IOException { + private void calculateDiskUsageInBytesByOffset( + TsFileResource resource, TsFileSequenceReader reader) throws IOException { TsFileMetadata tsFileMetadata = reader.readFileMetadata(); Map<String, MetadataIndexNode> tableMetadataIndexNodeMap = tsFileMetadata.getTableMetadataIndexNodeMap(); - String nextTable = null; + String currentTable = null, nextTable = null; Iterator<String> iterator = tableMetadataIndexNodeMap.keySet().iterator(); Map<String, Offsets> tableOffsetMap = new HashMap<>(); - while (iterator.hasNext()) { - String currentTable = iterator.next(); - while (currentTable != null && tableIndexMap.containsKey(currentTable)) { - nextTable = iterator.hasNext() ? iterator.next() : null; - long tableSize = - calculateTableSize(tableOffsetMap, tsFileMetadata, reader, currentTable, nextTable); - resultArr[tableIndexMap.get(currentTable)] += tableSize; - currentTable = nextTable; - } + Map<String, Long> tsFileTableSizeMap = new HashMap<>(); + while (currentTable != null || iterator.hasNext()) { + currentTable = currentTable == null ? iterator.next() : currentTable; + nextTable = iterator.hasNext() ? 
iterator.next() : null; + long tableSize = + calculateTableSize(tableOffsetMap, tsFileMetadata, reader, currentTable, nextTable); + tableSizeQueryContext.updateResult(currentTable, tableSize); + tsFileTableSizeMap.put(currentTable, tableSize); + currentTable = nextTable; } + TableDiskUsageCache.getInstance().write(database, resource.getTsFileID(), tsFileTableSizeMap); } private long calculateTableSize( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java index 5d6f2aaed5e..7b01faf6eb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java @@ -70,7 +70,6 @@ public class TreeDiskUsageStatisticUtil extends DiskUsageStatisticUtil { this.isMatchedDeviceSequential = true; } - @Override public long[] getResult() { return new long[] {result}; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java new file mode 100644 index 00000000000..c63dca4a36a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; + +public class TableDiskUsageCache { + private static final Logger LOGGER = LoggerFactory.getLogger(TableDiskUsageCache.class); + private final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>(); + private final Map<Integer, TableDiskUsageCacheWriter> writerMap = new HashMap<>(); + private final ScheduledExecutorService scheduledExecutorService; + + private TableDiskUsageCache() { + scheduledExecutorService = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.FILE_TIME_INDEX_RECORD.getName()); + scheduledExecutorService.submit(this::run); + } + + private void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + try { + Operation operation = queue.take(); + operation.apply(this); + } catch (InterruptedException e) { + return; + } catch (Exception e) { + LOGGER.error("Meet exception when apply TableDiskUsageCache.", e); + } + } + } finally { + writerMap.values().forEach(TableDiskUsageCacheWriter::close); + } + } + + public void write(String database, TsFileID tsFileID, Map<String, Long> tableSizeMap) { + queue.add(new WriteOperation(database, tsFileID, tableSizeMap)); + } + + public void write(String database, TsFileID originTsFileID, TsFileID newTsFileID) { + queue.add(new ReplaceOperation(database, originTsFileID, newTsFileID)); + } + + public CompletableFuture<TsFileTableSizeCacheReader> startRead(String database, int regionId) { + StartReadOperation operation = new StartReadOperation(database, regionId); + queue.add(operation); + return operation.future; + } + + public void endRead(String database, int regionId) { + EndReadOperation operation = new EndReadOperation(database, regionId); + queue.add(operation); + } + + public abstract static class Operation { + protected final String database; + protected final int regionId; + + protected Operation(String database, int regionId) { + this.database = database; + this.regionId = regionId; + } + + public abstract void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException; + } + + private static class StartReadOperation extends Operation { + public CompletableFuture<TsFileTableSizeCacheReader> future = new CompletableFuture<>(); + + public StartReadOperation(String database, int regionId) { + super(database, regionId); + } + + @Override + public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException { + try { + TableDiskUsageCacheWriter writer = + tableDiskUsageCache.writerMap.computeIfAbsent( + regionId, k -> new TableDiskUsageCacheWriter(database, regionId)); + writer.flush(); + writer.increaseActiveReaderNum(); + future.complete( + new TsFileTableSizeCacheReader( + writer.keyFileLength(), + writer.getKeyFile(), + writer.valueFileLength(), + writer.getValueFile(), + regionId)); + } catch (Throwable t) { + future.completeExceptionally(t); + } + } + } + + private static class EndReadOperation extends Operation { + public EndReadOperation(String database, int 
regionId) { + super(database, regionId); + } + + @Override + public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException { + TableDiskUsageCacheWriter writer = + tableDiskUsageCache.writerMap.computeIfAbsent( + regionId, k -> new TableDiskUsageCacheWriter(database, regionId)); + writer.decreaseActiveReaderNum(); + } + } + + private static class WriteOperation extends Operation { + + private final TsFileID tsFileID; + private final Map<String, Long> tableSizeMap; + + protected WriteOperation(String database, TsFileID tsFileID, Map<String, Long> tableSizeMap) { + super(database, tsFileID.regionId); + this.tsFileID = tsFileID; + this.tableSizeMap = tableSizeMap; + } + + @Override + public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException { + tableDiskUsageCache + .writerMap + .computeIfAbsent(regionId, k -> new TableDiskUsageCacheWriter(database, regionId)) + .write(tsFileID, tableSizeMap); + } + } + + private static class ReplaceOperation extends Operation { + private final TsFileID originTsFileID; + private final TsFileID newTsFileID; + + public ReplaceOperation(String database, TsFileID originTsFileID, TsFileID newTsFileID) { + super(database, originTsFileID.regionId); + this.originTsFileID = originTsFileID; + this.newTsFileID = newTsFileID; + } + + @Override + public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException { + TableDiskUsageCacheWriter writer = tableDiskUsageCache.writerMap.get(regionId); + if (writer != null) { + writer.write(originTsFileID, newTsFileID); + } + } + } + + public static TableDiskUsageCache getInstance() { + return TableDiskUsageCache.InstanceHolder.INSTANCE; + } + + private static class InstanceHolder { + private InstanceHolder() {} + + private static final TableDiskUsageCache INSTANCE = new TableDiskUsageCache(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java new file mode 100644 index 00000000000..7e7e459e95f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; + +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil; + +import org.apache.tsfile.utils.Pair; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public class TableDiskUsageCacheReader implements Closeable { + + private final DataRegion dataRegion; + private final int regionId; + private final Map<Long, TimePartitionTableSizeQueryContext> timePartitionQueryContexts; + private CompletableFuture<TsFileTableSizeCacheReader> future; + private TsFileTableSizeCacheReader cacheFileReader; + + private final Iterator<Map.Entry<Long, TimePartitionTableSizeQueryContext>> timePartitionIterator; + + private final boolean currentDatabaseOnlyHasOneTable; + private final Optional<FragmentInstanceContext> context; + private TableDiskUsageStatisticUtil tableDiskUsageStatisticUtil; + + private final List<Pair<TsFileID, Long>> tsFilesToQueryInCache = new ArrayList<>(); + private Iterator<Pair<TsFileID, Long>> tsFilesToQueryInCacheIterator = null; + + public TableDiskUsageCacheReader( + DataRegion dataRegion, + Map<Long, TimePartitionTableSizeQueryContext> resultMap, + boolean databaseHasOnlyOneTable, + Optional<FragmentInstanceContext> context) { + this.dataRegion = dataRegion; + this.regionId = Integer.parseInt(dataRegion.getDataRegionIdString()); + this.timePartitionQueryContexts = resultMap; + this.currentDatabaseOnlyHasOneTable = databaseHasOnlyOneTable; + this.context = context; + this.timePartitionIterator = timePartitionQueryContexts.entrySet().iterator(); + } + + public boolean prepareCachedTsFileIDKeys(long startTime, long maxRunTime) throws Exception { + if (this.cacheFileReader == null) { + this.future = + this.future == null + ? 
TableDiskUsageCache.getInstance().startRead(dataRegion.getDatabaseName(), regionId) + : future; + do { + try { + if (future.isDone()) { + this.cacheFileReader = future.get(); + this.cacheFileReader.openKeyFile(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } while (!future.isDone() && System.nanoTime() - startTime < maxRunTime); + } + if (this.cacheFileReader == null) { + return false; + } + return cacheFileReader.readFromKeyFile(timePartitionQueryContexts, startTime, maxRunTime); + } + + public boolean calculateNextFile() { + while (true) { + if (tableDiskUsageStatisticUtil != null && tableDiskUsageStatisticUtil.hasNextFile()) { + tableDiskUsageStatisticUtil.calculateNextFile(); + return true; + } + if (timePartitionIterator.hasNext()) { + Map.Entry<Long, TimePartitionTableSizeQueryContext> currentTimePartitionEntry = + timePartitionIterator.next(); + long timePartition = currentTimePartitionEntry.getKey(); + tableDiskUsageStatisticUtil = + new TableDiskUsageStatisticUtil( + dataRegion, + timePartition, + currentTimePartitionEntry.getValue(), + currentDatabaseOnlyHasOneTable, + tsFilesToQueryInCache, + context); + } else { + return false; + } + } + } + + public boolean readCacheValueFilesAndUpdateResultMap(long startTime, long maxRunTime) + throws IOException { + if (this.tsFilesToQueryInCacheIterator == null) { + this.tsFilesToQueryInCache.sort(Comparator.comparingLong(Pair::getRight)); + this.tsFilesToQueryInCacheIterator = tsFilesToQueryInCache.iterator(); + this.cacheFileReader.openValueFile(); + } + return cacheFileReader.readFromValueFile( + tsFilesToQueryInCacheIterator, timePartitionQueryContexts, startTime, maxRunTime); + } + + public DataRegion getDataRegion() { + return dataRegion; + } + + @Override + public void close() throws IOException { + if (future != null) { + TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(), regionId); + future = null; + } + if (cacheFileReader != null) { + cacheFileReader.closeCurrentFile(); + cacheFileReader = null; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java new file mode 100644 index 00000000000..5c35b0701bf --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; + +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; + +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TableDiskUsageCacheWriter { + private static final String TSFILE_CACHE_KEY_FILENAME_PREFIX = "TableSizeKeyFile_"; + private static final String TSFILE_CACHE_VALUE_FILENAME_PREFIX = "TableSizeValueFile_"; + public static final int KEY_FILE_OFFSET_RECORD_LENGTH = 5 * Long.BYTES; + public static final int KEY_FILE_REDIRECT_RECORD_LENGTH = 7 * Long.BYTES; + private static final String TEMP_CACHE_FILE_SUBFIX = ".tmp"; + public static final byte KEY_FILE_RECORD_TYPE_OFFSET = 1; + public static final byte KEY_FILE_RECORD_TYPE_REDIRECT = 2; + + private final int regionId; + private int activeReaderNum = 0; + private int currentTsFileIndexFileVersion = 0; + private final File dir; + private File currentKeyIndexFile; + private File currentValueIndexFile; + private FileOutputStream keyFileOutputStream; + private FileOutputStream valueFileOutputStream; + private BufferedOutputStream keyBufferedOutputStream; + private BufferedOutputStream valueBufferedOutputStream; + private long keyFileSize; + private long valueFileSize; + + public TableDiskUsageCacheWriter(String database, int regionId) { + this.regionId = regionId; + this.dir = StorageEngine.getDataRegionSystemDir(database, regionId + ""); + recoverTsFileTableSizeIndexFile(); + } + + private void recoverTsFileTableSizeIndexFile() { + dir.mkdirs(); + File[] files = dir.listFiles(); + currentTsFileIndexFileVersion = 0; + List<File> keyFiles = new ArrayList<>(); + List<File> valueFiles = new ArrayList<>(); + if (files != null) { + for (File file : files) { + String fileName = file.getName(); + boolean isKeyFile = fileName.startsWith(TSFILE_CACHE_KEY_FILENAME_PREFIX); + boolean isValueFile = !isKeyFile && fileName.startsWith(TSFILE_CACHE_VALUE_FILENAME_PREFIX); + boolean isTempFile = fileName.endsWith(TEMP_CACHE_FILE_SUBFIX); + if (!isKeyFile) { + if (isValueFile && !isTempFile) { + valueFiles.add(file); + } + continue; + } + if (isTempFile) { + try { + Files.delete(file.toPath()); + } catch (IOException ignored) { + } + } + int version; + try { + version = Integer.parseInt(fileName.substring(TSFILE_CACHE_KEY_FILENAME_PREFIX.length())); + currentTsFileIndexFileVersion = Math.max(currentTsFileIndexFileVersion, version); + } catch (NumberFormatException ignored) { + continue; + } + File valueFile = + new File(dir + File.separator + TSFILE_CACHE_VALUE_FILENAME_PREFIX + version); + // may have a valid value index file + if (!valueFile.exists()) { + File tempValueFile = new File(valueFile.getPath() + TEMP_CACHE_FILE_SUBFIX); + if (tempValueFile.exists()) { + tempValueFile.renameTo(valueFile); + valueFiles.add(valueFile); + } else { + // lost value file + try { + Files.delete(file.toPath()); + } catch (IOException ignored) { + } + continue; + } + } + keyFiles.add(file); + } + if (keyFiles.size() > 1) { + deleteOldVersionFiles( + currentTsFileIndexFileVersion, 
TSFILE_CACHE_KEY_FILENAME_PREFIX, keyFiles); + } + if (valueFiles.size() > 1) { + deleteOldVersionFiles( + currentTsFileIndexFileVersion, TSFILE_CACHE_VALUE_FILENAME_PREFIX, valueFiles); + } + } + currentKeyIndexFile = + keyFiles.isEmpty() + ? new File( + dir + + File.separator + + TSFILE_CACHE_KEY_FILENAME_PREFIX + + currentTsFileIndexFileVersion) + : keyFiles.get(0); + currentValueIndexFile = + valueFiles.isEmpty() + ? new File( + dir + + File.separator + + TSFILE_CACHE_VALUE_FILENAME_PREFIX + + currentTsFileIndexFileVersion) + : valueFiles.get(0); + try { + cacheFileSelfCheck(); + } catch (IOException ignored) { + } + } + + private void cacheFileSelfCheck() throws IOException { + currentKeyIndexFile.createNewFile(); + currentValueIndexFile.createNewFile(); + TsFileTableSizeCacheReader cacheFileReader = + new TsFileTableSizeCacheReader( + currentKeyIndexFile.length(), + currentKeyIndexFile, + currentValueIndexFile.length(), + currentValueIndexFile, + regionId); + Pair<Long, Long> truncateSize = cacheFileReader.selfCheck(); + if (truncateSize.left != currentKeyIndexFile.length()) { + try (FileChannel channel = + FileChannel.open(currentKeyIndexFile.toPath(), StandardOpenOption.WRITE)) { + channel.truncate(truncateSize.left); + } + } + if (truncateSize.right != currentValueIndexFile.length()) { + try (FileChannel channel = + FileChannel.open(currentValueIndexFile.toPath(), StandardOpenOption.WRITE)) { + channel.truncate(truncateSize.right); + } + } + this.keyFileSize = truncateSize.left; + this.valueFileSize = truncateSize.right; + } + + private void deleteOldVersionFiles(int maxVersion, String prefix, List<File> files) { + for (File file : files) { + try { + int version = Integer.parseInt(file.getName().substring(prefix.length())); + if (version != maxVersion) { + Files.deleteIfExists(file.toPath()); + } + } catch (Exception e) { + } + } + } + + public void write(TsFileID tsFileID, Map<String, Long> tableSizeMap) throws IOException { + if (keyFileOutputStream == null) { + keyFileOutputStream = new FileOutputStream(currentKeyIndexFile, true); + keyFileSize = currentKeyIndexFile.length(); + keyBufferedOutputStream = new BufferedOutputStream(keyFileOutputStream); + } + if (valueFileOutputStream == null) { + valueFileOutputStream = new FileOutputStream(currentValueIndexFile, true); + valueFileSize = currentValueIndexFile.length(); + valueBufferedOutputStream = new BufferedOutputStream(valueFileOutputStream); + } + + long valueOffset = valueFileSize; + valueFileSize += + ReadWriteForEncodingUtils.writeVarInt(tableSizeMap.size(), valueBufferedOutputStream); + for (Map.Entry<String, Long> entry : tableSizeMap.entrySet()) { + valueFileSize += ReadWriteIOUtils.writeVar(entry.getKey(), valueBufferedOutputStream); + valueFileSize += ReadWriteIOUtils.write(entry.getValue(), valueBufferedOutputStream); + } + keyFileSize += ReadWriteIOUtils.write(KEY_FILE_RECORD_TYPE_OFFSET, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(tsFileID.timePartitionId, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(tsFileID.timestamp, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(tsFileID.fileVersion, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(tsFileID.compactionVersion, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(valueOffset, keyBufferedOutputStream); + } + + public void write(TsFileID originTsFileID, TsFileID newTsFileID) throws IOException { + if (keyFileOutputStream == null) { + keyFileOutputStream = new 
FileOutputStream(currentKeyIndexFile, true); + keyFileSize = currentKeyIndexFile.length(); + keyBufferedOutputStream = new BufferedOutputStream(keyFileOutputStream); + } + keyFileSize += ReadWriteIOUtils.write(KEY_FILE_RECORD_TYPE_REDIRECT, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(newTsFileID.timePartitionId, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(newTsFileID.timestamp, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(newTsFileID.fileVersion, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(newTsFileID.compactionVersion, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(originTsFileID.timestamp, keyBufferedOutputStream); + keyFileSize += ReadWriteIOUtils.write(originTsFileID.fileVersion, keyBufferedOutputStream); + keyFileSize += + ReadWriteIOUtils.write(originTsFileID.compactionVersion, keyBufferedOutputStream); + } + + public void compact() {} + + public void flush() throws IOException { + if (valueBufferedOutputStream != null) { + valueBufferedOutputStream.flush(); + } + if (keyFileOutputStream != null) { + keyBufferedOutputStream.flush(); + } + } + + public File getKeyFile() { + return currentKeyIndexFile; + } + + public File getValueFile() { + return currentValueIndexFile; + } + + public long keyFileLength() { + return keyFileSize; + } + + public long valueFileLength() { + return valueFileSize; + } + + public void fsync() throws IOException { + flush(); + valueFileOutputStream.getFD().sync(); + keyFileOutputStream.getFD().sync(); + } + + public void increaseActiveReaderNum() { + activeReaderNum++; + } + + public void decreaseActiveReaderNum() { + activeReaderNum--; + } + + public int getActiveReaderNum() { + return activeReaderNum; + } + + public void close() { + try { + fsync(); + } catch (IOException ignored) { + } + try { + if (valueBufferedOutputStream != null) { + valueBufferedOutputStream.close(); + } + } catch (IOException ignored) { + } + try { + if (keyBufferedOutputStream != null) { + keyBufferedOutputStream.close(); + } + } catch (IOException ignored) { + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TimePartitionTableSizeQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TimePartitionTableSizeQueryContext.java new file mode 100644 index 00000000000..bc7f8271d99 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TimePartitionTableSizeQueryContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; + +import java.util.HashMap; +import java.util.Map; + +public class TimePartitionTableSizeQueryContext { + private final Map<String, Long> tableSizeResultMap; + Map<TsFileID, Long> tsFileIDOffsetInValueFileMap; + + public TimePartitionTableSizeQueryContext(Map<String, Long> tableSizeResultMap) { + this.tableSizeResultMap = tableSizeResultMap; + } + + public void addCachedTsFileIDAndOffsetInValueFile(TsFileID tsFileID, long offset) { + if (tsFileIDOffsetInValueFileMap == null) { + tsFileIDOffsetInValueFileMap = new HashMap<>(); + } + tsFileIDOffsetInValueFileMap.put(tsFileID, offset); + } + + public void replaceCachedTsFileID(TsFileID originTsFileID, TsFileID newTsFileID) { + if (tsFileIDOffsetInValueFileMap == null) { + return; + } + Long offset = tsFileIDOffsetInValueFileMap.remove(originTsFileID); + if (offset != null) { + tsFileIDOffsetInValueFileMap.put(newTsFileID, offset); + } + } + + public void updateResult(String table, long size) { + tableSizeResultMap.computeIfPresent(table, (k, v) -> v + size); + } + + public Map<String, Long> getTableSizeResultMap() { + return tableSizeResultMap; + } + + public boolean hasCachedTsFileID(TsFileID tsFileID) { + return tsFileIDOffsetInValueFileMap != null + && tsFileIDOffsetInValueFileMap.containsKey(tsFileID); + } + + public Long getCachedTsFileIdOffset(TsFileID tsFileID) { + return tsFileIDOffsetInValueFileMap == null ? null : tsFileIDOffsetInValueFileMap.get(tsFileID); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java new file mode 100644 index 00000000000..7229497ae36 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.utils.MmapUtil; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class TsFileTableSizeCacheReader { + + private long readSize = 0; + private final File keyFile; + private final long keyFileLength; + private final File valueFile; + private final long valueFileLength; + private DirectBufferedSeekableFileInputStream inputStream; + private final int regionId; + + public TsFileTableSizeCacheReader( + long keyFileLength, File keyFile, long valueFileLength, File valueFile, int regionId) { + this.keyFile = keyFile; + this.keyFileLength = keyFileLength; + this.valueFile = valueFile; + this.valueFileLength = valueFileLength; + this.regionId = regionId; + } + + public void openKeyFile() throws IOException { + if (keyFileLength > 0 && inputStream == null) { + this.inputStream = new DirectBufferedSeekableFileInputStream(keyFile.toPath(), 4 * 1024); + } + } + + public void openValueFile() throws IOException { + if (valueFileLength > 0 && inputStream == null) { + this.inputStream = new DirectBufferedSeekableFileInputStream(valueFile.toPath(), 4 * 1024); + } + } + + public Pair<Long, Long> selfCheck() { + List<Pair<Long, Long>> offsetsByReadValueFile = new ArrayList<>(); + try { + openValueFile(); + while (readSize < valueFileLength) { + long offset = inputStream.position(); + int tableNum = ReadWriteForEncodingUtils.readVarInt(inputStream); + if (tableNum <= 0) { + break; + } + for (int i = 0; i < tableNum; i++) { + ReadWriteIOUtils.readVarIntString(inputStream); + ReadWriteIOUtils.readLong(inputStream); + } + offsetsByReadValueFile.add(new Pair<>(offset, inputStream.position())); + } + } catch (Exception ignored) { + } finally { + closeCurrentFile(); + } + + if (offsetsByReadValueFile.isEmpty()) { + return new Pair<>(0L, 0L); + } + Iterator<Pair<Long, Long>> valueOffsetIterator = offsetsByReadValueFile.iterator(); + long keyFileTruncateSize = 0; + long valueFileTruncateSize = 0; + try { + openKeyFile(); + while (readSize < keyFileLength) { + KeyFileEntry keyFileEntry = readOneEntryFromKeyFile(); + if (keyFileEntry.originTsFileID != null) { + continue; + } + if (!valueOffsetIterator.hasNext()) { + break; + } + Pair<Long, Long> startEndOffsetInValueFile = valueOffsetIterator.next(); + if (startEndOffsetInValueFile.left != keyFileEntry.offset) { + break; + } + keyFileTruncateSize = readSize; + valueFileTruncateSize = startEndOffsetInValueFile.right; + } + } catch (Exception ignored) { + } finally { + closeCurrentFile(); + } + return new Pair<>(keyFileTruncateSize, valueFileTruncateSize); + } + + public boolean readFromKeyFile( + Map<Long, TimePartitionTableSizeQueryContext> timePartitionContexts, + long startTime, + long maxRunTime) + throws IOException { + long previousTimePartition = 0; + TimePartitionTableSizeQueryContext 
timePartitionContext = null; + do { + if (readSize >= keyFileLength) { + closeCurrentFile(); + return true; + } + try { + KeyFileEntry keyFileEntry = readOneEntryFromKeyFile(); + if (timePartitionContext == null + || keyFileEntry.tsFileID.timePartitionId != previousTimePartition) { + previousTimePartition = keyFileEntry.tsFileID.timePartitionId; + timePartitionContext = timePartitionContexts.get(previousTimePartition); + } + if (keyFileEntry.originTsFileID == null) { + timePartitionContext.addCachedTsFileIDAndOffsetInValueFile( + keyFileEntry.tsFileID, keyFileEntry.offset); + } else { + timePartitionContext.replaceCachedTsFileID( + keyFileEntry.tsFileID, keyFileEntry.originTsFileID); + } + } catch (IOException e) { + readSize = keyFileLength; + closeCurrentFile(); + throw e; + } + } while (System.nanoTime() - startTime < maxRunTime); + return false; + } + + private KeyFileEntry readOneEntryFromKeyFile() throws IOException { + byte type = ReadWriteIOUtils.readByte(inputStream); + long timePartition = ReadWriteIOUtils.readLong(inputStream); + long timestamp = ReadWriteIOUtils.readLong(inputStream); + long fileVersion = ReadWriteIOUtils.readLong(inputStream); + long compactionVersion = ReadWriteIOUtils.readLong(inputStream); + TsFileID tsFileID = + new TsFileID(regionId, timePartition, timestamp, fileVersion, compactionVersion); + KeyFileEntry keyFileEntry; + if (type == TableDiskUsageCacheWriter.KEY_FILE_RECORD_TYPE_OFFSET) { + long offset = ReadWriteIOUtils.readLong(inputStream); + keyFileEntry = new KeyFileEntry(tsFileID, offset); + readSize += TableDiskUsageCacheWriter.KEY_FILE_OFFSET_RECORD_LENGTH + 1; + } else if (type == TableDiskUsageCacheWriter.KEY_FILE_RECORD_TYPE_REDIRECT) { + long originTimestamp = ReadWriteIOUtils.readLong(inputStream); + long originFileVersion = ReadWriteIOUtils.readLong(inputStream); + long originCompactionVersion = ReadWriteIOUtils.readLong(inputStream); + TsFileID originTsFileID = + new TsFileID( + regionId, timePartition, originTimestamp, originFileVersion, originCompactionVersion); + keyFileEntry = new KeyFileEntry(tsFileID, originTsFileID); + readSize += TableDiskUsageCacheWriter.KEY_FILE_REDIRECT_RECORD_LENGTH + 1; + } else { + throw new IoTDBRuntimeException( + "Unsupported record type in file: " + keyFile.getPath() + ", type: " + type, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + return keyFileEntry; + } + + public boolean readFromValueFile( + Iterator<Pair<TsFileID, Long>> tsFilesToQueryInCache, + Map<Long, TimePartitionTableSizeQueryContext> resultMap, + long startTime, + long maxRunTime) + throws IOException { + long previousTimePartition = 0; + TimePartitionTableSizeQueryContext currentTimePartition = null; + do { + if (!tsFilesToQueryInCache.hasNext()) { + closeCurrentFile(); + return true; + } + Pair<TsFileID, Long> pair = tsFilesToQueryInCache.next(); + long timePartition = pair.left.timePartitionId; + if (currentTimePartition == null || timePartition != previousTimePartition) { + currentTimePartition = resultMap.get(timePartition); + previousTimePartition = timePartition; + } + long offset = pair.right; + inputStream.seek(offset); + + readSize = offset; + int tableNum = ReadWriteForEncodingUtils.readVarInt(inputStream); + for (int i = 0; i < tableNum; i++) { + String tableName = ReadWriteIOUtils.readVarIntString(inputStream); + long size = ReadWriteIOUtils.readLong(inputStream); + currentTimePartition.updateResult(tableName, size); + } + } while (System.nanoTime() - startTime < maxRunTime); + return false; + } + + public void 
closeCurrentFile() { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException ignored) { + } + inputStream = null; + readSize = 0; + } + } + + private static class KeyFileEntry { + public TsFileID tsFileID; + public TsFileID originTsFileID; + public long offset; + + public KeyFileEntry(TsFileID tsFileID, long offset) { + this.tsFileID = tsFileID; + this.offset = offset; + } + + public KeyFileEntry(TsFileID tsFileID, TsFileID originTsFileID) { + this.tsFileID = tsFileID; + this.originTsFileID = originTsFileID; + } + } + + private static class DirectBufferedSeekableFileInputStream extends InputStream { + + private final FileChannel channel; + private ByteBuffer buffer; + + private long bufferStartPos = 0; + private long position = 0; + + private final int seekThreshold; + + public DirectBufferedSeekableFileInputStream(Path path, int bufferSize) throws IOException { + this.channel = FileChannel.open(path, StandardOpenOption.READ); + this.buffer = ByteBuffer.allocateDirect(bufferSize); + this.buffer.limit(0); + this.seekThreshold = bufferSize * 2; + } + + public void seek(long newPos) throws IOException { + if (newPos == position) { + return; + } + + if (newPos > position) { + + long bufferEnd = bufferStartPos + buffer.limit(); + + if (newPos < bufferEnd) { + buffer.position((int) (newPos - bufferStartPos)); + position = newPos; + return; + } + + long gap = newPos - position; + + if (gap <= seekThreshold) { + discardBuffer(); + bufferStartPos = position; + refill(); + if (newPos < bufferStartPos + buffer.limit()) { + buffer.position((int) (newPos - bufferStartPos)); + position = newPos; + return; + } + } + } + + discardBuffer(); + channel.position(newPos); + bufferStartPos = newPos; + position = newPos; + } + + @Override + public int read() throws IOException { + if (!buffer.hasRemaining()) { + if (!refill()) { + return -1; + } + } + position++; + return buffer.get() & 0xFF; + } + + @Override + public int read(byte[] dst, int off, int len) throws IOException { + if (len == 0) { + return 0; + } + + int totalRead = 0; + + while (len > 0) { + if (!buffer.hasRemaining()) { + if (!refill()) { + return totalRead == 0 ? 
-1 : totalRead; + } + } + + int n = Math.min(len, buffer.remaining()); + buffer.get(dst, off, n); + + off += n; + len -= n; + totalRead += n; + position += n; + } + + return totalRead; + } + + private boolean refill() throws IOException { + buffer.clear(); + bufferStartPos = channel.position(); + + int read = channel.read(buffer); + if (read <= 0) { + buffer.limit(0); + return false; + } + + buffer.flip(); + return true; + } + + private void discardBuffer() { + buffer.clear(); + buffer.limit(0); + } + + public long position() { + return position; + } + + @Override + public void close() throws IOException { + if (buffer != null) { + MmapUtil.clean(buffer); + buffer = null; + } + channel.close(); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 6f9f95ca8fe..3201213dd89 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -200,6 +200,7 @@ public enum ThreadName { REGION_MIGRATE("Region-Migrate-Pool"), STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"), FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"), + TABLE_SIZE_INDEX_RECORD("TableSizeIndexRecord"), BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"), BINARY_ALLOCATOR_AUTO_RELEASER("BinaryAllocator-Auto-Releaser"),
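
For review convenience, here is a minimal sketch of the key-file record layout that TableDiskUsageCacheWriter appends and TsFileTableSizeCacheReader.readOneEntryFromKeyFile consumes. It sits outside the patch and is illustrative only: it assumes the fixed-width big-endian long encoding implied by the KEY_FILE_*_RECORD_LENGTH arithmetic, and uses java.io.DataInputStream in place of ReadWriteIOUtils.

import java.io.DataInputStream;
import java.io.IOException;

final class KeyFileRecordSketch {
  // Mirrors KEY_FILE_RECORD_TYPE_OFFSET / KEY_FILE_RECORD_TYPE_REDIRECT in the patch.
  static final byte TYPE_OFFSET = 1;
  static final byte TYPE_REDIRECT = 2;

  static String readOneRecord(DataInputStream in) throws IOException {
    byte type = in.readByte();
    // Every record starts with a TsFileID, minus the region id, which is
    // implied by the data-region directory the cache files live in.
    long timePartitionId = in.readLong();
    long timestamp = in.readLong();
    long fileVersion = in.readLong();
    long compactionVersion = in.readLong();
    String id = timePartitionId + "/" + timestamp + "-" + fileVersion + "-" + compactionVersion;
    if (type == TYPE_OFFSET) {
      // 1 type byte + 5 longs = KEY_FILE_OFFSET_RECORD_LENGTH + 1 bytes in total.
      long valueFileOffset = in.readLong();
      return id + " -> table sizes at value-file offset " + valueFileOffset;
    } else if (type == TYPE_REDIRECT) {
      // 1 type byte + 7 longs = KEY_FILE_REDIRECT_RECORD_LENGTH + 1 bytes in total.
      // Written when compaction replaces a file: the new TsFileID above is meant
      // to inherit the value-file offset of the origin TsFileID below.
      long originTimestamp = in.readLong();
      long originFileVersion = in.readLong();
      long originCompactionVersion = in.readLong();
      return id + " redirects from "
          + originTimestamp + "-" + originFileVersion + "-" + originCompactionVersion;
    }
    throw new IOException("unsupported record type: " + type);
  }
}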

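The matching value-file record is a varint-prefixed map from table name to bytes. A rough decoder follows, with a protobuf-style varint standing in for the tsfile library's ReadWriteForEncodingUtils/ReadWriteIOUtils encodings (whose exact flavor this sketch does not reproduce).

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class ValueFileRecordSketch {
  // One record per TsFile: { tableName -> disk usage in bytes }.
  static Map<String, Long> readOneRecord(DataInputStream in) throws IOException {
    Map<String, Long> tableSizes = new HashMap<>();
    int tableNum = readVarInt(in); // header written via writeVarInt(tableSizeMap.size(), ...)
    for (int i = 0; i < tableNum; i++) {
      int len = readVarInt(in);    // var-string: length prefix followed by UTF-8 bytes
      byte[] name = new byte[len];
      in.readFully(name);
      long size = in.readLong();   // fixed-width size written via ReadWriteIOUtils.write(long, ...)
      tableSizes.put(new String(name, StandardCharsets.UTF_8), size);
    }
    return tableSizes;
  }

  // Stand-in varint decoder; illustrative only.
  private static int readVarInt(DataInputStream in) throws IOException {
    int value = 0;
    int shift = 0;
    int b;
    do {
      b = in.readUnsignedByte();
      value |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return value;
  }
}

Note how TableDiskUsageCacheReader sorts the cached lookups by value-file offset (Comparator.comparingLong(Pair::getRight)) before readFromValueFile runs, so DirectBufferedSeekableFileInputStream mostly seeks forward within or just past its buffer and the value file is read close to sequentially.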