This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9c614eb3ae Add independent ttl check for object file (#16929)
e9c614eb3ae is described below
commit e9c614eb3ae3d13f9a4b71c56db5fd328133069c
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 23 15:23:29 2025 +0800
Add independent ttl check for object file (#16929)
---
.../db/service/metrics/CompactionMetrics.java | 14 +-
.../db/storageengine/dataregion/DataRegion.java | 15 ++
.../execute/task/CrossSpaceCompactionTask.java | 6 -
.../execute/task/InnerSpaceCompactionTask.java | 6 -
.../execute/task/SettleCompactionTask.java | 1 -
.../compaction/execute/utils/CompactionUtils.java | 274 +++++++++------------
.../execute/utils/MultiTsFileDeviceIterator.java | 10 -
.../fast/FastAlignedSeriesCompactionExecutor.java | 23 --
.../compaction/schedule/TTLScheduleTask.java | 12 +
.../object/ObjectTypeCompactionTest.java | 161 +++++++++++-
10 files changed, 307 insertions(+), 215 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
index 75372f0c387..8a499f9f96a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java
@@ -703,6 +703,7 @@ public class CompactionMetrics implements IMetricSet {
private Histogram settleCompactionTaskSelectionTimeCost =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram ttlCheckForObjectFileTimeCost =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram seqInnerSpaceCompactionTaskSelectedFileNum =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
@@ -766,6 +767,10 @@ public class CompactionMetrics implements IMetricSet {
}
}
+ public void updateTTLCheckForObjectFileCost(long time) {
+ ttlCheckForObjectFileTimeCost.update(time);
+ }
+
public void updateCompactionTaskSelectedFileNum(
CompactionTaskType taskType, int selectedFileNum) {
switch (taskType) {
@@ -869,6 +874,12 @@ public class CompactionMetrics implements IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
"settle");
+ ttlCheckForObjectFileTimeCost =
+ metricService.getOrCreateHistogram(
+ Metric.COMPACTION_TASK_SELECTION_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "settle-object");
seqInnerSpaceCompactionTaskSelectedFileNum =
metricService.getOrCreateHistogram(
Metric.COMPACTION_TASK_SELECTED_FILE.toString(),
@@ -935,7 +946,8 @@ public class CompactionMetrics implements IMetricSet {
}
private void unbindCompactionTaskSelection(AbstractMetricService
metricService) {
- for (String taskType : Arrays.asList("seq", "unseq", "cross", "insertion",
"settle")) {
+ for (String taskType :
+ Arrays.asList("seq", "unseq", "cross", "insertion", "settle",
"settle-object")) {
metricService.remove(
MetricType.GAUGE,
Metric.COMPACTION_TASK_SELECTION.toString(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 09c9ee84304..82f5b7a0a71 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -103,6 +103,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
@@ -3428,6 +3429,20 @@ public class DataRegion implements IDataRegionForQuery {
return trySubmitCount;
}
+ public void executeTTLCheckForObjectFiles() throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+ List<String> allObjectDirs =
TierManager.getInstance().getAllObjectFileFolders();
+ for (String objectDir : allObjectDirs) {
+ File regionObjectDir = new File(objectDir, dataRegionIdString);
+ if (!regionObjectDir.isDirectory()) {
+ continue;
+ }
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir,
databaseName);
+ }
+ CompactionMetrics.getInstance()
+ .updateTTLCheckForObjectFileCost(System.currentTimeMillis() -
startTime);
+ }
+
private boolean skipCurrentTTLAndModificationCheck() {
if (this.databaseName.equals(InformationSchema.INFORMATION_DATABASE)) {
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 290466ffeb6..41d859924f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -27,7 +27,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
@@ -193,11 +192,6 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
performer.setTargetFiles(targetTsfileResourceList);
performer.setSummary(summary);
performer.perform();
- if (performer instanceof ReadPointCompactionPerformer) {
- for (TsFileResource resource : getAllSourceTsFiles()) {
- CompactionUtils.removeDeletedObjectFiles(resource);
- }
- }
CompactionUtils.updateProgressIndexAndMark(
targetTsfileResourceList, selectedSequenceFiles,
selectedUnsequenceFiles);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 034e4eb48b8..c61d2275ac1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
@@ -390,11 +389,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
performer.setTargetFiles(filesView.targetFilesInPerformer);
performer.setSummary(summary);
performer.perform();
- if (performer instanceof ReadPointCompactionPerformer) {
- for (TsFileResource resource :
filesView.sourceFilesInCompactionPerformer) {
- CompactionUtils.removeDeletedObjectFiles(resource);
- }
- }
prepareTargetFiles();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
index 7543d56ce80..08cc494232b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
@@ -219,7 +219,6 @@ public class SettleCompactionTask extends
InnerSpaceCompactionTask {
if (recoverMemoryStatus) {
tsFileManager.remove(resource, resource.isSeq());
}
- CompactionUtils.removeDeletedObjectFiles(resource);
boolean res = deleteTsFileOnDisk(resource);
if (res) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index bac48729167..1e3f494ae8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -25,14 +25,15 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionAlignedChunkReader;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch;
@@ -44,33 +45,28 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEnt
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import org.apache.iotdb.db.utils.ModificationUtils;
-import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.tsfile.common.constant.TsFileConstant;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.file.header.PageHeader;
-import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
-import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.tsfile.read.TimeValuePair;
-import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.reader.IPointReader;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -512,173 +508,131 @@ public class CompactionUtils {
}
}
- public static void removeDeletedObjectFiles(TsFileResource resource) {
- // check for compaction recovery
- if (!resource.tsFileExists()) {
+ public static void executeTTLCheckObjectFilesForTableModel(
+ File regionObjectDir, String databaseName) {
+ File[] tableDirs = regionObjectDir.listFiles();
+ if (tableDirs == null) {
return;
}
- try (MultiTsFileDeviceIterator deviceIterator =
- new MultiTsFileDeviceIterator(Collections.singletonList(resource))) {
- while (deviceIterator.hasNextDevice()) {
- deviceIterator.nextDevice();
- deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+ boolean restrictObjectLimit =
+ CommonDescriptor.getInstance().getConfig().isRestrictObjectLimit();
+ for (File tableDir : tableDirs) {
+ if (!tableDir.isDirectory()) {
+ continue;
+ }
+ String tableName = tableDir.getName();
+ if (!restrictObjectLimit) {
+ try {
+ tableName =
+ new String(
+ BaseEncoding.base32().omitPadding().decode(tableName),
StandardCharsets.UTF_8);
+ } catch (IllegalArgumentException ignored) {
+ continue;
+ }
+ }
+ TsTable tsTable =
DataNodeTableCache.getInstance().getTable(databaseName, tableName);
+ if (tsTable == null) {
+ continue;
+ }
+ long ttlInMS =
CommonDateTimeUtils.convertIoTDBTimeToMillis(tsTable.getCachedTableTTL());
+ if (ttlInMS == Long.MAX_VALUE) {
+ continue;
+ }
+ // buffer 60s to avoid concurrent issues with querying
+ final long timeLowerBoundInMS = CommonDateTimeUtils.currentTime() -
ttlInMS - 60 * 1000;
+ try {
+ recursiveTTLCheckForTableDir(
+ tableDir, 0, tsTable.getTagNum() + 1, !restrictObjectLimit,
timeLowerBoundInMS);
+ } catch (Exception e) {
+ logger.warn(
+ "Meet exception when checking for object files for table {}.{} in
region {}",
+ databaseName,
+ tableName,
+ regionObjectDir.getName(),
+ e);
}
- } catch (Exception e) {
- logger.warn("Failed to remove object files from file {}",
resource.getTsFilePath(), e);
}
}
- @SuppressWarnings("java:S3776")
- public static void removeDeletedObjectFiles(
- TsFileSequenceReader reader,
- List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
- List<ModEntry> timeMods,
- List<List<ModEntry>> valueMods,
- int currentRegionId)
- throws IOException {
- if (alignedChunkMetadataList.isEmpty()) {
- return;
- }
- List<Integer> objectColumnIndexList = new ArrayList<>();
- List<List<ModEntry>> objectDeletionIntervalList = new ArrayList<>();
- boolean objectColumnHasDeletion = false;
-
- TSDataType[] dataTypes = new TSDataType[valueMods.size()];
- for (AbstractAlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
- boolean hasNull = false;
- for (int i = 0; i <
alignedChunkMetadata.getValueChunkMetadataList().size(); i++) {
- if (dataTypes[i] != null) {
- continue;
- }
- IChunkMetadata chunkMetadata =
alignedChunkMetadata.getValueChunkMetadataList().get(i);
- if (chunkMetadata == null) {
- hasNull = true;
- continue;
- }
- dataTypes[i] = chunkMetadata.getDataType();
- if (dataTypes[i] == TSDataType.OBJECT) {
- objectColumnIndexList.add(i);
- List<ModEntry> deletionInterval =
ModificationUtils.sortAndMerge(valueMods.get(i));
- objectColumnHasDeletion |= (!deletionInterval.isEmpty() ||
!timeMods.isEmpty());
- objectDeletionIntervalList.add(deletionInterval);
- }
+ // We try to avoid expensive 'stat' system calls by first checking file name
and only performing
+ // Files.readAttributes when the file may be expired
+ private static void recursiveTTLCheckForTableDir(
+ File currentFile,
+ int depth,
+ int maxObjectFileDepth,
+ boolean canDistinguishDirectoryByFileName,
+ long lowerBoundInMS) {
+ canDistinguishDirectoryByFileName |= depth > maxObjectFileDepth;
+ String fileName = currentFile.getName();
+ boolean maybeObjectFile = fileName.endsWith(".bin");
+ if (maybeObjectFile) {
+ if (canDistinguishDirectoryByFileName) {
+ checkTTLAndDeleteExpiredObjectFile(currentFile, null, lowerBoundInMS);
+ return;
}
- if (!hasNull) {
- break;
+ try {
+ BasicFileAttributes basicFileAttributes =
+ Files.readAttributes(currentFile.toPath(),
BasicFileAttributes.class);
+ if (!basicFileAttributes.isDirectory()) {
+ checkTTLAndDeleteExpiredObjectFile(currentFile, basicFileAttributes,
lowerBoundInMS);
+ return;
+ }
+ } catch (IOException ignored) {
}
}
- if (!objectColumnHasDeletion) {
+ File[] children = currentFile.listFiles();
+ if (children == null) {
return;
}
- int[] deletionCursors = new int[objectColumnIndexList.size() + 1];
- List<ModEntry> timeDeletionIntervalList =
ModificationUtils.sortAndMerge(timeMods);
- for (AbstractAlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
- CompactionUtils.removeDeletedObjectFiles(
- reader,
- alignedChunkMetadata,
- objectColumnIndexList,
- timeDeletionIntervalList,
- objectDeletionIntervalList,
- deletionCursors,
- currentRegionId);
+ // The rate limit may only work on filesystems like ext4, directory
File.length() is
+ // block-aligned and reflects allocated directory entry blocks.
+ acquireCompactionReadRate(currentFile.length());
+ for (File child : children) {
+ recursiveTTLCheckForTableDir(
+ child, depth + 1, maxObjectFileDepth,
canDistinguishDirectoryByFileName, lowerBoundInMS);
}
}
- @SuppressWarnings("java:S3776")
- private static void removeDeletedObjectFiles(
- TsFileSequenceReader reader,
- AbstractAlignedChunkMetadata alignedChunkMetadata,
- List<Integer> objectColumnIndexList,
- List<ModEntry> timeDeletions,
- List<List<ModEntry>> objectDeletions,
- int[] deletionCursors,
- int currentRegionId)
- throws IOException {
- Chunk timeChunk =
- reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
- CompactionChunkReader compactionChunkReader = new
CompactionChunkReader(timeChunk);
- List<Pair<PageHeader, ByteBuffer>> timePages =
- compactionChunkReader.readPageDataWithoutUncompressing();
-
- List<Chunk> valueChunks = new ArrayList<>();
- List<List<Pair<PageHeader, ByteBuffer>>> valuePages = new ArrayList<>();
-
- for (int i = 0; i < objectColumnIndexList.size(); i++) {
- int idxInAlignedChunkMetadata = objectColumnIndexList.get(i);
- if (timeDeletions.isEmpty() && objectDeletions.get(i).isEmpty()) {
- continue;
- }
- ChunkMetadata valueChunkMetadata =
- (ChunkMetadata)
-
alignedChunkMetadata.getValueChunkMetadataList().get(idxInAlignedChunkMetadata);
- if (valueChunkMetadata == null) {
- continue;
- }
- Chunk chunk = reader.readMemChunk(valueChunkMetadata);
- if (chunk != null) {
- chunk
- .getHeader()
- .setReplaceDecoder(
- decoder -> ObjectTypeUtils.getReplaceDecoder(decoder,
currentRegionId));
- }
- valueChunks.add(chunk);
- valuePages.add(
- chunk == null
- ? null
- : new
CompactionChunkReader(chunk).readPageDataWithoutUncompressing());
- }
-
- CompactionAlignedChunkReader alignedChunkReader =
- new CompactionAlignedChunkReader(timeChunk, valueChunks, true);
- for (int i = 0; i < timePages.size(); i++) {
- Pair<PageHeader, ByteBuffer> timePage = timePages.get(i);
- List<PageHeader> valuePageHeaders = new ArrayList<>(valuePages.size());
- List<ByteBuffer> compressedValuePages = new
ArrayList<>(valuePages.size());
- for (int j = 0; j < valuePages.size(); j++) {
- Pair<PageHeader, ByteBuffer> valuePage = valuePages.get(j).get(i);
- valuePageHeaders.add(valuePage.getLeft());
- compressedValuePages.add(valuePage.getRight());
- }
- IPointReader pagePointReader =
- alignedChunkReader.getPagePointReader(
- timePage.getLeft(), valuePageHeaders, timePage.getRight(),
compressedValuePages);
+ private static void checkTTLAndDeleteExpiredObjectFile(
+ File file, @Nullable BasicFileAttributes attributes, long
timeLowerBoundInMS) {
+ String fileName = file.getName();
+ long fileTimestampInMS;
+ try {
+ fileTimestampInMS = Long.parseLong(fileName.substring(0,
fileName.length() - 4));
+ } catch (NumberFormatException ignored) {
+ return;
+ }
- while (pagePointReader.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = pagePointReader.nextTimeValuePair();
- removeDeletedObjectFiles(timeValuePair, deletionCursors,
timeDeletions, objectDeletions);
- }
+ if (fileTimestampInMS >= timeLowerBoundInMS) {
+ return;
}
- }
- private static void removeDeletedObjectFiles(
- TimeValuePair timeValuePair,
- int[] cursors,
- List<ModEntry> timeDeletions,
- List<List<ModEntry>> objectDeletions) {
- long timestamp = timeValuePair.getTimestamp();
- boolean timeDeleted = isDeleted(timestamp, timeDeletions, cursors, 0);
- for (int i = 0; i < timeValuePair.getValues().length; i++) {
- Binary value = (Binary) timeValuePair.getValues()[i];
- if (value == null) {
- continue;
- }
- if (timeDeleted || isDeleted(timestamp, objectDeletions.get(i), cursors,
i + 1)) {
- ObjectTypeUtils.deleteObjectPathFromBinary(value);
+ try {
+ attributes =
+ attributes == null
+ ? Files.readAttributes(file.toPath(), BasicFileAttributes.class)
+ : attributes;
+ if (attributes.isDirectory()) {
+ return;
}
+ Files.delete(file.toPath());
+ FileMetrics.getInstance().decreaseObjectFileNum(1);
+ FileMetrics.getInstance().decreaseObjectFileSize(attributes.size());
+ logger.info("Remove object file {}, size is {}(byte)", file.getPath(),
attributes.size());
+ } catch (Exception ignored) {
}
}
- private static boolean isDeleted(
- long timestamp, List<ModEntry> deleteIntervalList, int[] deleteCursors,
int idx) {
- while (deleteIntervalList != null && deleteCursors[idx] <
deleteIntervalList.size()) {
- if
(deleteIntervalList.get(deleteCursors[idx]).getTimeRange().contains(timestamp))
{
- return true;
- } else if
(deleteIntervalList.get(deleteCursors[idx]).getTimeRange().getMax() <
timestamp) {
- deleteCursors[idx]++;
- } else {
- return false;
- }
+ private static void acquireCompactionReadRate(long size) {
+ if (size <= 0) {
+ return;
+ }
+
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);
+ RateLimiter rateLimiter =
CompactionTaskManager.getInstance().getCompactionReadRateLimiter();
+ while (size >= Integer.MAX_VALUE) {
+ size -= Integer.MAX_VALUE;
+ rateLimiter.acquire(Integer.MAX_VALUE);
}
- return false;
+ rateLimiter.acquire((int) size);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index c07f5a1ad40..b9b7f114e3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -483,16 +483,6 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
modificationList.isEmpty() ? Collections.emptyList() :
modificationList);
}
- if (ttlDeletion != null) {
- List<ModEntry> emptyList = Collections.emptyList();
- CompactionUtils.removeDeletedObjectFiles(
- readerMap.get(tsFileResource),
- alignedChunkMetadataList,
- Collections.singletonList(ttlDeletion),
- modificationForValueColumns.stream().map(v ->
emptyList).collect(Collectors.toList()),
- tsFileResource.getTsFileID().regionId);
- }
-
ModificationUtils.modifyAlignedChunkMetaData(
alignedChunkMetadataList,
modificationForTimeColumn,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
index 7a8e884a491..c2d3524697d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
@@ -22,11 +22,8 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
-import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.exception.WriteProcessException;
-import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
@@ -62,11 +59,9 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class FastAlignedSeriesCompactionExecutor extends
SeriesCompactionExecutor {
@@ -270,24 +265,6 @@ public class FastAlignedSeriesCompactionExecutor extends
SeriesCompactionExecuto
}
});
- long ttlForTable =
- deviceId.isTableModel()
- ? DataNodeTTLCache.getInstance()
- .getTTLForTable(resource.getDatabaseName(),
deviceId.getTableName())
- : Long.MAX_VALUE;
- if (ttlForTable != Long.MAX_VALUE) {
- ModEntry ttlDeletion =
- CompactionUtils.convertTtlToDeletion(
- deviceId, CommonDateTimeUtils.currentTime() - ttlForTable);
- List<ModEntry> emptyList = Collections.emptyList();
- CompactionUtils.removeDeletedObjectFiles(
- readerCacheMap.get(resource),
- alignedChunkMetadataList,
- Collections.singletonList(ttlDeletion),
- valueModifications.stream().map(v ->
emptyList).collect(Collectors.toList()),
- resource.getTsFileID().regionId);
- }
-
// modify aligned chunk metadatas
ModificationUtils.modifyAlignedChunkMetaData(
alignedChunkMetadataList, timeModifications, valueModifications,
ignoreAllNullRows);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index 393a9f6d2dc..e15a908ac24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -65,6 +66,17 @@ public class TTLScheduleTask implements Callable<Void> {
dataRegionListSnapshot.get(i).executeTTLCheck();
}
}
+ // check for object files
+ for (int i = 0; i < dataRegionListSnapshot.size(); i++) {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ DataRegion region = dataRegionListSnapshot.get(i);
+ if (i % workerNum == workerId
+ && PathUtils.isTableModelDatabase(region.getDatabaseName())) {
+ dataRegionListSnapshot.get(i).executeTTLCheckForObjectFiles();
+ }
+ }
} catch (InterruptedException ignored) {
logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
return null;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
index 723a052cff1..14495067da4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.dataregion.Base32ObjectPath;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
import org.apache.iotdb.db.storageengine.dataregion.PlainObjectPath;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
@@ -39,11 +40,13 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.utils.ObjectTypeUtils;
+import com.google.common.io.BaseEncoding;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -67,8 +70,8 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
@@ -134,7 +137,7 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testSeqCompactionWithTTL() throws IOException,
WriteProcessException {
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 0);
+ generateTsFileAndObject(true, System.currentTimeMillis() - 100000, 0);
Pair<TsFileResource, File> pair2 =
generateTsFileAndObject(true, System.currentTimeMillis() + 1000000,
100);
tsFileManager.add(pair1.getLeft(), true);
@@ -148,6 +151,10 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
new ReadChunkCompactionPerformer(),
0);
Assert.assertTrue(task.start());
+
+ Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionDir,
COMPACTION_TEST_SG);
Assert.assertFalse(pair1.getRight().exists());
Assert.assertTrue(pair2.getRight().exists());
}
@@ -169,6 +176,10 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
new FastCompactionPerformer(false),
0);
Assert.assertTrue(task.start());
+
+ Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionDir,
COMPACTION_TEST_SG);
Assert.assertFalse(pair2.getRight().exists());
Assert.assertTrue(pair1.getRight().exists());
}
@@ -191,6 +202,9 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
0);
Assert.assertTrue(task.start());
Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionDir,
COMPACTION_TEST_SG);
+ Assert.assertTrue(pair1.getRight().exists());
Assert.assertFalse(pair2.getRight().exists());
}
@@ -212,6 +226,10 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
1,
0);
Assert.assertTrue(task.start());
+
+ Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionDir,
COMPACTION_TEST_SG);
Assert.assertFalse(pair2.getRight().exists());
Assert.assertTrue(pair1.getRight().exists());
}
@@ -219,7 +237,7 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
@Test
public void testSettleCompaction() throws IOException, WriteProcessException
{
Pair<TsFileResource, File> pair1 =
- generateTsFileAndObject(true, System.currentTimeMillis() - 10000, 3);
+ generateTsFileAndObject(true, System.currentTimeMillis() - 100000, 3);
Pair<TsFileResource, File> pair2 =
generateTsFileAndObject(true, System.currentTimeMillis() + 1000000, 0);
tsFileManager.add(pair1.getLeft(), true);
@@ -234,10 +252,73 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
new FastCompactionPerformer(true),
0);
Assert.assertTrue(task.start());
+
+ Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionDir,
COMPACTION_TEST_SG);
Assert.assertFalse(pair1.getRight().exists());
Assert.assertTrue(pair2.getRight().exists());
}
+ @Test
+ public void testBase32ObjectPathTTLCheck() throws IOException {
+ config.setRestrictObjectLimit(false);
+ try {
+ File file1 =
+ generateBase32PathObjectFile(regionDir, System.currentTimeMillis() +
100000, false);
+ File file2 =
+ generateBase32PathObjectFile(regionDir, System.currentTimeMillis() +
200000, true);
+ File file3 =
+ generateBase32PathObjectFile(regionDir, System.currentTimeMillis() -
100000, true);
+ File file4 =
+ generateBase32PathObjectFile(regionDir, System.currentTimeMillis() -
200000, false);
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ Assert.assertTrue(file3.exists());
+ Assert.assertTrue(file4.exists());
+ CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionDir,
COMPACTION_TEST_SG);
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ Assert.assertFalse(file3.exists());
+ Assert.assertFalse(file4.exists());
+ } finally {
+ config.setRestrictObjectLimit(true);
+ }
+ }
+
+ @Test
+ public void testPlainObjectPathTTLCheck() throws IOException,
InterruptedException {
+ File file1 =
+ generatePlainPathObjectFile(regionDir, System.currentTimeMillis() +
100000, false, "d1");
+ File file2 =
+ generatePlainPathObjectFile(
+ regionDir,
+ System.currentTimeMillis() + 200000,
+ true,
+ (System.currentTimeMillis() - 100000) + ".bin");
+ File file3 =
+ generatePlainPathObjectFile(regionDir, System.currentTimeMillis() -
100000, true, "d1");
+ File file4 =
+ generatePlainPathObjectFile(regionDir, System.currentTimeMillis() -
200000, false, "d1");
+ File file5 =
+ generatePlainPathObjectFile(
+ regionDir,
+ System.currentTimeMillis() + 300000,
+ false,
+ (System.currentTimeMillis() - 300000) + ".bin");
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ Assert.assertTrue(file3.exists());
+ Assert.assertTrue(file4.exists());
+ Assert.assertTrue(file5.exists());
+ new DataRegion(COMPACTION_TEST_SG,
regionDir.getName()).executeTTLCheckForObjectFiles();
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ Assert.assertFalse(file3.exists());
+ Assert.assertFalse(file4.exists());
+ Assert.assertTrue(file5.exists());
+ }
+
@Test
public void testPlainObjectBinaryReplaceRegionId() {
IObjectPath objectPath = new PlainObjectPath(1, 0, new
StringArrayDeviceID("t1.d1"), "s1");
@@ -280,13 +361,32 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
private Pair<TsFileResource, File> generateTsFileAndObject(
boolean seq, long timestamp, int regionIdInTsFile) throws IOException,
WriteProcessException {
TsFileResource resource = createEmptyFileAndResource(seq);
- Path testFile1 = Files.createTempFile(regionDir.toPath(), "test_", ".bin");
+ File dir =
+ new File(
+ regionDir.getPath()
+ + File.separator
+ + "t1"
+ + File.separator
+ + "d1"
+ + File.separator
+ + "s1");
+ dir.mkdirs();
+ File testFile1 = new File(dir, timestamp + ".bin");
byte[] content = new byte[100];
for (int i = 0; i < 100; i++) {
content[i] = (byte) i;
}
- Files.write(testFile1, content);
- String relativePathInTsFile = regionIdInTsFile + File.separator +
testFile1.toFile().getName();
+ Files.write(testFile1.toPath(), content);
+ String relativePathInTsFile =
+ regionIdInTsFile
+ + File.separator
+ + "t1"
+ + File.separator
+ + "d1"
+ + File.separator
+ + "s1"
+ + File.separator
+ + testFile1.getName();
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES +
relativePathInTsFile.length());
buffer.putLong(100L);
buffer.put(BytesUtils.stringToBytes(relativePathInTsFile));
@@ -296,7 +396,8 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
writer.getSchema().registerTableSchema(tableSchema);
writer.startChunkGroup(deviceID);
AlignedChunkWriterImpl alignedChunkWriter =
- new AlignedChunkWriterImpl(Arrays.asList(new MeasurementSchema("s1",
TSDataType.OBJECT)));
+ new AlignedChunkWriterImpl(
+ Collections.singletonList(new MeasurementSchema("s1",
TSDataType.OBJECT)));
alignedChunkWriter.write(timestamp);
alignedChunkWriter.write(timestamp, new Binary(buffer.array()), false);
alignedChunkWriter.sealCurrentPage();
@@ -309,6 +410,50 @@ public class ObjectTypeCompactionTest extends
AbstractCompactionTest {
resource.serialize();
resource.deserialize();
resource.setStatus(TsFileResourceStatus.NORMAL);
- return new Pair<>(resource, testFile1.toFile());
+ return new Pair<>(resource, testFile1);
+ }
+
+ private File generatePlainPathObjectFile(
+ File regionDir, long timestamp, boolean internalLevel, String tagValue)
throws IOException {
+ File dir =
+ new File(
+ regionDir.getPath()
+ + File.separator
+ + "t1"
+ + (internalLevel ? "" : (File.separator + tagValue))
+ + File.separator
+ + "s1");
+ dir.mkdirs();
+ File testFile1 = new File(dir, timestamp + ".bin");
+ byte[] content = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ content[i] = (byte) i;
+ }
+ Files.write(testFile1.toPath(), content);
+ return testFile1;
+ }
+
+ private File generateBase32PathObjectFile(File regionDir, long timestamp,
boolean internalLevel)
+ throws IOException {
+ File dir =
+ new File(
+ regionDir.getPath()
+ + File.separator
+ + toBase32Str("t1")
+ + (internalLevel ? "" : (File.separator + toBase32Str("d1")))
+ + File.separator
+ + toBase32Str("s1"));
+ dir.mkdirs();
+ File testFile1 = new File(dir, timestamp + ".bin");
+ byte[] content = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ content[i] = (byte) i;
+ }
+ Files.write(testFile1.toPath(), content);
+ return testFile1;
+ }
+
+ private String toBase32Str(String str) {
+ return
BaseEncoding.base32().omitPadding().encode(str.getBytes(StandardCharsets.UTF_8));
}
}