This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fast_performer_force_decoding_rel12 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c52e8c8fcd1fb6fb6bb1f3cb6f65395997d8901d Author: shuwenwei <[email protected]> AuthorDate: Thu Jul 27 20:17:37 2023 +0800 Fast performer force decoding (#10705) --- .../executor/fast/SeriesCompactionExecutor.java | 4 +- ...InconsistentCompressionTypeAndEncodingTest.java | 644 +++++++++++++++++++++ .../compaction/utils/CompactionTestFileWriter.java | 219 +++++++ .../utils/CompactionTestFileWriterTest.java | 180 ++++++ 4 files changed, 1046 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index 1290287c0d0..33a401c67ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -347,7 +347,9 @@ public abstract class SeriesCompactionExecutor { currentPoint.getTimestamp() <= nextPageElement.pageHeader.getEndTime() || nextPageElement.pageHeader.getEndTime() >= nextPageStartTime || nextPageElement.pageHeader.getEndTime() >= nextChunkStartTime; - if (isNextPageOverlap || nextPageModifiedStatus == ModifiedStatus.PARTIAL_DELETED) { + if (isNextPageOverlap + || nextPageModifiedStatus == ModifiedStatus.PARTIAL_DELETED + || nextPageElement.needForceDecoding) { // next page is overlapped or modified, then deserialize it summary.pageOverlapOrModified++; pointPriorityReader.addNewPage(nextPageElement); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCompactionPerformerWithInconsistentCompressionTypeAndEncodingTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCompactionPerformerWithInconsistentCompressionTypeAndEncodingTest.java
new file mode 100644
index 00000000000..9ffcb65a665
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCompactionPerformerWithInconsistentCompressionTypeAndEncodingTest.java
@@ -0,0 +1,620 @@
+package org.apache.iotdb.db.storageengine.dataregion.compaction;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Checks that {@link FastCompactionPerformer} gives every series in a cross-space compaction
+ * target one single compression type, even when the source files of that series were written
+ * with different compression types.
+ *
+ * <p>Each test writes two sequence files plus one overlapping file that is used as the
+ * unsequence input, compacts them and inspects the chunk headers of the first file returned by
+ * {@code tsFileManager.getTsFileList(true)}.
+ */
+public class FastCompactionPerformerWithInconsistentCompressionTypeAndEncodingTest
+    extends AbstractCompactionTest {
+
+  private static final String DEVICE = "d0";
+  private static final String SERIES = "s0";
+  private static final List<String> ALIGNED_SERIES = Arrays.asList("s0", "s1", "s2");
+
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    // NOTE(review): this global setting is never restored by this class; confirm that
+    // AbstractCompactionTest resets it so it cannot leak into tests that run afterwards.
+    IoTDBDescriptor.getInstance().getConfig().setMinCrossCompactionUnseqFileLevel(0);
+    super.setUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+  }
+
+  /** Non-aligned series, one time range per chunk: LZ4 and SNAPPY files, UNCOMPRESSED input. */
+  @Test
+  public void test1() throws MetadataException, IOException, WriteProcessException {
+    seqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            true));
+    seqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+    tsFileManager.addAll(seqResources, true);
+
+    // NOTE(review): the unsequence inputs in this class are created with isSeq = true as well;
+    // confirm that this is intended and that isSeq = false is not what was meant.
+    unseqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[] {new TimeRange(210000, 290000), new TimeRange(710000, 890000)},
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED,
+            true));
+
+    try (TsFileSequenceReader reader = compactAndOpenTargetFile()) {
+      validateSingleTsFileWithNonAlignedSeries(reader);
+    }
+  }
+
+  /** Non-aligned series, several pages per chunk: LZ4 and SNAPPY files, UNCOMPRESSED input. */
+  @Test
+  public void test2() throws MetadataException, IOException, WriteProcessException {
+    TimeRange[] file1Chunk1 =
+        new TimeRange[] {
+          new TimeRange(10000, 15000), new TimeRange(16000, 19000), new TimeRange(20000, 29000)
+        };
+    TimeRange[] file1Chunk2 =
+        new TimeRange[] {
+          new TimeRange(30000, 35000), new TimeRange(36000, 39000), new TimeRange(40000, 49000)
+        };
+    seqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[][] {file1Chunk1, file1Chunk2},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            true));
+
+    TimeRange[] file2Chunk1 =
+        new TimeRange[] {
+          new TimeRange(50000, 55000), new TimeRange(56000, 59000), new TimeRange(60000, 69000)
+        };
+    TimeRange[] file2Chunk2 =
+        new TimeRange[] {
+          new TimeRange(70000, 75000), new TimeRange(76000, 79000), new TimeRange(180000, 189000)
+        };
+    seqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[][] {file2Chunk1, file2Chunk2},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+    tsFileManager.addAll(seqResources, true);
+
+    TimeRange[] unseqFileChunk1 =
+        new TimeRange[] {
+          new TimeRange(1000, 5000), new TimeRange(96000, 99000), new TimeRange(100000, 110000)
+        };
+    TimeRange[] unseqFileChunk2 =
+        new TimeRange[] {
+          new TimeRange(120000, 130000),
+          new TimeRange(136000, 149000),
+          new TimeRange(200000, 210000)
+        };
+    unseqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[][] {unseqFileChunk1, unseqFileChunk2},
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED,
+            true));
+
+    try (TsFileSequenceReader reader = compactAndOpenTargetFile()) {
+      validateSingleTsFileWithNonAlignedSeries(reader);
+    }
+  }
+
+  /** Non-aligned series, several point ranges per page: LZ4 and SNAPPY, UNCOMPRESSED input. */
+  @Test
+  public void test3() throws MetadataException, IOException, WriteProcessException {
+    TimeRange[] file1Chunk1Page1 =
+        new TimeRange[] {new TimeRange(10000, 12000), new TimeRange(16000, 19000)};
+    TimeRange[] file1Chunk1Page2 =
+        new TimeRange[] {new TimeRange(30000, 35000), new TimeRange(36000, 39000)};
+    seqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[][][] {new TimeRange[][] {file1Chunk1Page1, file1Chunk1Page2}},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            true));
+
+    TimeRange[] file2Chunk1Page1 =
+        new TimeRange[] {
+          new TimeRange(50000, 55000), new TimeRange(56000, 59000), new TimeRange(68000, 69000)
+        };
+    TimeRange[] file2Chunk1Page2 =
+        new TimeRange[] {
+          new TimeRange(70000, 75000), new TimeRange(76000, 79000), new TimeRange(180000, 181000)
+        };
+    seqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[][][] {new TimeRange[][] {file2Chunk1Page1, file2Chunk1Page2}},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+    tsFileManager.addAll(seqResources, true);
+
+    TimeRange[] unseqFileChunk1 =
+        new TimeRange[] {
+          new TimeRange(13000, 15000), new TimeRange(96000, 99000), new TimeRange(100000, 101000)
+        };
+    TimeRange[] unseqFileChunk2 =
+        new TimeRange[] {
+          new TimeRange(120000, 123000),
+          new TimeRange(136000, 139000),
+          new TimeRange(200000, 201000)
+        };
+    unseqResources.add(
+        generateSingleNonAlignedSeriesFile(
+            DEVICE,
+            SERIES,
+            new TimeRange[][] {unseqFileChunk1, unseqFileChunk2},
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED,
+            true));
+
+    try (TsFileSequenceReader reader = compactAndOpenTargetFile()) {
+      validateSingleTsFileWithNonAlignedSeries(reader);
+    }
+  }
+
+  /** Aligned series, one time range per chunk: LZ4 and SNAPPY files, UNCOMPRESSED input. */
+  @Test
+  public void test4() throws MetadataException, IOException, WriteProcessException {
+    seqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[] {new TimeRange(100000, 200000), new TimeRange(300000, 500000)},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            true));
+    seqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[] {new TimeRange(600000, 700000), new TimeRange(800000, 900000)},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+    tsFileManager.addAll(seqResources, true);
+
+    unseqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[] {new TimeRange(210000, 290000), new TimeRange(710000, 890000)},
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED,
+            true));
+
+    try (TsFileSequenceReader reader = compactAndOpenTargetFile()) {
+      validateSingleTsFileWithAlignedSeries(reader);
+    }
+  }
+
+  /** Aligned series, several pages per chunk; all three source files are written with SNAPPY. */
+  @Test
+  public void test5() throws MetadataException, IOException, WriteProcessException {
+    TimeRange[] file1Chunk1 =
+        new TimeRange[] {
+          new TimeRange(10000, 15000), new TimeRange(16000, 19000), new TimeRange(20000, 29000)
+        };
+    TimeRange[] file1Chunk2 =
+        new TimeRange[] {
+          new TimeRange(30000, 35000), new TimeRange(36000, 39000), new TimeRange(40000, 49000)
+        };
+    seqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[][] {file1Chunk1, file1Chunk2},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+
+    TimeRange[] file2Chunk1 =
+        new TimeRange[] {
+          new TimeRange(50000, 55000), new TimeRange(56000, 59000), new TimeRange(60000, 69000)
+        };
+    TimeRange[] file2Chunk2 =
+        new TimeRange[] {
+          new TimeRange(70000, 75000), new TimeRange(76000, 79000), new TimeRange(180000, 189000)
+        };
+    seqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[][] {file2Chunk1, file2Chunk2},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+    tsFileManager.addAll(seqResources, true);
+
+    TimeRange[] unseqFileChunk1 =
+        new TimeRange[] {
+          new TimeRange(1000, 5000), new TimeRange(96000, 99000), new TimeRange(100000, 110000)
+        };
+    TimeRange[] unseqFileChunk2 =
+        new TimeRange[] {
+          new TimeRange(120000, 130000),
+          new TimeRange(136000, 149000),
+          new TimeRange(200000, 210000)
+        };
+    unseqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[][] {unseqFileChunk1, unseqFileChunk2},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+
+    try (TsFileSequenceReader reader = compactAndOpenTargetFile()) {
+      validateSingleTsFileWithAlignedSeries(reader);
+    }
+  }
+
+  /** Aligned series, several point ranges per page: LZ4 and SNAPPY, UNCOMPRESSED input. */
+  @Test
+  public void test6() throws MetadataException, IOException, WriteProcessException {
+    TimeRange[] file1Chunk1Page1 =
+        new TimeRange[] {new TimeRange(10000, 12000), new TimeRange(16000, 19000)};
+    TimeRange[] file1Chunk1Page2 =
+        new TimeRange[] {new TimeRange(30000, 35000), new TimeRange(36000, 39000)};
+    seqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[][][] {new TimeRange[][] {file1Chunk1Page1, file1Chunk1Page2}},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            true));
+
+    TimeRange[] file2Chunk1Page1 =
+        new TimeRange[] {
+          new TimeRange(50000, 55000), new TimeRange(56000, 59000), new TimeRange(68000, 69000)
+        };
+    TimeRange[] file2Chunk1Page2 =
+        new TimeRange[] {
+          new TimeRange(70000, 75000), new TimeRange(76000, 79000), new TimeRange(180000, 181000)
+        };
+    seqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[][][] {new TimeRange[][] {file2Chunk1Page1, file2Chunk1Page2}},
+            TSEncoding.PLAIN,
+            CompressionType.SNAPPY,
+            true));
+    tsFileManager.addAll(seqResources, true);
+
+    TimeRange[] unseqFileChunk1 =
+        new TimeRange[] {
+          new TimeRange(13000, 15000), new TimeRange(96000, 99000), new TimeRange(100000, 101000)
+        };
+    TimeRange[] unseqFileChunk2 =
+        new TimeRange[] {
+          new TimeRange(120000, 123000),
+          new TimeRange(136000, 139000),
+          new TimeRange(200000, 201000)
+        };
+    unseqResources.add(
+        generateSingleAlignedSeriesFile(
+            DEVICE,
+            ALIGNED_SERIES,
+            new TimeRange[][] {unseqFileChunk1, unseqFileChunk2},
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED,
+            true));
+
+    try (TsFileSequenceReader reader = compactAndOpenTargetFile()) {
+      validateSingleTsFileWithAlignedSeries(reader);
+    }
+  }
+
+  /**
+   * Runs a cross-space compaction of {@code seqResources} and {@code unseqResources} with the
+   * fast performer, asserts that the task reports success and opens the first file returned by
+   * {@code tsFileManager.getTsFileList(true)}.
+   *
+   * @return a reader on that file; the caller is responsible for closing it
+   */
+  private TsFileSequenceReader compactAndOpenTargetFile()
+      throws MetadataException, IOException, WriteProcessException {
+    CrossSpaceCompactionTask task =
+        new CrossSpaceCompactionTask(
+            0,
+            tsFileManager,
+            seqResources,
+            unseqResources,
+            new FastCompactionPerformer(true),
+            new AtomicInteger(0),
+            0,
+            0);
+    Assert.assertTrue("cross-space compaction task failed", task.start());
+    TsFileResource targetFile = tsFileManager.getTsFileList(true).get(0);
+    return new TsFileSequenceReader(targetFile.getTsFilePath());
+  }
+
+  /** Writes a file with one non-aligned series: one chunk per time range. */
+  private TsFileResource generateSingleNonAlignedSeriesFile(
+      String device,
+      String measurement,
+      TimeRange[] chunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType,
+      boolean isSeq)
+      throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(isSeq);
+    CompactionTestFileWriter writer = new CompactionTestFileWriter(resource);
+    // Close the writer even when generating the data fails, so the file handle is released.
+    try {
+      writer.startChunkGroup(device);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          measurement, chunkTimeRanges, encoding, compressionType);
+      writer.endChunkGroup();
+      writer.endFile();
+    } finally {
+      writer.close();
+    }
+    return resource;
+  }
+
+  /** Writes a file with one non-aligned series: one chunk per inner array, one page per range. */
+  private TsFileResource generateSingleNonAlignedSeriesFile(
+      String device,
+      String measurement,
+      TimeRange[][] chunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType,
+      boolean isSeq)
+      throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(isSeq);
+    CompactionTestFileWriter writer = new CompactionTestFileWriter(resource);
+    // Close the writer even when generating the data fails, so the file handle is released.
+    try {
+      writer.startChunkGroup(device);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          measurement, chunkTimeRanges, encoding, compressionType);
+      writer.endChunkGroup();
+      writer.endFile();
+    } finally {
+      writer.close();
+    }
+    return resource;
+  }
+
+  /** Writes a file with one non-aligned series given as chunks of pages of point ranges. */
+  private TsFileResource generateSingleNonAlignedSeriesFile(
+      String device,
+      String measurement,
+      TimeRange[][][] chunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType,
+      boolean isSeq)
+      throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(isSeq);
+    CompactionTestFileWriter writer = new CompactionTestFileWriter(resource);
+    // Close the writer even when generating the data fails, so the file handle is released.
+    try {
+      writer.startChunkGroup(device);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          measurement, chunkTimeRanges, encoding, compressionType);
+      writer.endChunkGroup();
+      writer.endFile();
+    } finally {
+      writer.close();
+    }
+    return resource;
+  }
+
+  /** Writes a file with one aligned device: one aligned chunk per time range. */
+  private TsFileResource generateSingleAlignedSeriesFile(
+      String device,
+      List<String> measurements,
+      TimeRange[] chunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType,
+      boolean isSeq)
+      throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(isSeq);
+    CompactionTestFileWriter writer = new CompactionTestFileWriter(resource);
+    // Close the writer even when generating the data fails, so the file handle is released.
+    try {
+      writer.startChunkGroup(device);
+      writer.generateSimpleAlignedSeriesToCurrentDevice(
+          measurements, chunkTimeRanges, encoding, compressionType);
+      writer.endChunkGroup();
+      writer.endFile();
+    } finally {
+      writer.close();
+    }
+    return resource;
+  }
+
+  /** Writes a file with one aligned device: one chunk per inner array, one page per range. */
+  private TsFileResource generateSingleAlignedSeriesFile(
+      String device,
+      List<String> measurements,
+      TimeRange[][] chunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType,
+      boolean isSeq)
+      throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(isSeq);
+    CompactionTestFileWriter writer = new CompactionTestFileWriter(resource);
+    // Close the writer even when generating the data fails, so the file handle is released.
+    try {
+      writer.startChunkGroup(device);
+      writer.generateSimpleAlignedSeriesToCurrentDevice(
+          measurements, chunkTimeRanges, encoding, compressionType);
+      writer.endChunkGroup();
+      writer.endFile();
+    } finally {
+      writer.close();
+    }
+    return resource;
+  }
+
+  /** Writes a file with one aligned device given as chunks of pages of point ranges. */
+  private TsFileResource generateSingleAlignedSeriesFile(
+      String device,
+      List<String> measurements,
+      TimeRange[][][] chunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType,
+      boolean isSeq)
+      throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(isSeq);
+    CompactionTestFileWriter writer = new CompactionTestFileWriter(resource);
+    // Close the writer even when generating the data fails, so the file handle is released.
+    try {
+      writer.startChunkGroup(device);
+      writer.generateSimpleAlignedSeriesToCurrentDevice(
+          measurements, chunkTimeRanges, encoding, compressionType);
+      writer.endChunkGroup();
+      writer.endFile();
+    } finally {
+      writer.close();
+    }
+    return resource;
+  }
+
+  /**
+   * Asserts that all chunks of each non-aligned series carry the same compression type, and
+   * passes every page of every chunk through {@code ChunkReader#readPageData}.
+   */
+  private void validateSingleTsFileWithNonAlignedSeries(TsFileSequenceReader reader)
+      throws IOException {
+    Map<String, CompressionType> compressionTypeMap = new HashMap<>();
+    for (String device : reader.getAllDevices()) {
+      Map<String, List<ChunkMetadata>> seriesMetaData = reader.readChunkMetadataInDevice(device);
+      for (Map.Entry<String, List<ChunkMetadata>> entry : seriesMetaData.entrySet()) {
+        String series = entry.getKey();
+        for (ChunkMetadata chunkMetadata : entry.getValue()) {
+          Chunk chunk = reader.readMemChunk(chunkMetadata);
+          ChunkReader chunkReader = new ChunkReader(chunk);
+          ChunkHeader chunkHeader = chunk.getHeader();
+          ByteBuffer chunkDataBuffer = chunk.getData();
+          assertSameCompressionType(compressionTypeMap, series, chunkHeader.getCompressionType());
+          while (chunkDataBuffer.remaining() > 0) {
+            // A single-page chunk is deserialized with the chunk statistics, not the data type.
+            PageHeader pageHeader;
+            if (((byte) (chunkHeader.getChunkType() & 0x3F))
+                == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+              pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
+            } else {
+              pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+            }
+            chunkReader.readPageData(
+                pageHeader, chunkReader.readPageDataWithoutUncompressing(pageHeader));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that the time column and every value column of the aligned devices keep one
+   * compression type across all of their chunks; the time column is tracked as {@code "time"}.
+   */
+  private void validateSingleTsFileWithAlignedSeries(TsFileSequenceReader reader)
+      throws IOException {
+    Map<String, CompressionType> compressionTypeMap = new HashMap<>();
+    for (String device : reader.getAllDevices()) {
+      List<AlignedChunkMetadata> alignedChunkMetadataList = reader.getAlignedChunkMetadata(device);
+      for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
+        IChunkMetadata timeChunkMetadata = alignedChunkMetadata.getTimeChunkMetadata();
+        List<IChunkMetadata> valueChunkMetadataList =
+            alignedChunkMetadata.getValueChunkMetadataList();
+        Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata);
+        assertSameCompressionType(
+            compressionTypeMap, "time", timeChunk.getHeader().getCompressionType());
+        for (IChunkMetadata chunkMetadata : valueChunkMetadataList) {
+          ChunkHeader valueHeader = reader.readMemChunk((ChunkMetadata) chunkMetadata).getHeader();
+          assertSameCompressionType(
+              compressionTypeMap, valueHeader.getMeasurementID(), valueHeader.getCompressionType());
+        }
+      }
+    }
+  }
+
+  /**
+   * Records the compression type seen first for {@code series} and fails when a later chunk of
+   * the same series uses a different one.
+   */
+  private static void assertSameCompressionType(
+      Map<String, CompressionType> firstSeenTypes, String series, CompressionType actual) {
+    CompressionType expected = firstSeenTypes.get(series);
+    if (expected == null) {
+      firstSeenTypes.put(series, actual);
+      return;
+    }
+    Assert.assertEquals(
+        "chunks of series " + series + " use different compression types", expected, actual);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriter.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriter.java
new file mode 100644
index 00000000000..e4606cb6c6a
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriter.java
@@ -0,0 +1,219 @@
+package org.apache.iotdb.db.storageengine.dataregion.compaction.utils;
+
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.page.PageWriter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test helper that fills the TsFile behind a {@link TsFileResource} with INT32 series holding
+ * random values, with explicit control over how points are grouped into chunks and pages.
+ *
+ * <p>Usage: {@link #startChunkGroup(String)}, one or more {@code generateSimple...} calls, {@link
+ * #endChunkGroup()}, {@link #endFile()} and finally {@link #close()}. The class implements {@link
+ * AutoCloseable} so that it can be used in a try-with-resources statement.
+ */
+public class CompactionTestFileWriter implements AutoCloseable {
+
+  private static final String SG_NAME = "root.testsg";
+
+  private final TsFileResource resource;
+  private final TsFileIOWriter fileWriter;
+  /** Shared by all generated points so that no {@link Random} is allocated per value. */
+  private final Random random = new Random();
+
+  private String currentDeviceId;
+  private long currentDeviceStartTime;
+  private long currentDeviceEndTime;
+
+  /**
+   * @param emptyFile resource whose TsFile is written and whose time index is updated
+   * @throws IOException if the underlying {@link TsFileIOWriter} cannot be created
+   */
+  public CompactionTestFileWriter(TsFileResource emptyFile) throws IOException {
+    this.resource = emptyFile;
+    this.fileWriter = new TsFileIOWriter(emptyFile.getTsFile());
+  }
+
+  /**
+   * Starts a chunk group and resets the time range tracked for the current device.
+   *
+   * @param deviceName device name without the storage group prefix
+   * @return the full device id, {@code "root.testsg." + deviceName}
+   */
+  public String startChunkGroup(String deviceName) throws IOException {
+    currentDeviceId = SG_NAME + "." + deviceName;
+    fileWriter.startChunkGroup(currentDeviceId);
+    currentDeviceStartTime = Long.MAX_VALUE;
+    currentDeviceEndTime = Long.MIN_VALUE;
+    return currentDeviceId;
+  }
+
+  /** Stores the tracked time range of the current device in the resource and ends the group. */
+  public void endChunkGroup() throws IOException {
+    resource.updateStartTime(currentDeviceId, currentDeviceStartTime);
+    resource.updateEndTime(currentDeviceId, currentDeviceEndTime);
+    fileWriter.endChunkGroup();
+  }
+
+  /** Ends the TsFile and serializes the resource. */
+  public void endFile() throws IOException {
+    fileWriter.endFile();
+    resource.serialize();
+  }
+
+  /** Closes the underlying file writer. */
+  @Override
+  public void close() throws IOException {
+    fileWriter.close();
+  }
+
+  /** Writes one chunk per time range; every timestamp of a range becomes one point. */
+  public void generateSimpleNonAlignedSeriesToCurrentDevice(
+      String measurementName,
+      TimeRange[] toGenerateChunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException {
+    MeasurementSchema schema =
+        new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType);
+    for (TimeRange chunkRange : toGenerateChunkTimeRanges) {
+      ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+      extendCurrentDeviceTimeRange(chunkRange);
+      for (long time = chunkRange.getMin(); time <= chunkRange.getMax(); time++) {
+        chunkWriter.write(time, random.nextInt());
+      }
+      chunkWriter.sealCurrentPage();
+      chunkWriter.writeToFileWriter(fileWriter);
+    }
+  }
+
+  /** Writes one chunk per inner array; each time range of a chunk is sealed as its own page. */
+  public void generateSimpleNonAlignedSeriesToCurrentDevice(
+      String measurementName,
+      TimeRange[][] toGenerateChunkPageTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException {
+    MeasurementSchema schema =
+        new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType);
+    for (TimeRange[] chunkPages : toGenerateChunkPageTimeRanges) {
+      ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+      for (TimeRange pageRange : chunkPages) {
+        // Points go straight to the page writer so that pages are only sealed explicitly below.
+        PageWriter pageWriter = chunkWriter.getPageWriter();
+        extendCurrentDeviceTimeRange(pageRange);
+        writeRandomPoints(pageWriter, pageRange);
+        chunkWriter.sealCurrentPage();
+      }
+      chunkWriter.writeToFileWriter(fileWriter);
+    }
+  }
+
+  /**
+   * Writes chunks of pages of point ranges: the outer array lists chunks, the middle one the
+   * pages of a chunk and the inner one the point ranges that are written into the same page.
+   */
+  public void generateSimpleNonAlignedSeriesToCurrentDevice(
+      String measurementName,
+      TimeRange[][][] toGenerateChunkPagePointsTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException {
+    MeasurementSchema schema =
+        new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType);
+    for (TimeRange[][] chunkPages : toGenerateChunkPagePointsTimeRanges) {
+      ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+      for (TimeRange[] pagePointRanges : chunkPages) {
+        PageWriter pageWriter = chunkWriter.getPageWriter();
+        for (TimeRange pointRange : pagePointRanges) {
+          extendCurrentDeviceTimeRange(pointRange);
+          writeRandomPoints(pageWriter, pointRange);
+        }
+        chunkWriter.sealCurrentPage();
+      }
+      chunkWriter.writeToFileWriter(fileWriter);
+    }
+  }
+
+  /**
+   * Writes one aligned chunk per time range. Time and values are written through the time and
+   * value chunk writers; no page is sealed explicitly before the chunk is written to the file.
+   */
+  public void generateSimpleAlignedSeriesToCurrentDevice(
+      List<String> measurementNames,
+      TimeRange[] toGenerateChunkTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException {
+    List<IMeasurementSchema> measurementSchemas =
+        buildInt32Schemas(measurementNames, encoding, compressionType);
+    for (TimeRange chunkRange : toGenerateChunkTimeRanges) {
+      AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+      extendCurrentDeviceTimeRange(chunkRange);
+      for (long time = chunkRange.getMin(); time <= chunkRange.getMax(); time++) {
+        alignedChunkWriter.getTimeChunkWriter().write(time);
+        for (int i = 0; i < measurementNames.size(); i++) {
+          alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, random.nextInt(), false);
+        }
+      }
+      alignedChunkWriter.writeToFileWriter(fileWriter);
+    }
+  }
+
+  /** Writes one aligned chunk per inner array; each time range is sealed as its own page. */
+  public void generateSimpleAlignedSeriesToCurrentDevice(
+      List<String> measurementNames,
+      TimeRange[][] toGenerateChunkPageTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException {
+    List<IMeasurementSchema> measurementSchemas =
+        buildInt32Schemas(measurementNames, encoding, compressionType);
+    for (TimeRange[] chunkPages : toGenerateChunkPageTimeRanges) {
+      AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+      for (TimeRange pageRange : chunkPages) {
+        extendCurrentDeviceTimeRange(pageRange);
+        writeRandomAlignedPoints(alignedChunkWriter, measurementNames, pageRange);
+        alignedChunkWriter.sealCurrentPage();
+      }
+      alignedChunkWriter.writeToFileWriter(fileWriter);
+    }
+  }
+
+  /**
+   * Writes aligned chunks of pages of point ranges: the outer array lists chunks, the middle one
+   * the pages of a chunk and the inner one the point ranges written into the same page.
+   */
+  public void generateSimpleAlignedSeriesToCurrentDevice(
+      List<String> measurementNames,
+      TimeRange[][][] toGenerateChunkPageTimeRanges,
+      TSEncoding encoding,
+      CompressionType compressionType)
+      throws IOException {
+    List<IMeasurementSchema> measurementSchemas =
+        buildInt32Schemas(measurementNames, encoding, compressionType);
+    for (TimeRange[][] chunkPages : toGenerateChunkPageTimeRanges) {
+      AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemas);
+      for (TimeRange[] pagePointRanges : chunkPages) {
+        for (TimeRange pointRange : pagePointRanges) {
+          extendCurrentDeviceTimeRange(pointRange);
+          writeRandomAlignedPoints(alignedChunkWriter, measurementNames, pointRange);
+        }
+        alignedChunkWriter.sealCurrentPage();
+      }
+      alignedChunkWriter.writeToFileWriter(fileWriter);
+    }
+  }
+
+  /** Widens the time range tracked for the current device so that it covers {@code range}. */
+  private void extendCurrentDeviceTimeRange(TimeRange range) {
+    currentDeviceStartTime = Math.min(range.getMin(), currentDeviceStartTime);
+    currentDeviceEndTime = Math.max(range.getMax(), currentDeviceEndTime);
+  }
+
+  /** Builds one INT32 schema per measurement name, all sharing encoding and compression. */
+  private static List<IMeasurementSchema> buildInt32Schemas(
+      List<String> measurementNames, TSEncoding encoding, CompressionType compressionType) {
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    for (String measurementName : measurementNames) {
+      measurementSchemas.add(
+          new MeasurementSchema(measurementName, TSDataType.INT32, encoding, compressionType));
+    }
+    return measurementSchemas;
+  }
+
+  /** Writes one random point for every timestamp of {@code range} into {@code pageWriter}. */
+  private void writeRandomPoints(PageWriter pageWriter, TimeRange range) throws IOException {
+    for (long time = range.getMin(); time <= range.getMax(); time++) {
+      pageWriter.write(time, random.nextInt());
+    }
+  }
+
+  /**
+   * Writes one aligned row for every timestamp of {@code range}: the time through the aligned
+   * chunk writer, then one random value into the page writer of every value column.
+   */
+  private void writeRandomAlignedPoints(
+      AlignedChunkWriterImpl alignedChunkWriter, List<String> measurementNames, TimeRange range)
+      throws IOException {
+    for (long time = range.getMin(); time <= range.getMax(); time++) {
+      alignedChunkWriter.write(time);
+      for (int i = 0; i < measurementNames.size(); i++) {
+        alignedChunkWriter
+            .getValueChunkWriterByIndex(i)
+            .getPageWriter()
+            .write(time, random.nextInt(), false);
+      }
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriterTest.java
new file mode 100644
index 00000000000..cb996e37217
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriterTest.java
@@ -0,0 +1,180 @@
+package org.apache.iotdb.db.storageengine.dataregion.compaction.utils;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class CompactionTestFileWriterTest extends AbstractCompactionTest {
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    super.setUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+  }
+
+  @Test
+  public void testGenerateNonAlignedSeriesWithChunk() {
+    TsFileResource seqResource1 = createEmptyFileAndResource(true);
+    try {
+      CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(seqResource1);
+      fileWriter.startChunkGroup("d0"); +
fileWriter.generateSimpleNonAlignedSeriesToCurrentDevice( + "s0", + new TimeRange[] {new TimeRange(1000, 3000), new TimeRange(4000, 6000)}, + TSEncoding.PLAIN, + CompressionType.LZ4); + fileWriter.generateSimpleNonAlignedSeriesToCurrentDevice( + "s0", + new TimeRange[] {new TimeRange(7000, 9000), new TimeRange(14000, 16000)}, + TSEncoding.PLAIN, + CompressionType.SNAPPY); + fileWriter.endChunkGroup(); + fileWriter.endFile(); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGenerateNonAlignedSeriesWithPage() throws IOException { + try { + TsFileResource seqResource1 = createEmptyFileAndResource(true); + CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(seqResource1); + fileWriter.startChunkGroup("d0"); + TimeRange[] chunk1PageRanges = + new TimeRange[] {new TimeRange(1000, 1500), new TimeRange(2000, 2500)}; + TimeRange[] chunk2PageRanges = + new TimeRange[] {new TimeRange(3000, 3500), new TimeRange(5000, 5500)}; + + fileWriter.generateSimpleNonAlignedSeriesToCurrentDevice( + "s0", + new TimeRange[][] {chunk1PageRanges, chunk2PageRanges}, + TSEncoding.PLAIN, + CompressionType.LZ4); + fileWriter.endChunkGroup(); + fileWriter.endFile(); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGenerateNonAlignedSeriesWithPoints() throws IOException { + try { + TsFileResource seqResource1 = createEmptyFileAndResource(true); + CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(seqResource1); + fileWriter.startChunkGroup("d0"); + TimeRange[] chunk1Page1Points = + new TimeRange[] {new TimeRange(1000, 1500), new TimeRange(2000, 2500)}; + TimeRange[] chunk1Page2Points = + new TimeRange[] {new TimeRange(3000, 3500), new TimeRange(5000, 5500)}; + TimeRange[][] chunk1 = new TimeRange[][] {chunk1Page1Points, chunk1Page2Points}; + TimeRange[] chunk2Page1Points = + new TimeRange[] {new TimeRange(6000, 6500), new TimeRange(7000, 7500)}; + TimeRange[] chunk2Page2Points = + new TimeRange[] {new 
TimeRange(8000, 8500), new TimeRange(9000, 9500)}; + TimeRange[][] chunk2 = new TimeRange[][] {chunk2Page1Points, chunk2Page2Points}; + + fileWriter.generateSimpleNonAlignedSeriesToCurrentDevice( + "s0", new TimeRange[][][] {chunk1, chunk2}, TSEncoding.PLAIN, CompressionType.LZ4); + fileWriter.endChunkGroup(); + fileWriter.endFile(); + System.out.println(seqResource1.getTsFile().getAbsolutePath()); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGenerateAlignedSeriesWithChunk() throws IOException { + try { + TsFileResource seqResource1 = createEmptyFileAndResource(true); + CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(seqResource1); + fileWriter.startChunkGroup("d0"); + fileWriter.generateSimpleNonAlignedSeriesToCurrentDevice( + "s0", + new TimeRange[] {new TimeRange(1000, 3000), new TimeRange(4000, 6000)}, + TSEncoding.PLAIN, + CompressionType.LZ4); + fileWriter.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[] {new TimeRange(7000, 9000), new TimeRange(14000, 16000)}, + TSEncoding.PLAIN, + CompressionType.SNAPPY); + fileWriter.endChunkGroup(); + fileWriter.endFile(); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGenerateAlignedSeriesWithPage() throws IOException { + try { + TsFileResource seqResource1 = createEmptyFileAndResource(true); + CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(seqResource1); + fileWriter.startChunkGroup("d0"); + TimeRange[] chunk1PageRanges = + new TimeRange[] {new TimeRange(1000, 1500), new TimeRange(2000, 2500)}; + TimeRange[] chunk2PageRanges = + new TimeRange[] {new TimeRange(3000, 3500), new TimeRange(5000, 5500)}; + + fileWriter.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[][] {chunk1PageRanges, chunk2PageRanges}, + TSEncoding.PLAIN, + CompressionType.LZ4); + fileWriter.endChunkGroup(); + fileWriter.endFile(); + } catch (Exception e) { 
+ Assert.fail(); + } + } + + @Test + public void testGenerateAlignedSeriesWithPoints() throws IOException { + try { + TsFileResource seqResource1 = createEmptyFileAndResource(true); + CompactionTestFileWriter fileWriter = new CompactionTestFileWriter(seqResource1); + fileWriter.startChunkGroup("d0"); + TimeRange[] chunk1Page1Points = + new TimeRange[] {new TimeRange(1000, 1500), new TimeRange(2000, 2500)}; + TimeRange[] chunk1Page2Points = + new TimeRange[] {new TimeRange(3000, 3500), new TimeRange(5000, 5500)}; + TimeRange[][] chunk1 = new TimeRange[][] {chunk1Page1Points, chunk1Page2Points}; + TimeRange[] chunk2Page1Points = + new TimeRange[] {new TimeRange(6000, 6500), new TimeRange(7000, 7500)}; + TimeRange[] chunk2Page2Points = + new TimeRange[] {new TimeRange(8000, 8500), new TimeRange(9000, 9500)}; + TimeRange[][] chunk2 = new TimeRange[][] {chunk2Page1Points, chunk2Page2Points}; + + fileWriter.generateSimpleAlignedSeriesToCurrentDevice( + Arrays.asList("s0", "s1", "s2"), + new TimeRange[][][] {chunk1, chunk2}, + TSEncoding.PLAIN, + CompressionType.LZ4); + fileWriter.endChunkGroup(); + fileWriter.endFile(); + System.out.println(seqResource1.getTsFile().getAbsolutePath()); + } catch (Exception e) { + Assert.fail(); + } + } +}
