This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch IgnoreDataOfDeprecatedTableInCompaction in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 216d020a27229f14f5233ce48a1e2c97e9f5527c Author: shuwenwei <[email protected]> AuthorDate: Thu Oct 9 18:17:38 2025 +0800 ignore data of deprecated table in compaction --- .../performer/impl/FastCompactionPerformer.java | 7 +- .../impl/ReadChunkCompactionPerformer.java | 5 +- .../impl/ReadPointCompactionPerformer.java | 6 +- .../execute/utils/CompactionTableSchema.java | 11 +- .../utils/CompactionTableSchemaCollector.java | 33 +++++- .../execute/utils/MultiTsFileDeviceIterator.java | 23 ++++ .../TableModelFastCompactionPerformerTest.java | 125 +++++++++++++++------ ...TableModelReadChunkCompactionPerformerTest.java | 94 +++++++++------- 8 files changed, 215 insertions(+), 89 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index 35383269fe1..fafba7ac1bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -125,7 +125,8 @@ public class FastCompactionPerformer ? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap) : new FastInnerCompactionWriter(targetFiles)) { List<Schema> schemas = - CompactionTableSchemaCollector.collectSchema(seqFiles, unseqFiles, readerCacheMap); + CompactionTableSchemaCollector.collectSchema( + seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap()); compactionWriter.setSchemaForAllTargetFile(schemas); readModification(seqFiles); readModification(unseqFiles); @@ -210,6 +211,10 @@ public class FastCompactionPerformer measurementSchemas.add(entry.getValue().left); timeseriesMetadataOffsetMap.put(entry.getKey(), entry.getValue().right); } + // current device may be ignored by some conditions + if (measurementSchemas.isEmpty()) { + return; + } FastCompactionTaskSummary taskSummary = new FastCompactionTaskSummary(); new FastCompactionPerformerSubTask( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 9ce42946416..837978b4ce1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -91,7 +91,10 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { PageException { try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles)) { schema = - CompactionTableSchemaCollector.collectSchema(seqFiles, deviceIterator.getReaderMap()); + CompactionTableSchemaCollector.collectSchema( + seqFiles, + deviceIterator.getReaderMap(), + deviceIterator.getDeprecatedTableSchemaMap()); while (deviceIterator.hasNextDevice()) { currentWriter = getAvailableCompactionWriter(); Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java index 447ca149462..ca650f28a75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java @@ -118,7 +118,10 @@ public class ReadPointCompactionPerformer new MultiTsFileDeviceIterator(seqFiles, unseqFiles); List<Schema> schemas = CompactionTableSchemaCollector.collectSchema( - seqFiles, unseqFiles, deviceIterator.getReaderMap()); + seqFiles, + unseqFiles, + deviceIterator.getReaderMap(), + deviceIterator.getDeprecatedTableSchemaMap()); compactionWriter.setSchemaForAllTargetFile(schemas); while (deviceIterator.hasNextDevice()) { checkThreadInterrupted(); @@ -271,6 +274,7 @@ public class ReadPointCompactionPerformer } else { seriesPath = new NonAlignedFullPath(deviceId, measurementSchemas.get(0)); } + return new SeriesDataBlockReader( seriesPath, new HashSet<>(allSensors), fragmentInstanceContext, queryDataSource, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java index 605cdda0230..3f6e83cbe96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java @@ -33,9 +33,9 @@ public class CompactionTableSchema extends TableSchema { super(tableName); } - public void merge(TableSchema tableSchema) { + public boolean merge(TableSchema tableSchema) { if (tableSchema == null) { - return; + return true; } if (!tableSchema.getTableName().equals(this.tableName)) { throw new CompactionTableSchemaNotMatchException( @@ -60,11 +60,7 @@ public class CompactionTableSchema extends TableSchema { IMeasurementSchema idColumnToMerge = idColumnSchemasToMerge.get(i); IMeasurementSchema currentIdColumn = measurementSchemas.get(i); if (!idColumnToMerge.getMeasurementName().equals(currentIdColumn.getMeasurementName())) { - throw new CompactionTableSchemaNotMatchException( - "current id column name is " - + currentIdColumn.getMeasurementName() - + ", other id column name in same position is " - + idColumnToMerge.getMeasurementName()); + return false; } } @@ -75,6 +71,7 @@ public class CompactionTableSchema extends TableSchema { columnCategories.add(ColumnCategory.TAG); measurementSchemas.add(newIdColumn); } + return true; } public CompactionTableSchema copy() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java index 002858d93cf..55640c3fcfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java @@ -28,8 +28,10 @@ import org.apache.tsfile.write.schema.Schema; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -39,13 +41,17 @@ public class CompactionTableSchemaCollector { public static List<Schema> collectSchema( List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, - Map<TsFileResource, TsFileSequenceReader> readerMap) + Map<TsFileResource, TsFileSequenceReader> readerMap, + Map<TsFileResource, Set<String>> deprecatedTableSchemaMap) throws IOException { List<Schema> targetSchemas = new ArrayList<>(seqFiles.size()); Schema schema = collectSchema( - Stream.concat(seqFiles.stream(), unseqFiles.stream()).collect(Collectors.toList()), - readerMap); + Stream.concat(seqFiles.stream(), unseqFiles.stream()) + .sorted(TsFileResource::compareFileName) + .collect(Collectors.toList()), + readerMap, + deprecatedTableSchemaMap); targetSchemas.add(schema); for (int i = 1; i < seqFiles.size(); i++) { @@ -64,11 +70,14 @@ public class CompactionTableSchemaCollector { } public static Schema collectSchema( - List<TsFileResource> sourceFiles, Map<TsFileResource, TsFileSequenceReader> readerMap) + List<TsFileResource> sourceFiles, + Map<TsFileResource, TsFileSequenceReader> readerMap, + Map<TsFileResource, Set<String>> deprecatedTableSchemaMap) throws IOException { Schema targetSchema = new Schema(); Map<String, TableSchema> targetTableSchemaMap = new HashMap<>(); - for (TsFileResource resource : sourceFiles) { + for (int i = 0; i < sourceFiles.size(); i++) { + TsFileResource resource = sourceFiles.get(i); TsFileSequenceReader reader = readerMap.get(resource); Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap(); if (tableSchemaMap == null) { @@ -89,7 +98,19 @@ public class CompactionTableSchemaCollector { collectedTableSchema = new CompactionTableSchema(tableName); targetTableSchemaMap.put(tableName, collectedTableSchema); } - collectedTableSchema.merge(currentTableSchema); + boolean canMerge = collectedTableSchema.merge(currentTableSchema); + if (!canMerge) { + // mark resources with deprecated table schema + for (int j = 0; j < i; j++) { + deprecatedTableSchemaMap + .computeIfAbsent(sourceFiles.get(j), k -> new HashSet<>()) + .add(tableName); + } + // replace old table schema in targetTableSchemaMap + collectedTableSchema = new CompactionTableSchema(tableName); + collectedTableSchema.merge(currentTableSchema); + targetTableSchemaMap.put(tableName, collectedTableSchema); + } } } targetTableSchemaMap.values().forEach(targetSchema::registerTableSchema); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 2a49d28bae3..e662e8ed452 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -57,6 +57,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -69,6 +70,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // sort from the oldest to the newest by version (Used by ReadChunkPerformer) private List<TsFileResource> tsFileResourcesSortedByAsc; private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>(); + private final Map<TsFileResource, Set<String>> deprecatedTableSchemaMap = new HashMap<>(); private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>(); private final Map<TsFileResource, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> modificationCache = new HashMap<>(); @@ -344,6 +346,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { // which means this tsfile does not contain the current device, then skip it. continue; } + if (isCurrentDeviceDataInDeprecatedTable(resource)) { + continue; + } TsFileSequenceReader reader = readerMap.get(resource); for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> entrySet : @@ -408,6 +413,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { if (!currentDevice.equals(iterator.current())) { continue; } + if (isCurrentDeviceDataInDeprecatedTable(tsFileResource)) { + continue; + } MetadataIndexNode firstMeasurementNodeOfCurrentDevice = iterator.getFirstMeasurementNodeOfCurrentDevice(); TsFileSequenceReader reader = readerMap.get(tsFileResource); @@ -479,6 +487,10 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { return readerMap; } + public Map<TsFileResource, Set<String>> getDeprecatedTableSchemaMap() { + return deprecatedTableSchemaMap; + } + @Override public void close() throws IOException { for (TsFileSequenceReader reader : readerMap.values()) { @@ -695,4 +707,15 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { return readerAndChunkMetadataForThisSeries; } } + + // skip data of deleted table + private boolean isCurrentDeviceDataInDeprecatedTable(TsFileResource resource) { + if (ignoreAllNullRows) { + return false; + } + String tableName = currentDevice.getLeft().getTableName(); + Set<String> deprecatedTablesInCurrentFile = deprecatedTableSchemaMap.get(resource); + return deprecatedTablesInCurrentFile != null + && deprecatedTablesInCurrentFile.contains(tableName); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java index 66602eb347e..778220d8263 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java @@ -32,6 +32,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -41,6 +42,8 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Arrays; +import java.util.Collections; +import java.util.List; public class TableModelFastCompactionPerformerTest extends AbstractCompactionTest { @@ -63,8 +66,8 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes new CompactionTableModelTestFileWriter(resource1)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -76,13 +79,8 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes new CompactionTableModelTestFileWriter(resource2)) { writer.registerTableSchema("t2", Arrays.asList("id1", "id2", "id3")); writer.startChunkGroup("t2", Arrays.asList("id_field1", "id_field2", "id_field3")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", - new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, - TSEncoding.PLAIN, - CompressionType.LZ4); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s2", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -95,13 +93,8 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes new CompactionTableModelTestFileWriter(resource3)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", - new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, - TSEncoding.PLAIN, - CompressionType.LZ4); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s2", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -113,13 +106,8 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes new CompactionTableModelTestFileWriter(resource4)) { writer.registerTableSchema("t2", Arrays.asList("id1", "id2", "id3")); writer.startChunkGroup("t2", Arrays.asList("id_field1", "id_field2", "id_field3")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", - new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, - TSEncoding.PLAIN, - CompressionType.LZ4); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s2", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -172,24 +160,24 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes new CompactionTableModelTestFileWriter(resource2)) { writer.registerTableSchema("db1.t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("db1.t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); writer.startChunkGroup("d1"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); writer.startChunkGroup("node1.node2.device"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -218,16 +206,16 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes new CompactionTableModelTestFileWriter(resource1)) { writer.registerTableSchema("db1.t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("db1.t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); writer.startChunkGroup("d1"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -248,4 +236,75 @@ public class TableModelFastCompactionPerformerTest extends AbstractCompactionTes Assert.assertEquals(1, reader.getTableSchemaMap().size()); } } + + @Test + public void testCrossSpaceCompactionOfTableModelCanNotMatchTableSchema() throws IOException { + TsFileResource resource1 = createEmptyFileAndResource(true); + try (CompactionTableModelTestFileWriter writer = + new CompactionTableModelTestFileWriter(resource1)) { + writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); + writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), + new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, + TSEncoding.PLAIN, + CompressionType.LZ4); + writer.endChunkGroup(); + writer.endFile(); + } + TsFileResource resource2 = createEmptyFileAndResource(false); + try (CompactionTableModelTestFileWriter writer = + new CompactionTableModelTestFileWriter(resource2)) { + writer.registerTableSchema("t1", Arrays.asList("id1", "id3")); + writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3")); + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), + new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(1, 2)}}}, + TSEncoding.PLAIN, + CompressionType.LZ4); + writer.endChunkGroup(); + writer.endFile(); + } + TsFileResource resource3 = createEmptyFileAndResource(true); + try (CompactionTableModelTestFileWriter writer = + new CompactionTableModelTestFileWriter(resource3)) { + writer.registerTableSchema("t1", Arrays.asList("id1", "id4")); + writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3")); + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), + new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(100, 200)}}}, + TSEncoding.PLAIN, + CompressionType.LZ4); + writer.endChunkGroup(); + writer.endFile(); + } + seqResources.add(resource1); + unseqResources.add(resource2); + seqResources.add(resource3); + tsFileManager.addAll(seqResources, true); + + CrossSpaceCompactionTask task = + new CrossSpaceCompactionTask( + 0, + tsFileManager, + seqResources, + unseqResources, + new FastCompactionPerformer(true), + 0, + 0); + Assert.assertTrue(task.start()); + TsFileResource targetResource = tsFileManager.getTsFileList(true).get(0); + Assert.assertEquals(1, targetResource.getDevices().size()); + try (TsFileSequenceReader reader = + new TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) { + Assert.assertEquals(1, reader.getTableSchemaMap().size()); + Assert.assertTrue(reader.getTableSchemaMap().containsKey("t1")); + List<IMeasurementSchema> tableSchema = + reader.getTableSchemaMap().get("t1").getColumnSchemas(); + Assert.assertEquals("id1", tableSchema.get(0).getMeasurementName()); + Assert.assertEquals("id4", tableSchema.get(1).getMeasurementName()); + Assert.assertEquals("s1", tableSchema.get(2).getMeasurementName()); + Assert.assertEquals("s2", tableSchema.get(3).getMeasurementName()); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java index 7c175a16c84..f2721652f99 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java @@ -36,6 +36,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,6 +46,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -78,8 +80,8 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti new CompactionTableModelTestFileWriter(resource1)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -91,13 +93,8 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti new CompactionTableModelTestFileWriter(resource2)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id2", "id3")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2", "id_field3")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", - new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, - TSEncoding.PLAIN, - CompressionType.LZ4); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s2", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -128,8 +125,8 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti TsFileResource resource1 = createEmptyFileAndResource(true); try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource1)) { writer.startChunkGroup("d1"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -139,13 +136,8 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti TsFileResource resource2 = createEmptyFileAndResource(true); try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource2)) { writer.startChunkGroup("d1"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", - new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, - TSEncoding.PLAIN, - CompressionType.LZ4); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s2", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -168,7 +160,7 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti reader.getAllTimeseriesMetadata(true); for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) { - Assert.assertEquals(2, entry.getValue().size()); + Assert.assertEquals(3, entry.getValue().size()); } } } @@ -178,8 +170,8 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti TsFileResource resource1 = createEmptyFileAndResource(true); try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource1)) { writer.startChunkGroup("d1"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -191,8 +183,8 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti new CompactionTableModelTestFileWriter(resource2)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -215,15 +207,15 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti } @Test - public void testSequenceInnerSpaceCompactionOfTwoV4TreeModelCanNotMatchTableSchema() + public void testSequenceInnerSpaceCompactionOfTableModelCanNotMatchTableSchema() throws IOException { TsFileResource resource1 = createEmptyFileAndResource(true); try (CompactionTableModelTestFileWriter writer = new CompactionTableModelTestFileWriter(resource1)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); @@ -235,14 +227,22 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti new CompactionTableModelTestFileWriter(resource2)) { writer.registerTableSchema("t1", Arrays.asList("id1", "id3")); writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, TSEncoding.PLAIN, CompressionType.LZ4); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s2", - new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(20, 22)}}}, + writer.endChunkGroup(); + writer.endFile(); + } + TsFileResource resource3 = createEmptyFileAndResource(true); + try (CompactionTableModelTestFileWriter writer = + new CompactionTableModelTestFileWriter(resource3)) { + writer.registerTableSchema("t1", Arrays.asList("id1", "id4")); + writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3")); + writer.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s1", "s2"), + new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(100, 200)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); @@ -250,12 +250,26 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti } seqResources.add(resource1); seqResources.add(resource2); + seqResources.add(resource3); tsFileManager.addAll(seqResources, true); InnerSpaceCompactionTask task = new InnerSpaceCompactionTask( 0, tsFileManager, seqResources, true, new ReadChunkCompactionPerformer(), 0); - Assert.assertFalse(task.start()); + Assert.assertTrue(task.start()); + TsFileResource targetResource = tsFileManager.getTsFileList(true).get(0); + Assert.assertEquals(1, targetResource.getDevices().size()); + try (TsFileSequenceReader reader = + new TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) { + Assert.assertEquals(1, reader.getTableSchemaMap().size()); + Assert.assertTrue(reader.getTableSchemaMap().containsKey("t1")); + List<IMeasurementSchema> tableSchema = + reader.getTableSchemaMap().get("t1").getColumnSchemas(); + Assert.assertEquals("id1", tableSchema.get(0).getMeasurementName()); + Assert.assertEquals("id4", tableSchema.get(1).getMeasurementName()); + Assert.assertEquals("s1", tableSchema.get(2).getMeasurementName()); + Assert.assertEquals("s2", tableSchema.get(3).getMeasurementName()); + } } @Test @@ -275,32 +289,32 @@ public class TableModelReadChunkCompactionPerformerTest extends AbstractCompacti new CompactionTableModelTestFileWriter(resource2)) { writer.registerTableSchema("db1.t1", Arrays.asList("id1", "id2")); writer.startChunkGroup("db1.t1", Arrays.asList("id_field1", "id_field2")); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); writer.startChunkGroup("d3"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); writer.startChunkGroup("node1.node2.device"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4); writer.endChunkGroup(); writer.startChunkGroup("node1.node2.node3.device"); - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s1", + writer.generateSimpleAlignedSeriesToCurrentDevice( + Collections.singletonList("s1"), new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}}, TSEncoding.PLAIN, CompressionType.LZ4);
