This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch object_ttl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 86b67bd637c44b9693952e87373f55cb85307dc1 Author: shuwenwei <[email protected]> AuthorDate: Thu Dec 18 18:41:03 2025 +0800 check object ttl --- .../db/service/metrics/CompactionMetrics.java | 14 +- .../db/storageengine/dataregion/DataRegion.java | 15 ++ .../execute/task/CrossSpaceCompactionTask.java | 6 - .../execute/task/InnerSpaceCompactionTask.java | 6 - .../execute/task/SettleCompactionTask.java | 1 - .../compaction/execute/utils/CompactionUtils.java | 258 ++++++++------------- .../execute/utils/MultiTsFileDeviceIterator.java | 10 - .../fast/FastAlignedSeriesCompactionExecutor.java | 23 -- .../compaction/schedule/TTLScheduleTask.java | 9 + .../apache/iotdb/commons/conf/CommonConfig.java | 3 +- 10 files changed, 136 insertions(+), 209 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java index 75372f0c387..8a499f9f96a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java @@ -703,6 +703,7 @@ public class CompactionMetrics implements IMetricSet { private Histogram settleCompactionTaskSelectionTimeCost = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram ttlCheckForObjectFileTimeCost = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram seqInnerSpaceCompactionTaskSelectedFileNum = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; @@ -766,6 +767,10 @@ public class CompactionMetrics implements IMetricSet { } } + public void updateTTLCheckForObjectFileCost(long time) { + ttlCheckForObjectFileTimeCost.update(time); + } + public void updateCompactionTaskSelectedFileNum( CompactionTaskType taskType, int selectedFileNum) { switch (taskType) { @@ -869,6 +874,12 @@ public class CompactionMetrics implements IMetricSet { MetricLevel.IMPORTANT, Tag.NAME.toString(), "settle"); + ttlCheckForObjectFileTimeCost = + metricService.getOrCreateHistogram( + Metric.COMPACTION_TASK_SELECTION_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "settle-object"); seqInnerSpaceCompactionTaskSelectedFileNum = metricService.getOrCreateHistogram( Metric.COMPACTION_TASK_SELECTED_FILE.toString(), @@ -935,7 +946,8 @@ public class CompactionMetrics implements IMetricSet { } private void unbindCompactionTaskSelection(AbstractMetricService metricService) { - for (String taskType : Arrays.asList("seq", "unseq", "cross", "insertion", "settle")) { + for (String taskType : + Arrays.asList("seq", "unseq", "cross", "insertion", "settle", "settle-object")) { metricService.remove( MetricType.GAUGE, Metric.COMPACTION_TASK_SELECTION.toString(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 09c9ee84304..82f5b7a0a71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -103,6 +103,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler; @@ -3428,6 +3429,20 @@ public class DataRegion implements IDataRegionForQuery { return trySubmitCount; } + public void executeTTLCheckForObjectFiles() throws InterruptedException { + long startTime = System.currentTimeMillis(); + List<String> allObjectDirs = TierManager.getInstance().getAllObjectFileFolders(); + for (String objectDir : allObjectDirs) { + File regionObjectDir = new File(objectDir, dataRegionIdString); + if (!regionObjectDir.isDirectory()) { + continue; + } + CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName); + } + CompactionMetrics.getInstance() + .updateTTLCheckForObjectFileCost(System.currentTimeMillis() - startTime); + } + private boolean skipCurrentTTLAndModificationCheck() { if (this.databaseName.equals(InformationSchema.INFORMATION_DATABASE)) { return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 290466ffeb6..41d859924f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; @@ -193,11 +192,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { performer.setTargetFiles(targetTsfileResourceList); performer.setSummary(summary); performer.perform(); - if (performer instanceof ReadPointCompactionPerformer) { - for (TsFileResource resource : getAllSourceTsFiles()) { - CompactionUtils.removeDeletedObjectFiles(resource); - } - } CompactionUtils.updateProgressIndexAndMark( targetTsfileResourceList, selectedSequenceFiles, selectedUnsequenceFiles); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 034e4eb48b8..c61d2275ac1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; @@ -390,11 +389,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { performer.setTargetFiles(filesView.targetFilesInPerformer); performer.setSummary(summary); performer.perform(); - if (performer instanceof ReadPointCompactionPerformer) { - for (TsFileResource resource : filesView.sourceFilesInCompactionPerformer) { - CompactionUtils.removeDeletedObjectFiles(resource); - } - } prepareTargetFiles(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java index 7543d56ce80..08cc494232b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java @@ -219,7 +219,6 @@ public class SettleCompactionTask extends InnerSpaceCompactionTask { if (recoverMemoryStatus) { tsFileManager.remove(resource, resource.isSeq()); } - CompactionUtils.removeDeletedObjectFiles(resource); boolean res = deleteTsFileOnDisk(resource); if (res) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index bac48729167..3fff8bd2332 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -25,14 +25,16 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionAlignedChunkReader; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; @@ -44,33 +46,27 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEnt import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.iotdb.db.utils.ModificationUtils; -import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.SystemMetric; +import com.google.common.io.BaseEncoding; import org.apache.tsfile.common.constant.TsFileConstant; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.header.PageHeader; -import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; -import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.fileSystem.FSFactoryProducer; -import org.apache.tsfile.read.TimeValuePair; -import org.apache.tsfile.read.TsFileSequenceReader; -import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.TimeRange; -import org.apache.tsfile.read.reader.IPointReader; -import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -512,173 +508,113 @@ public class CompactionUtils { } } - public static void removeDeletedObjectFiles(TsFileResource resource) { - // check for compaction recovery - if (!resource.tsFileExists()) { + public static void executeTTLCheckObjectFilesForTableModel( + File regionObjectDir, String databaseName) { + File[] tableDirs = regionObjectDir.listFiles(); + if (tableDirs == null) { return; } - try (MultiTsFileDeviceIterator deviceIterator = - new MultiTsFileDeviceIterator(Collections.singletonList(resource))) { - while (deviceIterator.hasNextDevice()) { - deviceIterator.nextDevice(); - deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries(); + boolean restrictObjectLimit = + IoTDBDescriptor.getInstance().getConfig().getRestrictObjectLimit(); + for (File tableDir : tableDirs) { + if (!tableDir.isDirectory()) { + continue; } - } catch (Exception e) { - logger.warn("Failed to remove object files from file {}", resource.getTsFilePath(), e); - } - } - - @SuppressWarnings("java:S3776") - public static void removeDeletedObjectFiles( - TsFileSequenceReader reader, - List<AbstractAlignedChunkMetadata> alignedChunkMetadataList, - List<ModEntry> timeMods, - List<List<ModEntry>> valueMods, - int currentRegionId) - throws IOException { - if (alignedChunkMetadataList.isEmpty()) { - return; - } - List<Integer> objectColumnIndexList = new ArrayList<>(); - List<List<ModEntry>> objectDeletionIntervalList = new ArrayList<>(); - boolean objectColumnHasDeletion = false; - - TSDataType[] dataTypes = new TSDataType[valueMods.size()]; - for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { - boolean hasNull = false; - for (int i = 0; i < alignedChunkMetadata.getValueChunkMetadataList().size(); i++) { - if (dataTypes[i] != null) { + String tableName = tableDir.getName(); + if (!restrictObjectLimit) { + try { + tableName = + new String( + BaseEncoding.base32().omitPadding().decode(tableName), StandardCharsets.UTF_8); + } catch (IllegalArgumentException ignored) { continue; } - IChunkMetadata chunkMetadata = alignedChunkMetadata.getValueChunkMetadataList().get(i); - if (chunkMetadata == null) { - hasNull = true; - continue; - } - dataTypes[i] = chunkMetadata.getDataType(); - if (dataTypes[i] == TSDataType.OBJECT) { - objectColumnIndexList.add(i); - List<ModEntry> deletionInterval = ModificationUtils.sortAndMerge(valueMods.get(i)); - objectColumnHasDeletion |= (!deletionInterval.isEmpty() || !timeMods.isEmpty()); - objectDeletionIntervalList.add(deletionInterval); - } - } - if (!hasNull) { - break; } - } - if (!objectColumnHasDeletion) { - return; - } - int[] deletionCursors = new int[objectColumnIndexList.size() + 1]; - List<ModEntry> timeDeletionIntervalList = ModificationUtils.sortAndMerge(timeMods); - for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { - CompactionUtils.removeDeletedObjectFiles( - reader, - alignedChunkMetadata, - objectColumnIndexList, - timeDeletionIntervalList, - objectDeletionIntervalList, - deletionCursors, - currentRegionId); - } - } - - @SuppressWarnings("java:S3776") - private static void removeDeletedObjectFiles( - TsFileSequenceReader reader, - AbstractAlignedChunkMetadata alignedChunkMetadata, - List<Integer> objectColumnIndexList, - List<ModEntry> timeDeletions, - List<List<ModEntry>> objectDeletions, - int[] deletionCursors, - int currentRegionId) - throws IOException { - Chunk timeChunk = - reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); - CompactionChunkReader compactionChunkReader = new CompactionChunkReader(timeChunk); - List<Pair<PageHeader, ByteBuffer>> timePages = - compactionChunkReader.readPageDataWithoutUncompressing(); - - List<Chunk> valueChunks = new ArrayList<>(); - List<List<Pair<PageHeader, ByteBuffer>>> valuePages = new ArrayList<>(); - - for (int i = 0; i < objectColumnIndexList.size(); i++) { - int idxInAlignedChunkMetadata = objectColumnIndexList.get(i); - if (timeDeletions.isEmpty() && objectDeletions.get(i).isEmpty()) { + TsTable tsTable = DataNodeTableCache.getInstance().getTable(databaseName, tableName); + if (tsTable == null) { continue; } - ChunkMetadata valueChunkMetadata = - (ChunkMetadata) - alignedChunkMetadata.getValueChunkMetadataList().get(idxInAlignedChunkMetadata); - if (valueChunkMetadata == null) { + long ttlInMS = CommonDateTimeUtils.convertIoTDBTimeToMillis(tsTable.getCachedTableTTL()); + if (ttlInMS == Long.MAX_VALUE) { continue; } - Chunk chunk = reader.readMemChunk(valueChunkMetadata); - if (chunk != null) { - chunk - .getHeader() - .setReplaceDecoder( - decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, currentRegionId)); - } - valueChunks.add(chunk); - valuePages.add( - chunk == null - ? null - : new CompactionChunkReader(chunk).readPageDataWithoutUncompressing()); - } - - CompactionAlignedChunkReader alignedChunkReader = - new CompactionAlignedChunkReader(timeChunk, valueChunks, true); - for (int i = 0; i < timePages.size(); i++) { - Pair<PageHeader, ByteBuffer> timePage = timePages.get(i); - List<PageHeader> valuePageHeaders = new ArrayList<>(valuePages.size()); - List<ByteBuffer> compressedValuePages = new ArrayList<>(valuePages.size()); - for (int j = 0; j < valuePages.size(); j++) { - Pair<PageHeader, ByteBuffer> valuePage = valuePages.get(j).get(i); - valuePageHeaders.add(valuePage.getLeft()); - compressedValuePages.add(valuePage.getRight()); - } - IPointReader pagePointReader = - alignedChunkReader.getPagePointReader( - timePage.getLeft(), valuePageHeaders, timePage.getRight(), compressedValuePages); - - while (pagePointReader.hasNextTimeValuePair()) { - TimeValuePair timeValuePair = pagePointReader.nextTimeValuePair(); - removeDeletedObjectFiles(timeValuePair, deletionCursors, timeDeletions, objectDeletions); + // buffer 60s to avoid concurrent issues with querying + final long timeLowerBoundInMS = CommonDateTimeUtils.currentTime() - ttlInMS + 60 * 1000; + try { + recursiveTTLCheckForTableDir( + tableDir, 0, tsTable.getTagNum() + 1, !restrictObjectLimit, timeLowerBoundInMS); + } catch (Exception e) { + logger.warn( + "Meet exception when checking for object files for table {}.{} in region {}", + databaseName, + tableName, + regionObjectDir.getName(), + e); } } } - private static void removeDeletedObjectFiles( - TimeValuePair timeValuePair, - int[] cursors, - List<ModEntry> timeDeletions, - List<List<ModEntry>> objectDeletions) { - long timestamp = timeValuePair.getTimestamp(); - boolean timeDeleted = isDeleted(timestamp, timeDeletions, cursors, 0); - for (int i = 0; i < timeValuePair.getValues().length; i++) { - Binary value = (Binary) timeValuePair.getValues()[i]; - if (value == null) { - continue; + private static void recursiveTTLCheckForTableDir( + File currentFile, + int depth, + int maxObjectFileDepth, + boolean canDistinguishDirectoryByFileName, + long lowerBoundInMS) { + canDistinguishDirectoryByFileName |= depth > maxObjectFileDepth; + String fileName = currentFile.getName(); + boolean maybeObjectFile = fileName.endsWith(".bin"); + if (maybeObjectFile) { + if (canDistinguishDirectoryByFileName) { + checkTTLAndDeleteExpiredObjectFile(currentFile, null, lowerBoundInMS); + return; } - if (timeDeleted || isDeleted(timestamp, objectDeletions.get(i), cursors, i + 1)) { - ObjectTypeUtils.deleteObjectPathFromBinary(value); + try { + BasicFileAttributes basicFileAttributes = + Files.readAttributes(currentFile.toPath(), BasicFileAttributes.class); + if (!basicFileAttributes.isDirectory()) { + checkTTLAndDeleteExpiredObjectFile(currentFile, basicFileAttributes, lowerBoundInMS); + return; + } + } catch (IOException ignored) { } } + File[] children = currentFile.listFiles(); + if (children == null) { + return; + } + for (File child : children) { + recursiveTTLCheckForTableDir( + child, depth + 1, maxObjectFileDepth, canDistinguishDirectoryByFileName, lowerBoundInMS); + } } - private static boolean isDeleted( - long timestamp, List<ModEntry> deleteIntervalList, int[] deleteCursors, int idx) { - while (deleteIntervalList != null && deleteCursors[idx] < deleteIntervalList.size()) { - if (deleteIntervalList.get(deleteCursors[idx]).getTimeRange().contains(timestamp)) { - return true; - } else if (deleteIntervalList.get(deleteCursors[idx]).getTimeRange().getMax() < timestamp) { - deleteCursors[idx]++; - } else { - return false; + private static void checkTTLAndDeleteExpiredObjectFile( + File file, @Nullable BasicFileAttributes attributes, long timeLowerBoundInMS) { + String fileName = file.getName(); + long fileTimestampInMS; + try { + fileTimestampInMS = Long.parseLong(fileName.substring(0, fileName.length() - 4)); + } catch (NumberFormatException ignored) { + return; + } + + if (fileTimestampInMS >= timeLowerBoundInMS) { + return; + } + + try { + attributes = + attributes == null + ? Files.readAttributes(file.toPath(), BasicFileAttributes.class) + : attributes; + if (attributes.isDirectory()) { + return; } + Files.delete(file.toPath()); + FileMetrics.getInstance().decreaseObjectFileNum(1); + FileMetrics.getInstance().decreaseObjectFileSize(attributes.size()); + logger.info("Remove object file {}, size is {}(byte)", file.getPath(), attributes.size()); + } catch (Exception ignored) { } - return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index c07f5a1ad40..b9b7f114e3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -483,16 +483,6 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { modificationList.isEmpty() ? Collections.emptyList() : modificationList); } - if (ttlDeletion != null) { - List<ModEntry> emptyList = Collections.emptyList(); - CompactionUtils.removeDeletedObjectFiles( - readerMap.get(tsFileResource), - alignedChunkMetadataList, - Collections.singletonList(ttlDeletion), - modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList()), - tsFileResource.getTsFileID().regionId); - } - ModificationUtils.modifyAlignedChunkMetaData( alignedChunkMetadataList, modificationForTimeColumn, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java index 7a8e884a491..c2d3524697d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java @@ -22,11 +22,8 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PatternTreeMap; -import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.db.exception.WriteProcessException; -import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; @@ -62,11 +59,9 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { @@ -270,24 +265,6 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto } }); - long ttlForTable = - deviceId.isTableModel() - ? DataNodeTTLCache.getInstance() - .getTTLForTable(resource.getDatabaseName(), deviceId.getTableName()) - : Long.MAX_VALUE; - if (ttlForTable != Long.MAX_VALUE) { - ModEntry ttlDeletion = - CompactionUtils.convertTtlToDeletion( - deviceId, CommonDateTimeUtils.currentTime() - ttlForTable); - List<ModEntry> emptyList = Collections.emptyList(); - CompactionUtils.removeDeletedObjectFiles( - readerCacheMap.get(resource), - alignedChunkMetadataList, - Collections.singletonList(ttlDeletion), - valueModifications.stream().map(v -> emptyList).collect(Collectors.toList()), - resource.getTsFileID().regionId); - } - // modify aligned chunk metadatas ModificationUtils.modifyAlignedChunkMetaData( alignedChunkMetadataList, timeModifications, valueModifications, ignoreAllNullRows); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java index 393a9f6d2dc..cb74a582833 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java @@ -65,6 +65,15 @@ public class TTLScheduleTask implements Callable<Void> { dataRegionListSnapshot.get(i).executeTTLCheck(); } } + // check for object files + for (int i = 0; i < dataRegionListSnapshot.size(); i++) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + if (i % workerNum == workerId) { + dataRegionListSnapshot.get(i).executeTTLCheckForObjectFiles(); + } + } } catch (InterruptedException ignored) { logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId); return null; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 62bc89ec5ae..5057bad83f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -142,7 +142,8 @@ public class CommonConfig { private int ttlRuleCapacity = 1000; /** The interval of ttl check task in each database. The unit is ms. Default is 2 hours. */ - private long ttlCheckInterval = 7_200_000L; + // private long ttlCheckInterval = 7_200_000L; + private long ttlCheckInterval = 10_000L; /** Thrift socket and connection timeout between data node and config node. */ private int cnConnectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(60);
