This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/table_disk_usage_statistics_with_cache by this push:
new 0bbf45dd9e5 fix
0bbf45dd9e5 is described below
commit 0bbf45dd9e50b6a3bea5e03b54f6cef4cdf69b8a
Author: shuwenwei <[email protected]>
AuthorDate: Wed Feb 4 11:22:52 2026 +0800
fix
---
.../pipeconsensus/PipeConsensusReceiver.java | 10 ++-
.../operator/source/ShowDiskUsageOperator.java | 3 +-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 19 +++---
.../db/storageengine/dataregion/DataRegion.java | 8 +++
.../dataregion/utils/DiskUsageStatisticUtil.java | 19 ++++--
.../utils/TableDiskUsageStatisticUtil.java | 75 +++++++++++++++++++---
.../utils/TreeDiskUsageStatisticUtil.java | 14 +++-
.../tableDiskUsageCache/TableDiskUsageCache.java | 10 +--
8 files changed, 124 insertions(+), 34 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 720e5e9783a..ededb9d580c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
@@ -657,7 +658,14 @@ public class PipeConsensusReceiver {
StorageEngine.getInstance().getDataRegion(((DataRegionId)
consensusGroupId));
if (region != null) {
TsFileResource resource = generateTsFileResource(filePath,
progressIndex);
- region.loadNewTsFile(resource, true, false, true, Optional.empty());
+ region.loadNewTsFile(
+ resource,
+ true,
+ false,
+ true,
+ region.isTableModel()
+ ? TableDiskUsageStatisticUtil.calculateTableSizeMap(resource)
+ : Optional.empty());
} else {
// Data region is null indicates that dr has been removed or migrated.
In those cases, there
// is no need to replicate data. we just return success to avoid leader
keeping retry
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
index 4b82557f06c..91f77f8018b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowDiskUsageOperator.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.execution.operator.source;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
@@ -73,7 +72,7 @@ public class ShowDiskUsageOperator implements SourceOperator {
Optional.of(
dataRegion -> {
String databaseName = dataRegion.getDatabaseName();
- return !PathUtils.isTableModelDatabase(databaseName)
+ return !dataRegion.isTableModel()
&& pathPattern.matchPrefixPath(new
PartialPath(databaseName));
}),
Optional.of(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5309bbfd8ef..87fc8f2e5ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -44,7 +44,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IFragInstanceDispatcher;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
@@ -183,14 +185,15 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
cloneTsFileResource = tsFileResource.shallowClone();
}
- StorageEngine.getInstance()
- .getDataRegion((DataRegionId) groupId)
- .loadNewTsFile(
- cloneTsFileResource,
- ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
- isGeneratedByPipe,
- false,
- Optional.empty());
+ DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion((DataRegionId) groupId);
+ dataRegion.loadNewTsFile(
+ cloneTsFileResource,
+ ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
+ isGeneratedByPipe,
+ false,
+ dataRegion.isTableModel()
+ ?
TableDiskUsageStatisticUtil.calculateTableSizeMap(cloneTsFileResource)
+ : Optional.empty());
} catch (LoadFileException e) {
LOGGER.warn("Load TsFile Node {} error.", planNode, e);
TSStatus resultStatus = new TSStatus();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 89910e27499..aef13376c9c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -379,6 +379,8 @@ public class DataRegion implements IDataRegionForQuery {
private ILoadDiskSelector ordinaryLoadDiskSelector;
private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
+ private boolean isTableModel;
+
/**
* Construct a database processor.
*
@@ -397,6 +399,7 @@ public class DataRegion implements IDataRegionForQuery {
this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
this.databaseName = databaseName;
this.fileFlushPolicy = fileFlushPolicy;
+ this.isTableModel = isTableModelDatabase(databaseName);
acquireDirectBufferMemory();
dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir,
dataRegionIdString);
@@ -458,6 +461,7 @@ public class DataRegion implements IDataRegionForQuery {
@TestOnly
public DataRegion(String databaseName, String dataRegionIdString) {
this.databaseName = databaseName;
+ this.isTableModel = isTableModelDatabase(databaseName);
this.dataRegionIdString = dataRegionIdString;
this.dataRegionId = new
DataRegionId(Integer.parseInt(this.dataRegionIdString));
this.tsFileManager = new TsFileManager(databaseName, dataRegionIdString,
"");
@@ -501,6 +505,10 @@ public class DataRegion implements IDataRegionForQuery {
return databaseName;
}
+ public boolean isTableModel() {
+ return isTableModel;
+ }
+
public boolean isReady() {
return isReady;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
index 4f8073941cb..a913e417c85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
@@ -154,17 +154,20 @@ public abstract class DiskUsageStatisticUtil implements
Closeable {
TsFileResource tsFileResource, TsFileSequenceReader reader)
throws IOException, IllegalPathException;
- protected Offsets calculateStartOffsetOfChunkGroupAndTimeseriesMetadata(
+ protected static Offsets
calculateStartOffsetOfChunkGroupAndTimeseriesMetadata(
TsFileSequenceReader reader,
MetadataIndexNode firstMeasurementNodeOfCurrentDevice,
Pair<IDeviceID, Boolean> deviceIsAlignedPair,
- long rootMeasurementNodeStartOffset)
+ long rootMeasurementNodeStartOffset,
+ LongConsumer timeSeriesMetadataCountRecorder,
+ LongConsumer timeSeriesMetadataIoSizeRecorder)
throws IOException {
int chunkGroupHeaderSize =
new
ChunkGroupHeader(deviceIsAlignedPair.getLeft()).getSerializedSize();
if (deviceIsAlignedPair.getRight()) {
Pair<Long, Long> timeseriesMetadataOffsetPair =
- getTimeColumnMetadataOffset(reader,
firstMeasurementNodeOfCurrentDevice);
+ getTimeColumnMetadataOffset(
+ reader, firstMeasurementNodeOfCurrentDevice,
timeSeriesMetadataIoSizeRecorder);
IChunkMetadata firstChunkMetadata =
reader
.getChunkMetadataListByTimeseriesMetadataOffset(
@@ -208,8 +211,11 @@ public abstract class DiskUsageStatisticUtil implements
Closeable {
}
}
- private Pair<Long, Long> getTimeColumnMetadataOffset(
- TsFileSequenceReader reader, MetadataIndexNode measurementNode) throws
IOException {
+ private static Pair<Long, Long> getTimeColumnMetadataOffset(
+ TsFileSequenceReader reader,
+ MetadataIndexNode measurementNode,
+ LongConsumer timeSeriesMetadataIoSizeRecorder)
+ throws IOException {
if (measurementNode.isDeviceLevel()) {
throw new IllegalArgumentException("device level metadata index node is
not supported");
}
@@ -223,7 +229,8 @@ public abstract class DiskUsageStatisticUtil implements
Closeable {
MetadataIndexNode metadataIndexNode =
reader.readMetadataIndexNode(
startOffset, endOffset, false, timeSeriesMetadataIoSizeRecorder);
- return getTimeColumnMetadataOffset(reader, metadataIndexNode);
+ return getTimeColumnMetadataOffset(
+ reader, metadataIndexNode, timeSeriesMetadataIoSizeRecorder);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java
index d19bcf5752f..7357cf86bfc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageStatisticUtil.java
@@ -34,6 +34,8 @@ import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -41,6 +43,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.LongConsumer;
public class TableDiskUsageStatisticUtil extends DiskUsageStatisticUtil {
public static final long SHALLOW_SIZE =
@@ -124,6 +127,32 @@ public class TableDiskUsageStatisticUtil extends
DiskUsageStatisticUtil {
private void calculateDiskUsageInBytesByOffset(
TsFileResource resource, TsFileSequenceReader reader) throws IOException
{
+ Map<String, Long> tsFileTableSizeMap =
+ calculateTableSizeMap(
+ reader, timeSeriesMetadataCountRecorder,
timeSeriesMetadataIoSizeRecorder);
+ for (Map.Entry<String, Long> entry : tsFileTableSizeMap.entrySet()) {
+ tableSizeQueryContext.updateResult(entry.getKey(), entry.getValue(),
needAllData);
+ }
+ TableDiskUsageCache.getInstance().write(database, resource.getTsFileID(),
tsFileTableSizeMap);
+ }
+
+ public static Optional<Map<String, Long>>
calculateTableSizeMap(TsFileResource resource) {
+ if (!resource.getTsFile().exists()) {
+ return Optional.empty();
+ }
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(resource.getTsFilePath())) {
+ return Optional.of(calculateTableSizeMap(reader, null, null));
+ } catch (Exception e) {
+ logger.error("Failed to calculate tsfile table sizes", e);
+ return Optional.empty();
+ }
+ }
+
+ public static Map<String, Long> calculateTableSizeMap(
+ TsFileSequenceReader reader,
+ @Nullable LongConsumer timeSeriesMetadataCountRecorder,
+ @Nullable LongConsumer timeSeriesMetadataIoSizeRecorder)
+ throws IOException {
TsFileMetadata tsFileMetadata = reader.readFileMetadata();
Map<String, MetadataIndexNode> tableMetadataIndexNodeMap =
tsFileMetadata.getTableMetadataIndexNodeMap();
@@ -135,22 +164,36 @@ public class TableDiskUsageStatisticUtil extends
DiskUsageStatisticUtil {
currentTable = currentTable == null ? iterator.next() : currentTable;
nextTable = iterator.hasNext() ? iterator.next() : null;
long tableSize =
- calculateTableSize(tableOffsetMap, tsFileMetadata, reader,
currentTable, nextTable);
- tableSizeQueryContext.updateResult(currentTable, tableSize, needAllData);
+ calculateTableSize(
+ tableOffsetMap,
+ tsFileMetadata,
+ reader,
+ currentTable,
+ nextTable,
+ timeSeriesMetadataCountRecorder,
+ timeSeriesMetadataIoSizeRecorder);
tsFileTableSizeMap.put(currentTable, tableSize);
currentTable = nextTable;
}
- TableDiskUsageCache.getInstance().write(database, resource.getTsFileID(),
tsFileTableSizeMap);
+ return tsFileTableSizeMap;
}
- private long calculateTableSize(
+ private static long calculateTableSize(
Map<String, Offsets> tableOffsetMap,
TsFileMetadata tsFileMetadata,
TsFileSequenceReader reader,
String tableName,
- String nextTable)
+ String nextTable,
+ LongConsumer timeSeriesMetadataCountRecorder,
+ LongConsumer timeSeriesMetadataIoSizeRecorder)
throws IOException {
- Offsets startOffset = getTableOffset(tableOffsetMap, reader, tableName);
+ Offsets startOffset =
+ getTableOffset(
+ tableOffsetMap,
+ reader,
+ tableName,
+ timeSeriesMetadataCountRecorder,
+ timeSeriesMetadataIoSizeRecorder);
Offsets endOffset;
if (nextTable == null) {
long firstMeasurementNodeOffsetOfFirstTable;
@@ -173,13 +216,23 @@ public class TableDiskUsageStatisticUtil extends
DiskUsageStatisticUtil {
firstMeasurementNodeOffsetOfFirstTable,
reader.getFileMetadataPos());
} else {
- endOffset = getTableOffset(tableOffsetMap, reader, nextTable);
+ endOffset =
+ getTableOffset(
+ tableOffsetMap,
+ reader,
+ nextTable,
+ timeSeriesMetadataCountRecorder,
+ timeSeriesMetadataIoSizeRecorder);
}
return endOffset.minusOffsetForTableModel(startOffset);
}
- private Offsets getTableOffset(
- Map<String, Offsets> tableOffsetMap, TsFileSequenceReader reader, String
tableName) {
+ private static Offsets getTableOffset(
+ Map<String, Offsets> tableOffsetMap,
+ TsFileSequenceReader reader,
+ String tableName,
+ LongConsumer timeSeriesMetadataCountRecorder,
+ LongConsumer timeSeriesMetadataIoSizeRecorder) {
return tableOffsetMap.computeIfAbsent(
tableName,
k -> {
@@ -191,7 +244,9 @@ public class TableDiskUsageStatisticUtil extends
DiskUsageStatisticUtil {
reader,
deviceIterator.getFirstMeasurementNodeOfCurrentDevice(),
pair,
- deviceIterator.getCurrentDeviceMeasurementNodeOffset()[0]);
+ deviceIterator.getCurrentDeviceMeasurementNodeOffset()[0],
+ timeSeriesMetadataCountRecorder,
+ timeSeriesMetadataIoSizeRecorder);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java
index 7b01faf6eb3..ec4d8f03d86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TreeDiskUsageStatisticUtil.java
@@ -145,11 +145,21 @@ public class TreeDiskUsageStatisticUtil extends
DiskUsageStatisticUtil {
} else {
chunkGroupTimeseriesMetadataEndOffsetPair =
calculateStartOffsetOfChunkGroupAndTimeseriesMetadata(
- reader, nodeOfNextNotMatchedDevice, nextNotMatchedDevice, 0);
+ reader,
+ nodeOfNextNotMatchedDevice,
+ nextNotMatchedDevice,
+ 0,
+ timeSeriesMetadataCountRecorder,
+ timeSeriesMetadataIoSizeRecorder);
}
chunkGroupTimeseriesMetadataStartOffsetPair =
calculateStartOffsetOfChunkGroupAndTimeseriesMetadata(
- reader, nodeOfFirstMatchedDevice, firstMatchedDevice, 0);
+ reader,
+ nodeOfFirstMatchedDevice,
+ firstMatchedDevice,
+ 0,
+ timeSeriesMetadataCountRecorder,
+ timeSeriesMetadataIoSizeRecorder);
return chunkGroupTimeseriesMetadataEndOffsetPair.minusOffsetForTreeModel(
chunkGroupTimeseriesMetadataStartOffsetPair);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index fc4845e3883..ee2eeff544d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -21,9 +21,8 @@ package
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.rpc.thrift.TTableInfo;
+import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
@@ -37,7 +36,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.BlockingQueue;
@@ -47,7 +45,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TableDiskUsageCache {
- protected static Map<String, List<TTableInfo>> databaseTableInfoMap;
protected static final Logger LOGGER =
LoggerFactory.getLogger(TableDiskUsageCache.class);
protected final BlockingQueue<Operation> queue = new
LinkedBlockingQueue<>(1000);
// regionId -> writer mapping
@@ -119,6 +116,9 @@ public class TableDiskUsageCache {
protected void
persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
protected void compactIfNecessary(long maxRunTime) {
+ if (!StorageEngine.getInstance().isReadyForReadAndWrite()) {
+ return;
+ }
long startTime = System.currentTimeMillis();
for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
if (System.currentTimeMillis() - startTime > maxRunTime) {
@@ -177,7 +177,7 @@ public class TableDiskUsageCache {
public void registerRegion(DataRegion region) {
RegisterRegionOperation operation = new RegisterRegionOperation(region);
- if (!PathUtils.isTableModelDatabase(region.getDatabaseName())) {
+ if (!region.isTableModel()) {
return;
}
addOperationToQueue(operation);