This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch usePatternTreeMapInCompaction in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2941f77e257a1ab9341c770be374d96a47f37890 Author: shuwenwei <[email protected]> AuthorDate: Thu Aug 14 18:35:17 2025 +0800 Using PatternTreeMap to cache mod entries in ReadChunkCompactionPerformer --- .../performer/impl/FastCompactionPerformer.java | 5 +- .../compaction/execute/utils/CompactionUtils.java | 36 +++++++++++ .../execute/utils/MultiTsFileDeviceIterator.java | 45 ++++++------- .../executor/fast/SeriesCompactionExecutor.java | 19 +----- .../tablemodel/CompactionWithAllNullRowsTest.java | 73 +++++++++++++++++++--- 5 files changed, 123 insertions(+), 55 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index e970b07f2f4..d34de94e584 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -365,10 +365,7 @@ public class FastCompactionPerformer } // read mods PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = - PatternTreeMapFactory.getModsPatternTreeMap(); - for (ModEntry modification : resource.getAllModEntries()) { - modifications.append(modification.keyOfPatternTree(), modification); - } + CompactionUtils.buildModEntryPatternTreeMap(resource); modificationCache.put(resource.getTsFile().getName(), modifications); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index ee7afe8f88c..57191a99a4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.TestOnly; @@ -39,6 +41,8 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEn import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; +import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.SystemMetric; @@ -54,6 +58,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -395,6 +400,37 @@ public class CompactionUtils { } } + public static PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> + buildModEntryPatternTreeMap(TsFileResource resource) { + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> patternTreeMap = + PatternTreeMapFactory.getModsPatternTreeMap(); + TsFileResource.ModIterator modEntryIterator = resource.getModEntryIterator(); + while (modEntryIterator.hasNext()) { + ModEntry modification = modEntryIterator.next(); + patternTreeMap.append(modification.keyOfPatternTree(), modification); + } + return patternTreeMap; + } + + public static List<ModEntry> getMatchedModifications( + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> patternTreeMap, + IDeviceID deviceID, + String measurement) + throws IllegalPathException { + if (patternTreeMap == null) { + return Collections.emptyList(); + } + PartialPath path = CompactionPathUtils.getPath(deviceID, measurement); + List<ModEntry> modEntries = patternTreeMap.getOverlapped(path); + if (path.getIDeviceID().isTableModel()) { + modEntries = + modEntries.stream() + .filter(e -> e.affects(path.getIDeviceID()) && e.affects(path.getMeasurement())) + .collect(Collectors.toList()); + } + return ModificationUtils.sortAndMerge(modEntries); + } + public static boolean isDiskHasSpace() { return isDiskHasSpace(0d); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index f84e25a179b..b5e9a9010e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -21,7 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; @@ -31,6 +33,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEnt import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; @@ -67,7 +70,8 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { private List<TsFileResource> tsFileResourcesSortedByAsc; private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>(); private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>(); - private final Map<TsFileResource, List<ModEntry>> modificationCache = new HashMap<>(); + private final Map<TsFileResource, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> + modificationCache = new HashMap<>(); private Pair<IDeviceID, Boolean> currentDevice = null; private boolean ignoreAllNullRows; private long ttlForCurrentDevice; @@ -437,21 +441,18 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { ttlDeletion = CompactionUtils.convertTtlToDeletion(device, timeLowerBoundForCurrentDevice); } - List<ModEntry> modifications = + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = modificationCache.computeIfAbsent( - tsFileResource, r -> new ArrayList<>(tsFileResource.getAllModEntries())); + tsFileResource, CompactionUtils::buildModEntryPatternTreeMap); // construct the input params List<List<Modification>> for QueryUtils.modifyAlignedChunkMetaData AbstractAlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); List<IChunkMetadata> valueChunkMetadataList = alignedChunkMetadata.getValueChunkMetadataList(); // match time column modifications - List<ModEntry> modificationForTimeColumn = new ArrayList<>(); - for (ModEntry modification : modifications) { - if (modification.affectsAll(device)) { - modificationForTimeColumn.add(modification); - } - } + List<ModEntry> modificationForTimeColumn = + CompactionUtils.getMatchedModifications( + modifications, device, AlignedPath.VECTOR_PLACEHOLDER); if (ttlDeletion != null) { modificationForTimeColumn.add(ttlDeletion); } @@ -463,13 +464,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { modificationForValueColumns.add(Collections.emptyList()); continue; } - List<ModEntry> modificationList = new ArrayList<>(); - for (ModEntry modification : modifications) { - if (modification.affects(currentDevice.getLeft()) - && modification.affects(valueChunkMetadata.getMeasurementUid())) { - modificationList.add(modification); - } - } + List<ModEntry> modificationList = + CompactionUtils.getMatchedModifications( + modifications, device, valueChunkMetadata.getMeasurementUid()); if (ttlDeletion != null) { modificationList.add(ttlDeletion); } @@ -682,16 +679,14 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { chunkMetadataListMap.get(currentCompactingSeries); chunkMetadataListMap.remove(currentCompactingSeries); - List<ModEntry> modificationsInThisResource = - modificationCache.computeIfAbsent( - resource, r -> new ArrayList<>(r.getAllModEntries())); - LinkedList<ModEntry> modificationForCurrentSeries = new LinkedList<>(); + PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> + modificationsInThisResource = + modificationCache.computeIfAbsent( + resource, CompactionUtils::buildModEntryPatternTreeMap); // collect the modifications for current series - for (ModEntry modification : modificationsInThisResource) { - if (modification.affects(device) && modification.affects(currentCompactingSeries)) { - modificationForCurrentSeries.add(modification); - } - } + List<ModEntry> modificationForCurrentSeries = + CompactionUtils.getMatchedModifications( + modificationsInThisResource, device, currentCompactingSeries); // add ttl deletion for current series if (ttlDeletion != null) { modificationForCurrentSeries.add(ttlDeletion); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index cff3adb69cf..bf16c153d26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -20,11 +20,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement; @@ -33,7 +32,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.rea import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.exception.write.PageException; @@ -47,12 +45,10 @@ import org.apache.tsfile.read.common.TimeRange; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; -import java.util.stream.Collectors; public abstract class SeriesCompactionExecutor { @@ -482,18 +478,7 @@ public abstract class SeriesCompactionExecutor { throws IllegalPathException { PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> allModifications = modificationCacheMap.get(tsFileResource.getTsFile().getName()); - if (allModifications == null) { - return Collections.emptyList(); - } - PartialPath path = CompactionPathUtils.getPath(deviceId, measurement); - List<ModEntry> modEntries = allModifications.getOverlapped(path); - if (path.getIDeviceID().isTableModel()) { - modEntries = - modEntries.stream() - .filter(e -> e.affects(path.getIDeviceID()) && e.affects(path.getMeasurement())) - .collect(Collectors.toList()); - } - return ModificationUtils.sortAndMerge(modEntries); + return CompactionUtils.getMatchedModifications(allModifications, deviceId, measurement); } @SuppressWarnings("squid:S3776") diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java index a7c2d1cb289..9ff8a401150 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.tablemodel; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; -import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; @@ -36,9 +35,9 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; -import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.exception.write.WriteProcessException; @@ -49,7 +48,6 @@ import org.apache.tsfile.read.common.TimeRange; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -57,6 +55,7 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; @RunWith(Parameterized.class) public class CompactionWithAllNullRowsTest extends AbstractCompactionTest { @@ -248,8 +247,7 @@ public class CompactionWithAllNullRowsTest extends AbstractCompactionTest { } @Test - @Ignore - public void testCompactionWithAllDeletion() throws IOException, IllegalPathException { + public void testCompactionWithAllDeletion1() throws IOException { TsFileResource resource1 = createEmptyFileAndResource(true); IDeviceID deviceID = null; try (CompactionTableModelTestFileWriter writer = @@ -279,6 +277,39 @@ public class CompactionWithAllNullRowsTest extends AbstractCompactionTest { Assert.assertTrue(tsFileManager.getTsFileList(true).isEmpty()); } + @Test + public void testCompactionWithAllDeletion2() throws IOException { + TsFileResource resource1 = createEmptyFileAndResource(true); + IDeviceID deviceID = null; + try (CompactionTableModelTestFileWriter writer = + new CompactionTableModelTestFileWriter(resource1)) { + writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); + deviceID = writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); + writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue( + Arrays.asList("s0", "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9"), + new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, + TSEncoding.PLAIN, + CompressionType.LZ4, + Arrays.asList(true, true, true, true, true, true, true, true, true, true)); + writer.endChunkGroup(); + writer.endFile(); + } + resource1 + .getModFileForWrite() + .write( + new TableDeletionEntry( + new DeletionPredicate(deviceID.getTableName(), new FullExactMatch(deviceID)), + new TimeRange(Long.MIN_VALUE, 11))); + resource1.getModFileForWrite().close(); + seqResources.add(resource1); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask(0, tsFileManager, seqResources, true, getPerformer(), 0); + Assert.assertTrue(task.start()); + TsFileResource target = tsFileManager.getTsFileList(true).get(0); + Assert.assertEquals(12, target.getFileStartTime()); + Assert.assertEquals(12, target.getFileEndTime()); + } + @Test public void testCompactionWithAllValueColumnDeletion() throws IOException, IllegalPathException { TsFileResource resource1 = createEmptyFileAndResource(true); @@ -298,16 +329,40 @@ public class CompactionWithAllNullRowsTest extends AbstractCompactionTest { } resource1 .getModFileForWrite() - .write(new TreeDeletionEntry(new MeasurementPath(deviceID, "s0"), 11)); + .write( + new TableDeletionEntry( + new DeletionPredicate( + "t1", + new IDPredicate.FullExactMatch(deviceID), + Collections.singletonList("s0")), + new TimeRange(Long.MIN_VALUE, 11))); resource1 .getModFileForWrite() - .write(new TreeDeletionEntry(new MeasurementPath(deviceID, "s1"), 11)); + .write( + new TableDeletionEntry( + new DeletionPredicate( + "t1", + new IDPredicate.FullExactMatch(deviceID), + Collections.singletonList("s1")), + new TimeRange(Long.MIN_VALUE, 11))); resource1 .getModFileForWrite() - .write(new TreeDeletionEntry(new MeasurementPath(deviceID, "s2"), 11)); + .write( + new TableDeletionEntry( + new DeletionPredicate( + "t1", + new IDPredicate.FullExactMatch(deviceID), + Collections.singletonList("s2")), + new TimeRange(Long.MIN_VALUE, 11))); resource1 .getModFileForWrite() - .write(new TreeDeletionEntry(new MeasurementPath(deviceID, "s3"), 11)); + .write( + new TableDeletionEntry( + new DeletionPredicate( + "t1", + new IDPredicate.FullExactMatch(deviceID), + Collections.singletonList("s3")), + new TimeRange(Long.MIN_VALUE, 11))); resource1.getModFileForWrite().close(); seqResources.add(resource1); InnerSpaceCompactionTask task =
