This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 991a1b6e8421ff68fac0cefa878369c0861d7196 Author: shuwenwei <[email protected]> AuthorDate: Mon Jan 19 18:53:14 2026 +0800 record table size for when generating tsfile --- .../pipeconsensus/PipeConsensusReceiver.java | 2 +- .../scheduler/load/LoadTsFileDispatcherImpl.java | 4 +++- .../db/storageengine/dataregion/DataRegion.java | 8 ++++++- .../performer/impl/FastCompactionPerformer.java | 1 + .../impl/ReadChunkCompactionPerformer.java | 2 ++ .../impl/ReadPointCompactionPerformer.java | 1 + .../execute/task/CompactionTaskSummary.java | 15 +++++++++++++ .../execute/task/CrossSpaceCompactionTask.java | 19 ++++++++++++++++ .../execute/task/InnerSpaceCompactionTask.java | 26 +++++++++++++++++++++- .../task/InsertionCrossSpaceCompactionTask.java | 4 ++++ .../utils/writer/AbstractCompactionWriter.java | 7 ++++++ .../writer/AbstractCrossCompactionWriter.java | 3 +++ .../writer/AbstractInnerCompactionWriter.java | 4 ++++ .../dataregion/memtable/TsFileProcessor.java | 6 +++++ .../tableDiskUsageCache/TableDiskUsageCache.java | 3 +++ .../TableDiskUsageCacheReader.java | 3 ++- .../db/storageengine/load/LoadTsFileManager.java | 7 +++++- .../DataNodeInternalRPCServiceImplTest.java | 3 ++- 18 files changed, 111 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index b250046d5c0..720e5e9783a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -657,7 +657,7 @@ public class PipeConsensusReceiver { StorageEngine.getInstance().getDataRegion(((DataRegionId) consensusGroupId)); if (region != null) { TsFileResource resource = generateTsFileResource(filePath, progressIndex); - region.loadNewTsFile(resource, true, false, true); + region.loadNewTsFile(resource, true, false, true, Optional.empty()); } else { // Data region is null indicates that dr has been removed or migrated. In those cases, there // is no need to replicate data. we just return success to avoid leader keeping retry diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index 7301ce0f406..5309bbfd8ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -62,6 +62,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -188,7 +189,8 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { cloneTsFileResource, ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(), isGeneratedByPipe, - false); + false, + Optional.empty()); } catch (LoadFileException e) { LOGGER.warn("Load TsFile Node {} error.", planNode, e); TSStatus resultStatus = new TSStatus(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1dc22480787..45faf95e501 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3885,7 +3885,8 @@ public class DataRegion implements IDataRegionForQuery { final TsFileResource newTsFileResource, final boolean deleteOriginFile, final boolean isGeneratedByPipe, - final boolean isFromConsensus) + final boolean isFromConsensus, + final Optional<Map<String, Long>> tableSizeMap) throws LoadFileException { if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensus) { final IoTConsensusServerImpl impl = @@ -3955,6 +3956,11 @@ public class DataRegion implements IDataRegionForQuery { deleteOriginFile, isGeneratedByPipe); + tableSizeMap.ifPresent( + stringLongMap -> + TableDiskUsageCache.getInstance() + .write(databaseName, newTsFileResource.getTsFileID(), stringLongMap)); + FileMetrics.getInstance() .addTsFile( newTsFileResource.getDatabaseName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index 54b21ddd382..b6791830e64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -165,6 +165,7 @@ public class FastCompactionPerformer ? new FastCrossCompactionWriter( targetFiles, seqFiles, readerCacheMap, encryptParameter) : new FastInnerCompactionWriter(targetFiles, encryptParameter)) { + compactionWriter.setCompactionTaskSummary(subTaskSummary); List<Schema> schemas = CompactionTableSchemaCollector.collectSchema( seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index d406286e37f..3bccceeddfb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -210,6 +210,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { memoryBudgetForFileWriter, CompactionType.INNER_SEQ_COMPACTION, firstEncryptParameter); + summary.recordTargetTsFileTableSizeMap( + targetResources.get(currentTargetFileIndex), currentWriter.getTableSizeMap()); currentWriter.setSchema(CompactionTableSchemaCollector.copySchema(schema)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java index c58870357d9..d4c075da529 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java @@ -150,6 +150,7 @@ public class ReadPointCompactionPerformer summary.setTemporalFileNum(targetFiles.size()); try (AbstractCompactionWriter compactionWriter = getCompactionWriter(seqFiles, unseqFiles, targetFiles)) { + compactionWriter.setCompactionTaskSummary(summary); // Do not close device iterator, because tsfile reader is managed by FileReaderManager. MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles, unseqFiles); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CompactionTaskSummary.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CompactionTaskSummary.java index b9814d86701..d30f37c9c95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CompactionTaskSummary.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CompactionTaskSummary.java @@ -19,8 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + import java.text.SimpleDateFormat; import java.util.Date; +import java.util.HashMap; +import java.util.Map; /** The summary of one {@link AbstractCompactionTask} execution. */ public class CompactionTaskSummary { @@ -37,6 +42,7 @@ public class CompactionTaskSummary { protected long rewritePointNum = 0; protected long temporalFileSize = 0; protected int temporalFileNum = 0; + protected Map<TsFileID, Map<String, Long>> targetFileTableSizeMap = new HashMap<>(); public void start() { this.status = Status.STARTED; @@ -189,6 +195,15 @@ public class CompactionTaskSummary { return temporalFileNum; } + public void recordTargetTsFileTableSizeMap( + TsFileResource resource, Map<String, Long> tableSizeMap) { + this.targetFileTableSizeMap.put(resource.getTsFileID(), tableSizeMap); + } + + public Map<String, Long> getTableSizeMapOfTargetResource(TsFileID targetTsFileId) { + return targetFileTableSizeMap.get(targetTsFileId); + } + @Override public String toString() { String startTimeInStr = new SimpleDateFormat().format(new Date(startTime)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 41d859924f0..a7a3c0b6b90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; @@ -38,6 +39,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache; import org.apache.tsfile.utils.TsFileUtils; import org.slf4j.Logger; @@ -236,6 +238,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { } } + updateTableSizeCache(); + CompactionMetrics.getInstance().recordSummaryInfo(summary); double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; @@ -273,6 +277,21 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { return isSuccess; } + protected void updateTableSizeCache() { + if (!PathUtils.isTableModelDatabase(this.storageGroupName)) { + return; + } + for (TsFileResource resource : targetTsfileResourceList) { + if (!resource.isDeleted()) { + TableDiskUsageCache.getInstance() + .write( + storageGroupName, + resource.getTsFileID(), + summary.getTableSizeMapOfTargetResource(resource.getTsFileID())); + } + } + } + public void recover() { try { if (needRecoverTaskInfoFromLogFile) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index c61d2275ac1..a4b4cf49721 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; @@ -43,6 +44,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.exception.StopReadTsFileByInterruptException; @@ -432,7 +434,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics( filesView.sourceFilesInLog, filesView.sequence); - + updateTableSizeCache(); CompactionMetrics.getInstance().recordSummaryInfo(summary); } @@ -458,6 +460,28 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { filesView.sourceFilesInCompactionPerformer, filesView.targetFilesInPerformer); } + protected void updateTableSizeCache() { + if (!PathUtils.isTableModelDatabase(this.storageGroupName)) { + return; + } + for (int i = 0; i < filesView.renamedTargetFiles.size(); i++) { + TableDiskUsageCache.getInstance() + .write( + this.storageGroupName, + filesView.skippedSourceFiles.get(i).getTsFileID(), + filesView.renamedTargetFiles.get(i).getTsFileID()); + } + for (TsFileResource resource : filesView.targetFilesInPerformer) { + if (!resource.isDeleted()) { + TableDiskUsageCache.getInstance() + .write( + this.storageGroupName, + resource.getTsFileID(), + summary.getTableSizeMapOfTargetResource(resource.getTsFileID())); + } + } + } + public void recover() { try { if (needRecoverTaskInfoFromLogFile) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java index c2aee2bde90..d89c722f6f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache; import java.io.File; import java.io.IOException; @@ -166,6 +167,9 @@ public class InsertionCrossSpaceCompactionTask extends AbstractCompactionTask { lockWrite(Collections.singletonList(unseqFileToInsert)); CompactionUtils.deleteTsFileResourceWithoutLock(unseqFileToInsert); + TableDiskUsageCache.getInstance() + .write(storageGroupName, unseqFileToInsert.getTsFileID(), targetFile.getTsFileID()); + double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; LOGGER.info( "{}-{} [Compaction] InsertionCrossSpaceCompaction task finishes successfully, " diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java index f3cd5185b58..96ab986552d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wr import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController; @@ -105,6 +106,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { private EncryptParameter encryptParameter; + protected CompactionTaskSummary compactionTaskSummary; + public abstract void startChunkGroup(IDeviceID deviceId, boolean isAlign) throws IOException; public abstract void endChunkGroup() throws IOException; @@ -340,4 +343,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } public abstract void setSchemaForAllTargetFile(List<Schema> schemas); + + public void setCompactionTaskSummary(CompactionTaskSummary compactionTaskSummary) { + this.compactionTaskSummary = compactionTaskSummary; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index f970ad65e56..61cec47d39a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -171,6 +171,9 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr // set empty target file to DELETED if (isEmptyFile[i]) { targetResources.get(i).forceMarkDeleted(); + } else if (compactionTaskSummary != null) { + compactionTaskSummary.recordTargetTsFileTableSizeMap( + targetResources.get(i), targetFileWriters.get(i).getTableSizeMap()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index 6573bb7e96e..0ace4d5cb97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -121,6 +121,10 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr ? CompactionType.INNER_SEQ_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, encryptParameter); + if (compactionTaskSummary != null) { + compactionTaskSummary.recordTargetTsFileTableSizeMap( + targetResources.get(currentFileIndex), fileWriter.getTableSizeMap()); + } fileWriter.setSchema(CompactionTableSchemaCollector.copySchema(schemas.get(0))); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index acdac61180b..91511ad745f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -66,6 +66,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl.UnclosedF import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener; @@ -1753,6 +1754,11 @@ public class TsFileProcessor { tsFileResource.serialize(); FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource); + TableDiskUsageCache.getInstance() + .write( + tsFileResource.getDatabaseName(), + tsFileResource.getTsFileID(), + writer.getTableSizeMap()); if (logger.isDebugEnabled()) { logger.debug("Ended file {}", tsFileResource); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java index 225a4629fde..5b216876f07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java @@ -89,6 +89,9 @@ public class TableDiskUsageCache { } public void write(String database, TsFileID tsFileID, Map<String, Long> tableSizeMap) { + if (tableSizeMap == null || tableSizeMap.isEmpty()) { + return; + } queue.add(new WriteOperation(database, tsFileID, tableSizeMap)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java index a7f910583ca..ad801dedaf9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java @@ -77,6 +77,7 @@ public class TableDiskUsageCacheReader implements Closeable { if (future.isDone()) { this.cacheFileReader = future.get(); this.cacheFileReader.openKeyFile(); + break; } else { Thread.sleep(1); } @@ -84,7 +85,7 @@ public class TableDiskUsageCacheReader implements Closeable { Thread.currentThread().interrupt(); return false; } - } while (!future.isDone() && System.nanoTime() - startTime < maxRunTime); + } while (System.nanoTime() - startTime < maxRunTime); } if (this.cacheFileReader == null) { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 082106bb5fd..e0464acde23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -588,7 +588,12 @@ public class LoadTsFileManager { tsFileResource, timePartitionProgressIndexMap.getOrDefault( entry.getKey().getTimePartitionSlot(), MinimumProgressIndex.INSTANCE)); - dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe, false); + dataRegion.loadNewTsFile( + tsFileResource, + true, + isGeneratedByPipe, + false, + Optional.ofNullable(writer.getTableSizeMap())); // Metrics dataRegion diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java index 622c2c4ebbf..22a2d6cdc46 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java @@ -78,6 +78,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; public class DataNodeInternalRPCServiceImplTest { @@ -209,7 +210,7 @@ public class DataNodeInternalRPCServiceImplTest { ((IoTConsensus) DataRegionConsensusImpl.getInstance()) .getImpl(new DataRegionId(1)) .setActive(false); - dataRegion.loadNewTsFile(new TsFileResource(), false, false, false); + dataRegion.loadNewTsFile(new TsFileResource(), false, false, false, Optional.empty()); ((IoTConsensus) DataRegionConsensusImpl.getInstance()) .getImpl(new DataRegionId(1)) .setActive(true);
