This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch optimizeTTLDeletionInFastCompactionPerformer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d0bf9f241470c75cf7c211f330ed73aa1e07423a Author: shuwenwei <[email protected]> AuthorDate: Mon Sep 8 16:20:31 2025 +0800 optimize ttl deletion in fast compaction performer --- .../execute/performer/impl/FastCompactionPerformer.java | 11 +++-------- .../compaction/execute/utils/CompactionUtils.java | 12 +++++++++++- .../execute/utils/MultiTsFileDeviceIterator.java | 16 +++------------- .../utils/executor/fast/SeriesCompactionExecutor.java | 6 +++++- .../execute/utils/writer/AbstractCompactionWriter.java | 11 +++++++++++ .../selector/estimator/AbstractCompactionEstimator.java | 4 ++++ .../estimator/RepairUnsortedFileCompactionEstimator.java | 5 +++++ .../selector/impl/NewSizeTieredCompactionSelector.java | 2 +- .../compaction/selector/impl/SettleSelectorImpl.java | 2 +- 9 files changed, 44 insertions(+), 25 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index d34de94e584..35383269fe1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -147,18 +147,13 @@ public class FastCompactionPerformer // checked above //noinspection OptionalGetWithoutIsPresent sortedSourceFiles.sort(Comparator.comparingLong(x -> x.getStartTime(device).get())); + ModEntry ttlDeletion = null; if (ttl != Long.MAX_VALUE) { - ModEntry ttlDeletion = + ttlDeletion = CompactionUtils.convertTtlToDeletion( device, deviceIterator.getTimeLowerBoundForCurrentDevice()); - for (TsFileResource sourceFile : sortedSourceFiles) { - modificationCache - .computeIfAbsent( - 
sourceFile.getTsFile().getName(), - k -> PatternTreeMapFactory.getModsPatternTreeMap()) - .append(ttlDeletion.keyOfPatternTree(), ttlDeletion); - } } + compactionWriter.setTTLDeletion(ttlDeletion); if (sortedSourceFiles.isEmpty()) { // device is out of dated in all source files diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 57191a99a4e..1f6ad903718 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -415,13 +416,22 @@ public class CompactionUtils { public static List<ModEntry> getMatchedModifications( PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> patternTreeMap, IDeviceID deviceID, - String measurement) + String measurement, + ModEntry ttlDeletion) throws IllegalPathException { if (patternTreeMap == null) { return Collections.emptyList(); } PartialPath path = CompactionPathUtils.getPath(deviceID, measurement); List<ModEntry> modEntries = patternTreeMap.getOverlapped(path); + if (ttlDeletion != null) { + if (!(modEntries instanceof ArrayList)) { + List<ModEntry> newModEntries = new ArrayList<>(modEntries.size() + 1); + newModEntries.addAll(modEntries); + modEntries = newModEntries; + } + modEntries.add(ttlDeletion); + } if (path.getIDeviceID().isTableModel()) { modEntries = modEntries.stream() diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index b5e9a9010e7..2a49d28bae3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -452,10 +452,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // match time column modifications List<ModEntry> modificationForTimeColumn = CompactionUtils.getMatchedModifications( - modifications, device, AlignedPath.VECTOR_PLACEHOLDER); - if (ttlDeletion != null) { - modificationForTimeColumn.add(ttlDeletion); - } + modifications, device, AlignedPath.VECTOR_PLACEHOLDER, ttlDeletion); // match value column modifications List<List<ModEntry>> modificationForValueColumns = new ArrayList<>(); @@ -466,10 +463,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { } List<ModEntry> modificationList = CompactionUtils.getMatchedModifications( - modifications, device, valueChunkMetadata.getMeasurementUid()); - if (ttlDeletion != null) { - modificationList.add(ttlDeletion); - } + modifications, device, valueChunkMetadata.getMeasurementUid(), null); modificationForValueColumns.add( modificationList.isEmpty() ? 
Collections.emptyList() : modificationList); } @@ -686,11 +680,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // collect the modifications for current series List<ModEntry> modificationForCurrentSeries = CompactionUtils.getMatchedModifications( - modificationsInThisResource, device, currentCompactingSeries); - // add ttl deletion for current series - if (ttlDeletion != null) { - modificationForCurrentSeries.add(ttlDeletion); - } + modificationsInThisResource, device, currentCompactingSeries, ttlDeletion); // if there are modifications of current series, apply them to the chunk metadata if (!modificationForCurrentSeries.isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index bf16c153d26..b3073bd3d25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -478,7 +478,11 @@ public abstract class SeriesCompactionExecutor { throws IllegalPathException { PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> allModifications = modificationCacheMap.get(tsFileResource.getTsFile().getName()); - return CompactionUtils.getMatchedModifications(allModifications, deviceId, measurement); + return CompactionUtils.getMatchedModifications( + allModifications, + deviceId, + measurement, + compactionWriter.getTTLLowerBoundForCurrentDevice()); } @SuppressWarnings("squid:S3776") diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java index 364e21e2273..5e794746894 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.exe import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.PageHeader; @@ -99,10 +100,20 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { protected String[] measurementId = new String[subTaskNum]; + protected ModEntry ttlDeletionForCurrentDevice; + public abstract void startChunkGroup(IDeviceID deviceId, boolean isAlign) throws IOException; public abstract void endChunkGroup() throws IOException; + public void setTTLDeletion(ModEntry ttlDeletion) { + this.ttlDeletionForCurrentDevice = ttlDeletion; + } + + public ModEntry getTTLLowerBoundForCurrentDevice() { + return ttlDeletionForCurrentDevice; + } + public void startMeasurement(String measurement, IChunkWriter chunkWriter, int subTaskId) { lastCheckIndex = 0; lastTimeSet[subTaskId] = false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java index 5c6ff61084d..afdff56d155 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java @@ -247,6 +247,10 @@ public abstract class AbstractCompactionEstimator { return roughFileInfo; } + public boolean supportsRoughEstimation() { + return true; + } + public static void removeFileInfoFromGlobalFileInfoCache(TsFileResource resource) { if (resource == null || resource.getTsFile() == null) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java index eca5bb3dcb0..9316ad3c106 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java @@ -89,4 +89,9 @@ public class RepairUnsortedFileCompactionEstimator extends AbstractInnerSpaceEst CompactionScheduleContext context, List<TsFileResource> resources) throws IOException { throw new RuntimeException("unimplemented"); } + + @Override + public boolean supportsRoughEstimation() { + return false; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java index e56193301df..a4181afb72b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java @@ -199,7 +199,7 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto performer = sequence ? context.getSeqCompactionPerformer() : context.getUnseqCompactionPerformer(); estimator = performer.getInnerSpaceEstimator().orElse(null); - if (estimator == null) { + if (estimator == null || !estimator.supportsRoughEstimation()) { estimateCompactionTaskMemoryDuringSelection = false; } memoryCost = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java index 4e645189557..09a55819dd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java @@ -297,7 +297,7 @@ public class SettleSelectorImpl implements ISettleSelector { throws IllegalPathException { return ModificationUtils.isAllDeletedByMods( CompactionUtils.getMatchedModifications( - modifications, device, AlignedPath.VECTOR_PLACEHOLDER), + modifications, device, AlignedPath.VECTOR_PLACEHOLDER, null), startTime, endTime); }
