This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4517
in repository https://gitbox.apache.org/repos/asf/iotdb.git
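For orientation before the diff: this commit caps the memory that TsFileIOWriter may hold in ChunkMetadata. When the tracked size passes a budget, the writer sorts the in-memory ChunkMetadata and spills it to a `<tsfile>.meta` temp file; endFile() later replays the spilled and in-memory metadata through a TSMIterator to build the index tree and bloom filter. Below is a minimal sketch of driving the new API, assuming only the signatures visible in this diff; the try-with-resources shape mirrors InnerSpaceCompactionUtils further down, while the file name and the 16 MB budget are illustrative, not from the commit.

```java
import java.io.File;

import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

public class MemoryControlledWriteSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative budget; the real callers in this commit compute it from the
    // chunk_metadata_size_proportion_* config options instead.
    long maxMetadataSize = 16L * 1024 * 1024;

    // Constructor added by this commit: enableMemoryControl = true makes the
    // writer track ChunkMetadata RAM and spill to "<file>.meta" when over budget.
    try (TsFileIOWriter writer =
        new TsFileIOWriter(new File("sketch.tsfile"), true, maxMetadataSize)) {
      // ... startChunkGroup(...), write chunks, endChunkGroup() as usual ...

      // Hook added by this commit: call it only after a series (or a whole
      // aligned device written in row) is complete, so sorted spilling is safe.
      writer.checkMetadataSizeAndMayFlush();

      // endFile() now replays spilled + in-memory ChunkMetadata through a
      // TSMIterator, building the index tree and bloom filter incrementally.
      writer.endFile();
    }
  }
}
```

In the server paths touched by this diff, the budget is derived rather than hand-picked: the write path uses memtable_size_threshold * chunk_metadata_size_proportion_in_write, and compaction uses its per-task memory share * chunk_metadata_size_proportion_in_compaction.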
commit b873fed67b30b0c4b1e9185b9428fb962e34e32c Author: Liu Xuxin <[email protected]> AuthorDate: Tue Oct 4 17:45:11 2022 +0800 control metadata size in TsFileIOWriter --- .../resources/conf/iotdb-engine.properties | 8 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 + .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 + .../db/engine/compaction/CompactionUtils.java | 79 +- .../cross/rewrite/task/SubCompactionTask.java | 5 +- .../utils/AlignedSeriesCompactionExecutor.java | 1 + .../inner/utils/InnerSpaceCompactionUtils.java | 12 +- .../utils/SingleSeriesCompactionExecutor.java | 1 + .../writer/AbstractCompactionWriter.java | 7 + .../writer/CrossSpaceCompactionWriter.java | 47 + .../writer/InnerSpaceCompactionWriter.java | 34 +- .../iotdb/db/engine/flush/MemTableFlushTask.java | 16 +- .../db/engine/storagegroup/TsFileProcessor.java | 9 +- .../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 +- .../writelog/recover/TsFileRecoverPerformer.java | 12 +- .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 8 + .../file/metadata/MetadataIndexConstructor.java | 39 +- .../tsfile/file/metadata/MetadataIndexNode.java | 2 +- .../iotdb/tsfile/file/metadata/TsFileMetadata.java | 13 + .../tsfile/file/metadata/enums/TSDataType.java | 2 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 5 +- .../tsfile/v2/read/TsFileSequenceReaderForV2.java | 20 +- .../write/writer/RestorableTsFileIOWriter.java | 12 + .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 359 ++++-- .../write/writer/tsmiterator/DiskTSMIterator.java | 132 ++ .../write/writer/tsmiterator/TSMIterator.java | 147 +++ .../tsfile/write/MetadataIndexConstructorTest.java | 2 +- .../iotdb/tsfile/write/TsFileIOWriterTest.java | 2 +- .../tsfile/write/TsFileIntegrityCheckingTool.java | 251 ++++ .../writer/TsFileIOWriterMemoryControlTest.java | 1303 ++++++++++++++++++++ 31 files changed, 2411 insertions(+), 152 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index e17a900a25..9f494b582c 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -353,6 +353,10 @@ timestamp_precision=ms # Datatype: int # primitive_array_size=32 +# the percentage of write memory for chunk metadata remains in a single file writer when flushing memtable +# Datatype: double +# chunk_metadata_size_proportion_in_write=0.1 + # Ratio of write memory for invoking flush disk, 0.4 by default # If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2 # Datatype: double @@ -449,6 +453,10 @@ timestamp_precision=ms # BALANCE: alternate two compaction types # compaction_priority=BALANCE +# size proportion for chunk metadata maintains in memory when compacting +# Datatype: double +# chunk_metadata_size_proportion_in_compaction=0.05 + # The target tsfile size in compaction # Datatype: long, Unit: byte # target_compaction_file_size=1073741824 diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index f4cfc65920..c9151ffcf3 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -128,6 +128,8 @@ public class IoTDBConfig { /** The proportion of write memory for write process */ private double writeProportion = 0.8; + 
private double chunkMetadataSizeProportionInWrite = 0.1; + /** The proportion of write memory for compaction */ private double compactionProportion = 0.2; @@ -398,6 +400,8 @@ public class IoTDBConfig { */ private CompactionPriority compactionPriority = CompactionPriority.BALANCE; + private double chunkMetadataSizeProportionInCompaction = 0.05; + /** The target tsfile size in compaction, 1 GB by default */ private long targetCompactionFileSize = 1073741824L; @@ -2773,4 +2777,21 @@ public class IoTDBConfig { public void setCustomizedProperties(Properties customizedProperties) { this.customizedProperties = customizedProperties; } + + public double getChunkMetadataSizeProportionInWrite() { + return chunkMetadataSizeProportionInWrite; + } + + public void setChunkMetadataSizeProportionInWrite(double chunkMetadataSizeProportionInWrite) { + this.chunkMetadataSizeProportionInWrite = chunkMetadataSizeProportionInWrite; + } + + public double getChunkMetadataSizeProportionInCompaction() { + return chunkMetadataSizeProportionInCompaction; + } + + public void setChunkMetadataSizeProportionInCompaction( + double chunkMetadataSizeProportionInCompaction) { + this.chunkMetadataSizeProportionInCompaction = chunkMetadataSizeProportionInCompaction; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 815113f6b4..01606eb873 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -341,6 +341,11 @@ public class IoTDBDescriptor { "max_waiting_time_when_insert_blocked", Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked())))); + conf.setChunkMetadataSizeProportionInCompaction( + Double.parseDouble( + properties.getProperty( + "chunk_metadata_size_proportion_in_compaction", + Double.toString(conf.getChunkMetadataSizeProportionInCompaction())))); conf.setEstimatedSeriesSize( Integer.parseInt( properties.getProperty( @@ -928,6 +933,11 @@ public class IoTDBDescriptor { .setKerberosPrincipal( properties.getProperty("kerberos_principal", conf.getKerberosPrincipal())); TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize()); + conf.setChunkMetadataSizeProportionInWrite( + Double.parseDouble( + properties.getProperty( + "chunk_metadata_size_proportion_in_write", + Double.toString(conf.getChunkMetadataSizeProportionInWrite())))); // timed flush memtable, timed close tsfile loadTimedService(properties); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java index 97bcff78ff..e32930864e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java @@ -60,7 +60,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -165,48 +165,51 @@ public class CompactionUtils { QueryContext queryContext, QueryDataSource queryDataSource) throws IOException, InterruptedException { - Map<String, MeasurementSchema> measurementSchemaMap = - deviceIterator.getAllSchemasOfCurrentDevice(); - int subTaskNums = Math.min(measurementSchemaMap.size(), subTaskNum); - - // assign all measurements to different sub 
tasks - Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums]; - int idx = 0; - for (String measurement : measurementSchemaMap.keySet()) { - if (measurementsForEachSubTask[idx % subTaskNums] == null) { - measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>(); + Map<String, MeasurementSchema> schemaMap = deviceIterator.getAllSchemasOfCurrentDevice(); + List<String> allMeasurements = new ArrayList<>(schemaMap.keySet()); + allMeasurements.sort((String::compareTo)); + int subTaskNums = Math.min(allMeasurements.size(), subTaskNum); + // construct sub tasks and start compacting measurements in parallel + if (subTaskNums > 0) { + // assign the measurements for each subtask + List<String>[] measurementListArray = new List[subTaskNums]; + for (int i = 0, size = allMeasurements.size(); i < size; ++i) { + int index = i % subTaskNums; + if (measurementListArray[index] == null) { + measurementListArray[index] = new LinkedList<>(); + } + measurementListArray[index].add(allMeasurements.get(i)); } - measurementsForEachSubTask[idx++ % subTaskNums].add(measurement); - } - // construct sub tasks and start compacting measurements in parallel - List<Future<Void>> futures = new ArrayList<>(); - compactionWriter.startChunkGroup(device, false); - for (int i = 0; i < subTaskNums; i++) { - futures.add( - CompactionTaskManager.getInstance() - .submitSubTask( - new SubCompactionTask( - device, - measurementsForEachSubTask[i], - queryContext, - queryDataSource, - compactionWriter, - measurementSchemaMap, - i))); - } + // construct sub tasks and start compacting measurements in parallel + List<Future<Void>> futures = new ArrayList<>(); + compactionWriter.startChunkGroup(device, false); + for (int i = 0; i < subTaskNums; i++) { + futures.add( + CompactionTaskManager.getInstance() + .submitSubTask( + new SubCompactionTask( + device, + measurementListArray[i], + queryContext, + queryDataSource, + compactionWriter, + schemaMap, + i))); + } - // wait for all sub tasks finish - for (int i = 0; i < subTaskNums; i++) { - try { - futures.get(i).get(); - } catch (InterruptedException | ExecutionException e) { - logger.error("SubCompactionTask meet errors ", e); - Thread.interrupted(); - throw new InterruptedException(); + // wait for all sub tasks finish + for (int i = 0; i < subTaskNums; i++) { + try { + futures.get(i).get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("SubCompactionTask meet errors ", e); + Thread.interrupted(); + throw new InterruptedException(); + } } } - + compactionWriter.checkAndMayFlushChunkMetadata(); compactionWriter.endChunkGroup(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java index c688deb8bd..f5d5278437 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; /** @@ -45,7 +44,7 @@ public class SubCompactionTask implements Callable<Void> { private static final Logger logger = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private final String device; - private final Set<String> measurementList; + private final 
List<String> measurementList; private final QueryContext queryContext; private final QueryDataSource queryDataSource; @@ -57,7 +56,7 @@ public class SubCompactionTask implements Callable<Void> { public SubCompactionTask( String device, - Set<String> measurementList, + List<String> measurementList, QueryContext queryContext, QueryDataSource queryDataSource, AbstractCompactionWriter compactionWriter, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java index af5353153f..dbd5b98bec 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java @@ -135,6 +135,7 @@ public class AlignedSeriesCompactionExecutor { chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(writer); } + writer.checkMetadataSizeAndMayFlush(); } private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java index 06004c9c6e..a6248e9835 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; @@ -64,8 +65,17 @@ public class InnerSpaceCompactionUtils { public static void compact(TsFileResource targetResource, List<TsFileResource> tsFileResources) throws IOException, MetadataException, InterruptedException { + // size for file writer is 5% of per compaction task memory budget + long sizeForFileWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInCompaction()); try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(tsFileResources); - TsFileIOWriter writer = new TsFileIOWriter(targetResource.getTsFile())) { + TsFileIOWriter writer = + new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) { while (deviceIterator.hasNextDevice()) { Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice(); String device = deviceInfo.left; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java index 2d49094f44..d614b3dbe3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java @@ -138,6 +138,7 @@ public class SingleSeriesCompactionExecutor { } targetResource.updateStartTime(device, minStartTimestamp); targetResource.updateEndTime(device, maxEndTimestamp); + fileWriter.checkMetadataSizeAndMayFlush(); } private void constructChunkWriterFromReadChunk(Chunk chunk) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java index 5c1460230d..72096069e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java @@ -177,4 +177,11 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } public abstract List<TsFileIOWriter> getFileIOWriter(); + + public void checkAndMayFlushChunkMetadata() throws IOException { + List<TsFileIOWriter> writers = this.getFileIOWriter(); + for (TsFileIOWriter writer : writers) { + writer.checkMetadataSizeAndMayFlush(); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java index 3e245cfc35..3a413d4cf4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java @@ -18,8 +18,10 @@ */ package org.apache.iotdb.db.engine.compaction.writer; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -27,6 +29,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { // target fileIOWriters @@ -34,6 +38,7 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { // source tsfiles private List<TsFileResource> seqTsFileResources; + private List<TsFileResource> targetTsFileResources; // Each sub task has its corresponding seq file index. // The index of the array corresponds to subTaskId. 
@@ -51,17 +56,46 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { // current chunk group header size private int chunkGroupHeaderSize; + private AtomicLong[] startTimeForCurDeviceForEachFile; + private AtomicLong[] endTimeForCurDeviceForEachFile; + private AtomicBoolean[] hasCurDeviceForEachFile; + private AtomicLong[][] startTimeForEachDevice = new AtomicLong[subTaskNum][]; + private AtomicLong[][] endTimeForEachDevice = new AtomicLong[subTaskNum][]; + public CrossSpaceCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) throws IOException { currentDeviceEndTime = new long[seqFileResources.size()]; isEmptyFile = new boolean[seqFileResources.size()]; isDeviceExistedInTargetFiles = new boolean[targetResources.size()]; + this.targetTsFileResources = targetResources; + startTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()]; + endTimeForCurDeviceForEachFile = new AtomicLong[targetResources.size()]; + hasCurDeviceForEachFile = new AtomicBoolean[targetResources.size()]; + long memorySizeForEachWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInCompaction() + / targetResources.size()); for (int i = 0; i < targetResources.size(); i++) { this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile())); isEmptyFile[i] = true; + startTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MAX_VALUE); + endTimeForCurDeviceForEachFile[i] = new AtomicLong(Long.MIN_VALUE); + hasCurDeviceForEachFile[i] = new AtomicBoolean(false); } this.seqTsFileResources = seqFileResources; + for (int i = 0, size = targetResources.size(); i < subTaskNum; ++i) { + startTimeForEachDevice[i] = new AtomicLong[size]; + endTimeForEachDevice[i] = new AtomicLong[size]; + for (int j = 0; j < size; ++j) { + startTimeForEachDevice[i][j] = new AtomicLong(Long.MAX_VALUE); + endTimeForEachDevice[i][j] = new AtomicLong(Long.MIN_VALUE); + } + } } @Override @@ -86,6 +120,16 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { } isDeviceExistedInTargetFiles[i] = false; } + for (int i = 0, size = targetTsFileResources.size(); i < size; ++i) { + for (int j = 0; j < subTaskNum; ++j) { + targetTsFileResources + .get(i) + .updateStartTime(deviceId, startTimeForEachDevice[j][i].getAndSet(Long.MAX_VALUE)); + targetTsFileResources + .get(i) + .updateEndTime(deviceId, endTimeForEachDevice[j][i].getAndSet(Long.MIN_VALUE)); + } + } deviceId = null; } @@ -99,6 +143,9 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { public void write(long timestamp, Object value, int subTaskId) throws IOException { checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId); writeDataPoint(timestamp, value, subTaskId); + int fileIndex = seqFileIndexArray[subTaskId]; + startTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::min); + endTimeForEachDevice[subTaskId][fileIndex].accumulateAndGet(timestamp, Math::max); checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId); isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true; isEmptyFile[seqFileIndexArray[subTaskId]] = false; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java index af2cc53c67..18fa51d7a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java @@ -18,21 +18,43 @@ */ package org.apache.iotdb.db.engine.compaction.writer; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public class InnerSpaceCompactionWriter extends AbstractCompactionWriter { private TsFileIOWriter fileWriter; private boolean isEmptyFile; + private TsFileResource resource; + private AtomicLong[] startTimeOfCurDevice; + private AtomicLong[] endTimeOfCurDevice; public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException { - this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile()); + long sizeForFileWriter = + (long) + (SystemInfo.getInstance().getMemorySizeForCompaction() + / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInCompaction()); + this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile(), true, sizeForFileWriter); isEmptyFile = true; + resource = targetFileResource; + int concurrentThreadNum = + Math.max(1, IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum()); + startTimeOfCurDevice = new AtomicLong[concurrentThreadNum]; + endTimeOfCurDevice = new AtomicLong[concurrentThreadNum]; + for (int i = 0; i < concurrentThreadNum; ++i) { + startTimeOfCurDevice[i] = new AtomicLong(Long.MAX_VALUE); + endTimeOfCurDevice[i] = new AtomicLong(Long.MIN_VALUE); + } } @Override @@ -44,6 +66,14 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter { @Override public void endChunkGroup() throws IOException { + for (int i = 0; i < startTimeOfCurDevice.length; ++i) { + resource.updateStartTime( + fileWriter.getCurrentChunkGroupDeviceId(), startTimeOfCurDevice[i].get()); + resource.updateEndTime( + fileWriter.getCurrentChunkGroupDeviceId(), endTimeOfCurDevice[i].get()); + startTimeOfCurDevice[i].set(Long.MAX_VALUE); + endTimeOfCurDevice[i].set(Long.MIN_VALUE); + } fileWriter.endChunkGroup(); } @@ -57,6 +87,8 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter { writeDataPoint(timestamp, value, subTaskId); checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId); isEmptyFile = false; + startTimeOfCurDevice[subTaskId].set(Math.min(startTimeOfCurDevice[subTaskId].get(), timestamp)); + endTimeOfCurDevice[subTaskId].set(Math.max(endTimeOfCurDevice[subTaskId].get(), timestamp)); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 2fd5f4db64..5ab620f7fd 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -39,6 +39,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; 
import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -114,12 +117,14 @@ public class MemTableFlushTask { long start = System.currentTimeMillis(); long sortTime = 0; - // for map do not use get(key) to iterate - for (Map.Entry<IDeviceID, IWritableMemChunkGroup> memTableEntry : - memTable.getMemTableMap().entrySet()) { - encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey().toStringID())); + Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap(); + List<IDeviceID> deviceIDList = new ArrayList<>(memTableMap.keySet()); + // sort the IDeviceID in lexicographical order + deviceIDList.sort(Comparator.comparing(IDeviceID::toStringID)); + for (IDeviceID deviceID : deviceIDList) { + encodingTaskQueue.put(new StartFlushGroupIOTask(deviceID.toStringID())); - final Map<String, IWritableMemChunk> value = memTableEntry.getValue().getMemChunkMap(); + final Map<String, IWritableMemChunk> value = memTableMap.get(deviceID).getMemChunkMap(); for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { long startTime = System.currentTimeMillis(); IWritableMemChunk series = iWritableMemChunkEntry.getValue(); @@ -275,6 +280,7 @@ public class MemTableFlushTask { this.writer.setMinPlanIndex(memTable.getMinPlanIndex()); this.writer.setMaxPlanIndex(memTable.getMaxPlanIndex()); this.writer.endChunkGroup(); + writer.checkMetadataSizeAndMayFlush(); } else { ((IChunkWriter) ioMessage).writeToFileWriter(this.writer); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 1cb31057e5..a56061935a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -172,7 +172,14 @@ public class TsFileProcessor { this.storageGroupName = storageGroupName; this.tsFileResource = new TsFileResource(tsfile, this); this.storageGroupInfo = storageGroupInfo; - this.writer = new RestorableTsFileIOWriter(tsfile); + this.writer = + new RestorableTsFileIOWriter( + tsfile, + (long) + (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInWrite())); this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback; this.sequence = sequence; logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath()); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 8c26ec903c..6ec8e69fc3 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -1423,7 +1423,7 @@ public class PlanExecutor implements IPlanExecutor { private void loadNewTsFileVerifyMetadata(TsFileSequenceReader tsFileSequenceReader) throws MetadataException, QueryProcessException, IOException { Map<String, List<TimeseriesMetadata>> metadataSet = - tsFileSequenceReader.getAllTimeseriesMetadata(); + tsFileSequenceReader.getAllTimeseriesMetadata(false); for (Map.Entry<String, List<TimeseriesMetadata>> entry : metadataSet.entrySet()) { String deviceId = entry.getKey(); PartialPath devicePath = new PartialPath(deviceId); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 5d61e6c304..6273add29f 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -69,7 +69,7 @@ public class FileLoaderUtils { public static void updateTsFileResource( TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException { for (Entry<String, List<TimeseriesMetadata>> entry : - reader.getAllTimeseriesMetadata().entrySet()) { + reader.getAllTimeseriesMetadata(false).entrySet()) { for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) { tsFileResource.updateStartTime( entry.getKey(), timeseriesMetaData.getStatistics().getStartTime()); diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index b7722b67dd..dd87894e39 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.writelog.recover; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.flush.MemTableFlushTask; import org.apache.iotdb.db.engine.memtable.IMemTable; @@ -113,7 +114,14 @@ public class TsFileRecoverPerformer { // remove corrupted part of the TsFile RestorableTsFileIOWriter restorableTsFileIOWriter; try { - restorableTsFileIOWriter = new RestorableTsFileIOWriter(file); + restorableTsFileIOWriter = + new RestorableTsFileIOWriter( + file, + (long) + (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold() + * IoTDBDescriptor.getInstance() + .getConfig() + .getChunkMetadataSizeProportionInWrite())); } catch (NotCompatibleTsFileException e) { boolean result = file.delete(); logger.warn("TsFile {} is incompatible. 
Delete it successfully {}", filePath, result); @@ -180,7 +188,7 @@ public class TsFileRecoverPerformer { try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath(), true)) { for (Entry<String, List<TimeseriesMetadata>> entry : - reader.getAllTimeseriesMetadata().entrySet()) { + reader.getAllTimeseriesMetadata(false).entrySet()) { for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) { tsFileResource.updateStartTime( entry.getKey(), timeseriesMetaData.getStatistics().getStartTime()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index 831f8cd120..9ee1f7f566 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -186,6 +186,14 @@ public class ChunkMetadata implements IChunkMetadata { return chunkMetaData; } + public static ChunkMetadata deserializeFrom(ByteBuffer buffer, TSDataType dataType) { + ChunkMetadata chunkMetadata = new ChunkMetadata(); + chunkMetadata.tsDataType = dataType; + chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer); + chunkMetadata.statistics = Statistics.deserialize(buffer, dataType); + return chunkMetadata; + } + @Override public long getVersion() { return version; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java index 062ffd6183..baa93ce9da 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java @@ -123,7 +123,7 @@ public class MetadataIndexConstructor { * @param out tsfile output * @param type MetadataIndexNode type */ - private static MetadataIndexNode generateRootNode( + public static MetadataIndexNode generateRootNode( Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type) throws IOException { int queueSize = metadataIndexNodeQueue.size(); @@ -148,7 +148,7 @@ public class MetadataIndexConstructor { return metadataIndexNodeQueue.poll(); } - private static void addCurrentIndexNodeToQueue( + public static void addCurrentIndexNodeToQueue( MetadataIndexNode currentIndexNode, Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out) @@ -156,4 +156,39 @@ public class MetadataIndexConstructor { currentIndexNode.setEndOffset(out.getPosition()); metadataIndexNodeQueue.add(currentIndexNode); } + + public static MetadataIndexNode checkAndBuildLevelIndex( + Map<String, MetadataIndexNode> deviceMetadataIndexMap, TsFileOutput out) throws IOException { + // if not exceed the max child nodes num, ignore the device index and directly point to the + // measurement + if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) { + MetadataIndexNode metadataIndexNode = + new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE); + for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) { + metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition())); + entry.getValue().serializeTo(out.wrapAsStream()); + } + metadataIndexNode.setEndOffset(out.getPosition()); + return metadataIndexNode; + } + + // else, build level index for devices + Queue<MetadataIndexNode> 
deviceMetadataIndexQueue = new ArrayDeque<>(); + MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE); + + for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) { + // when constructing from internal node, each node is related to an entry + if (currentIndexNode.isFull()) { + addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE); + } + currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition())); + entry.getValue().serializeTo(out.wrapAsStream()); + } + addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out); + MetadataIndexNode deviceMetadataIndexNode = + generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE); + deviceMetadataIndexNode.setEndOffset(out.getPosition()); + return deviceMetadataIndexNode; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java index 3f6f6336b3..1d3972cafe 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java @@ -74,7 +74,7 @@ public class MetadataIndexNode { this.children.add(metadataIndexEntry); } - boolean isFull() { + public boolean isFull() { return children.size() >= config.getMaxDegreeOfIndexNode(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java index 95e01e2da1..7b70f54ebe 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java @@ -119,6 +119,19 @@ public class TsFileMetadata { return byteLen; } + public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter) + throws IOException { + int byteLen = 0; + byte[] bytes = filter.serialize(); + byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream); + outputStream.write(bytes); + byteLen += bytes.length; + byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getSize(), outputStream); + byteLen += + ReadWriteForEncodingUtils.writeUnsignedVarInt(filter.getHashFunctionSize(), outputStream); + return byteLen; + } + /** * build bloom filter * diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java index 48d9b2a38c..73ae05703c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java @@ -62,7 +62,7 @@ public enum TSDataType { return getTsDataType(type); } - private static TSDataType getTsDataType(byte type) { + public static TSDataType getTsDataType(byte type) { switch (type) { case 0: return TSDataType.BOOLEAN; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 90ec11349f..3a413b2f98 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ 
-929,7 +929,8 @@ public class TsFileSequenceReader implements AutoCloseable { } /* TimeseriesMetadata don't need deserialize chunk metadata list */ - public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException { + public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata) + throws IOException { if (tsFileMetaData == null) { readFileMetadata(); } @@ -949,7 +950,7 @@ public class TsFileSequenceReader implements AutoCloseable { null, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - false); + needChunkMetadata); } return timeseriesMetadataMap; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java index 49553e42d9..00536ca1dc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java @@ -22,7 +22,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; -import org.apache.iotdb.tsfile.file.metadata.*; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -41,8 +47,15 @@ import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements AutoCloseable { @@ -413,7 +426,8 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A } @Override - public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata() throws IOException { + public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata) + throws IOException { if (tsFileMetaData == null) { readFileMetadata(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java index 70a5d8cf9f..ae2afdb8e1 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -78,6 +78,18 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter { this(file, true); } + /** + * @param file a given tsfile path you want to (continue to) write + * @throws IOException if write failed, or the file is broken but 
autoRepair==false. + */ + public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException { + this(file, true); + this.maxMetadataSize = maxMetadataSize; + this.enableMemoryControl = true; + this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); + this.checkMetadataSizeAndMayFlush(); + } + public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException { if (logger.isDebugEnabled()) { logger.debug("{} is opened.", file.getName()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 2f865f297f..7fdbac27df 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -26,35 +26,47 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.BloomFilter; import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.TreeMap; +import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue; +import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex; +import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode; + /** * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream. 
*/ @@ -93,6 +105,21 @@ public class TsFileIOWriter implements AutoCloseable { private long minPlanIndex; private long maxPlanIndex; + // the following variable is used for memory control + protected long maxMetadataSize; + protected long currentChunkMetadataSize = 0L; + protected File chunkMetadataTempFile; + protected LocalTsFileOutput tempOutput; + protected volatile boolean hasChunkMetadataInDisk = false; + protected String currentSeries = null; + // record the total num of path in order to make bloom filter + protected int pathCount = 0; + protected boolean enableMemoryControl = false; + private Path lastSerializePath = null; + protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>(); + private volatile int chunkMetadataCount = 0; + public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta"; + /** empty construct function. */ protected TsFileIOWriter() {} @@ -126,6 +153,15 @@ public class TsFileIOWriter implements AutoCloseable { this.out = output; } + /** for write with memory control */ + public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize) + throws IOException { + this(file); + this.enableMemoryControl = enableMemoryControl; + this.maxMetadataSize = maxMetadataSize; + chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); + } + /** * Writes given bytes to output stream. This method is called when total memory size exceeds the * chunk group size threshold. @@ -236,6 +272,10 @@ public class TsFileIOWriter implements AutoCloseable { /** end chunk and write some log. */ public void endCurrentChunk() { + if (enableMemoryControl) { + this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize(); + } + chunkMetadataCount++; chunkMetadataList.add(currentChunkMetadata); currentChunkMetadata = null; } @@ -247,47 +287,14 @@ public class TsFileIOWriter implements AutoCloseable { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void endFile() throws IOException { - long metaOffset = out.getPosition(); - - // serialize the SEPARATOR of MetaData - ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream()); - - // group ChunkMetadata by series - Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>(); - - for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { - List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList(); - for (IChunkMetadata chunkMetadata : chunkMetadatas) { - Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()); - chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata); - } - } - - MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap); - TsFileMetadata tsFileMetaData = new TsFileMetadata(); - tsFileMetaData.setMetadataIndex(metadataIndex); - tsFileMetaData.setMetaOffset(metaOffset); + checkInMemoryPathCount(); + readChunkMetadataAndConstructIndexTree(); long footerIndex = out.getPosition(); if (logger.isDebugEnabled()) { logger.debug("start to flush the footer,file pos:{}", footerIndex); } - // write TsFileMetaData - int size = tsFileMetaData.serializeTo(out.wrapAsStream()); - if (logger.isDebugEnabled()) { - logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition()); - } - - // write bloom filter - size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet()); - if (logger.isDebugEnabled()) { - logger.debug("finish flushing the 
bloom filter file pos:{}", out.getPosition()); - } - - // write TsFileMetaData size - ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata. - // write magic string out.write(MAGIC_STRING_BYTES); @@ -296,66 +303,121 @@ public class TsFileIOWriter implements AutoCloseable { if (resourceLogger.isDebugEnabled() && file != null) { resourceLogger.debug("{} writer is closed.", file.getName()); } + if (file != null) { + File chunkMetadataFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); + if (chunkMetadataFile.exists()) { + FileUtils.delete(chunkMetadataFile); + } + } canWrite = false; } - /** - * Flush TsFileMetadata, including ChunkMetadataList and TimeseriesMetaData - * - * @param chunkMetadataListMap chunkMetadata that Path.mask == 0 - * @return MetadataIndexEntry list in TsFileMetadata - */ - private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap) - throws IOException { - - // convert ChunkMetadataList to this field - deviceTimeseriesMetadataMap = new LinkedHashMap<>(); - // create device -> TimeseriesMetaDataList Map - for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) { - // for ordinary path - flushOneChunkMetadata(entry.getKey(), entry.getValue()); + private void readChunkMetadataAndConstructIndexTree() throws IOException { + if (tempOutput != null) { + tempOutput.close(); } + long metaOffset = out.getPosition(); - // construct TsFileMetadata and return - return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out); - } + // serialize the SEPARATOR of MetaData + ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream()); - /** - * Flush one chunkMetadata - * - * @param path Path of chunk - * @param chunkMetadataList List of chunkMetadata about path(previous param) - */ - private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList) - throws IOException { - // create TimeseriesMetaData - PublicBAOS publicBAOS = new PublicBAOS(); - TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType(); - Statistics seriesStatistics = Statistics.getStatsByType(dataType); - - int chunkMetadataListLength = 0; - boolean serializeStatistic = (chunkMetadataList.size() > 1); - // flush chunkMetadataList one by one - for (IChunkMetadata chunkMetadata : chunkMetadataList) { - if (!chunkMetadata.getDataType().equals(dataType)) { - continue; + TSMIterator tsmIterator = + hasChunkMetadataInDisk + ? 
TSMIterator.getTSMIteratorInDisk( + chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice) + : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList); + Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>(); + Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>(); + String currentDevice = null; + String prevDevice = null; + MetadataIndexNode currentIndexNode = + new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); + int seriesIdxForCurrDevice = 0; + BloomFilter filter = + BloomFilter.getEmptyBloomFilter( + TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount); + + int indexCount = 0; + while (tsmIterator.hasNext()) { + // read in all chunk metadata of one series + // construct the timeseries metadata for this series + Pair<String, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next(); + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + currentSeries = timeseriesMetadataPair.left; + + indexCount++; + // build bloom filter + filter.add(currentSeries); + // construct the index tree node for the series + Path currentPath = null; + if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { + // this series is the time column of the aligned device + // the full series path will be like "root.sg.d." + // we remove the last . in the series id here + currentPath = new Path(currentSeries); + currentDevice = currentSeries.substring(0, currentSeries.length() - 1); + } else { + currentPath = new Path(currentSeries, true); + currentDevice = currentPath.getDevice(); + } + if (!currentDevice.equals(prevDevice)) { + if (prevDevice != null) { + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + deviceMetadataIndexMap.put( + prevDevice, + generateRootNode( + measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + } + measurementMetadataIndexQueue = new ArrayDeque<>(); + seriesIdxForCurrDevice = 0; + } + + if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) { + if (currentIndexNode.isFull()) { + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT); + } + if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) { + currentIndexNode.addEntry( + new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition())); + } else { + currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition())); + } } - chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic); - seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); + + prevDevice = currentDevice; + seriesIdxForCurrDevice++; + // serialize the timeseries metadata to file + timeseriesMetadata.serializeTo(out.wrapAsStream()); + } + + addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out); + if (prevDevice != null) { + deviceMetadataIndexMap.put( + prevDevice, + generateRootNode( + measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT)); } - TimeseriesMetadata timeseriesMetadata = - new TimeseriesMetadata( - (byte) - ((serializeStatistic ? 
(byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()), - chunkMetadataListLength, - path.getMeasurement(), - dataType, - seriesStatistics, - publicBAOS); - deviceTimeseriesMetadataMap - .computeIfAbsent(path.getDevice(), k -> new ArrayList<>()) - .add(timeseriesMetadata); + MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out); + + TsFileMetadata tsFileMetadata = new TsFileMetadata(); + tsFileMetadata.setMetadataIndex(metadataIndex); + tsFileMetadata.setMetaOffset(metaOffset); + + int size = tsFileMetadata.serializeTo(out.wrapAsStream()); + size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter); + + // write TsFileMetaData size + ReadWriteIOUtils.write(size, out.wrapAsStream()); + } + + private void checkInMemoryPathCount() { + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + pathCount += chunkGroupMetadata.getChunkMetadataList().size(); + } } /** @@ -399,6 +461,9 @@ public class TsFileIOWriter implements AutoCloseable { public void close() throws IOException { canWrite = false; out.close(); + if (tempOutput != null) { + this.tempOutput.close(); + } } void writeSeparatorMaskForTest() throws IOException { @@ -477,6 +542,30 @@ * @return DeviceTimeseriesMetadataMap */ public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() { + Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>(); + Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>(); + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { + chunkMetadataMap + .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>()) + .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>()) + .add(chunkMetadata); + } + } + for (String device : chunkMetadataMap.keySet()) { + Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device); + for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) { + try { + deviceTimeseriesMetadataMap + .computeIfAbsent(device, x -> new ArrayList<>()) + .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue())); + } catch (IOException e) { + logger.error("Failed to get device timeseries metadata map", e); + return null; + } + } + } + return deviceTimeseriesMetadataMap; } @@ -495,4 +584,98 @@ public void setMaxPlanIndex(long maxPlanIndex) { this.maxPlanIndex = maxPlanIndex; } + + /** + * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the + * chunk metadata will be written to a temp file. <b>Notice! If you are writing an aligned device + * in row, you should make sure all data of the current writing device has been written before this + * method is called. For writing non-aligned series or writing aligned series in column, you + * should make sure that all data of one series is written before you call this function.</b> + * + * @throws IOException + */ + public void checkMetadataSizeAndMayFlush() throws IOException { + // This function should be called after all data of an aligned device has been written + if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) { + try { + if (logger.isDebugEnabled()) { + logger.debug( + "Flushing chunk metadata, total size is {}, count is {}, avg size is {}", + currentChunkMetadataSize, + chunkMetadataCount, + currentChunkMetadataSize / chunkMetadataCount); + } + sortAndFlushChunkMetadata(); + } catch (IOException e) { + logger.error("Meets exception when flushing metadata to temp file for {}", file, e); + throw e; + } + } + } + + /** + * Sort the chunk metadata in lexicographical order and by the start time of the chunk, then + * flush them to a temp file. + * + * @throws IOException + */ + protected void sortAndFlushChunkMetadata() throws IOException { + // group by series + List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = + TSMIterator.sortChunkMetadata( + chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList); + if (tempOutput == null) { + tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile)); + } + hasChunkMetadataInDisk = true; + for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) { + Path seriesPath = pair.left; + boolean isNewPath = !seriesPath.equals(lastSerializePath); + if (isNewPath) { + // record the number of paths to construct the bloom filter later + pathCount++; + } + List<IChunkMetadata> iChunkMetadataList = pair.right; + writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath); + lastSerializePath = seriesPath; + logger.debug("Flushing {}", seriesPath); + } + // clear the cached metadata to release the memory + chunkGroupMetadataList.clear(); + if (chunkMetadataList != null) { + chunkMetadataList.clear(); + } + chunkMetadataCount = 0; + currentChunkMetadataSize = 0; + } + + private void writeChunkMetadataToTempFile( + List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath) + throws IOException { + // [DeviceId] measurementId datatype size chunkMetadataBuffer + if (lastSerializePath == null + || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) { + // mark the end position of last device + endPosInCMTForDevice.add(tempOutput.getPosition()); + // serialize the device + // for each device, we only serialize it once, in order to save io + ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream()); + } + if (isNewPath && iChunkMetadataList.size() > 0) { + // serialize the public info of this measurement + ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream()); + ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream()); + } + PublicBAOS buffer = new PublicBAOS(); + int totalSize = 0; + for (IChunkMetadata chunkMetadata : iChunkMetadataList) { + totalSize += chunkMetadata.serializeTo(buffer, true); + } + ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream()); + buffer.writeTo(tempOutput); + } + + public String getCurrentChunkGroupDeviceId() { + return currentChunkGroupDeviceId; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java new
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
new file mode 100644
index 0000000000..fd02f1438a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/DiskTSMIterator.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer.tsmiterator;
+
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class reads ChunkMetadata iteratively from disk (the .cmt file) and from memory (the list
+ * of ChunkGroupMetadata), and constructs TimeseriesMetadata from them. It reads the ChunkMetadata
+ * on disk first; after all ChunkMetadata on disk has been read, it reads the ChunkMetadata in
+ * memory.
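+ *
+ * <p>Each record in the .cmt file follows the layout produced by
+ * TsFileIOWriter#writeChunkMetadataToTempFile (the device id is written only when it changes):
+ *
+ * <pre>
+ *   [deviceId] measurementId dataType chunkMetadataListSize chunkMetadataBuffer
+ * </pre>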
+ */ +public class DiskTSMIterator extends TSMIterator { + + private static final Logger LOG = LoggerFactory.getLogger(DiskTSMIterator.class); + + private LinkedList<Long> endPosForEachDevice; + private File cmtFile; + private LocalTsFileInput input; + private long fileLength = 0; + private long currentPos = 0; + private long nextEndPosForDevice = 0; + private String currentDevice; + private boolean remainsInFile = true; + + protected DiskTSMIterator( + File cmtFile, + List<ChunkGroupMetadata> chunkGroupMetadataList, + LinkedList<Long> endPosForEachDevice) + throws IOException { + super(chunkGroupMetadataList); + this.cmtFile = cmtFile; + this.endPosForEachDevice = endPosForEachDevice; + this.input = new LocalTsFileInput(cmtFile.toPath()); + this.fileLength = cmtFile.length(); + this.nextEndPosForDevice = endPosForEachDevice.removeFirst(); + } + + @Override + public boolean hasNext() { + return remainsInFile || iterator.hasNext(); + } + + @Override + public Pair<String, TimeseriesMetadata> next() { + try { + if (remainsInFile) { + // deserialize from file + return getTimeSerisMetadataFromFile(); + } else { + // get from memory iterator + return super.next(); + } + } catch (IOException e) { + LOG.error("Meets IOException when reading timeseries metadata from disk", e); + return null; + } + } + + private Pair<String, TimeseriesMetadata> getTimeSerisMetadataFromFile() throws IOException { + if (currentPos == nextEndPosForDevice) { + // deserialize the current device name + currentDevice = ReadWriteIOUtils.readString(input.wrapAsInputStream()); + nextEndPosForDevice = + endPosForEachDevice.size() > 0 ? endPosForEachDevice.removeFirst() : fileLength; + } + // deserialize public info for measurement + String measurementUid = ReadWriteIOUtils.readVarIntString(input.wrapAsInputStream()); + byte dataTypeInByte = ReadWriteIOUtils.readByte(input.wrapAsInputStream()); + TSDataType dataType = TSDataType.getTsDataType(dataTypeInByte); + int chunkBufferSize = ReadWriteIOUtils.readInt(input.wrapAsInputStream()); + ByteBuffer chunkBuffer = ByteBuffer.allocate(chunkBufferSize); + int readSize = ReadWriteIOUtils.readAsPossible(input, chunkBuffer); + if (readSize < chunkBufferSize) { + throw new IOException( + String.format( + "Expected to read %s bytes, but actually read %s bytes", chunkBufferSize, readSize)); + } + chunkBuffer.flip(); + + // deserialize chunk metadata from chunk buffer + List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); + while (chunkBuffer.hasRemaining()) { + chunkMetadataList.add(ChunkMetadata.deserializeFrom(chunkBuffer, dataType)); + } + updateCurrentPos(); + return new Pair<>( + currentDevice + "." + measurementUid, + constructOneTimeseriesMetadata(measurementUid, chunkMetadataList)); + } + + private void updateCurrentPos() throws IOException { + currentPos = input.position(); + if (currentPos >= fileLength) { + remainsInFile = false; + input.close(); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java new file mode 100644 index 0000000000..f11242f296 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/tsmiterator/TSMIterator.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.writer.tsmiterator; + +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.PublicBAOS; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * TSMIterator returns full path of series and its TimeseriesMetadata iteratively. It accepts data + * source from memory or disk. Static method getTSMIteratorInMemory returns a TSMIterator that reads + * from memory, and static method getTSMIteratorInDisk returns a TSMIterator that reads from disk. + */ +public class TSMIterator { + private static final Logger LOG = LoggerFactory.getLogger(TSMIterator.class); + protected List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList; + protected Iterator<Pair<Path, List<IChunkMetadata>>> iterator; + + protected TSMIterator(List<ChunkGroupMetadata> chunkGroupMetadataList) { + this.sortedChunkMetadataList = sortChunkMetadata(chunkGroupMetadataList, null, null); + this.iterator = sortedChunkMetadataList.iterator(); + } + + public static TSMIterator getTSMIteratorInMemory( + List<ChunkGroupMetadata> chunkGroupMetadataList) { + return new TSMIterator(chunkGroupMetadataList); + } + + public static TSMIterator getTSMIteratorInDisk( + File cmtFile, List<ChunkGroupMetadata> chunkGroupMetadataList, LinkedList<Long> serializePos) + throws IOException { + return new DiskTSMIterator(cmtFile, chunkGroupMetadataList, serializePos); + } + + public boolean hasNext() { + return iterator.hasNext(); + } + + public Pair<String, TimeseriesMetadata> next() throws IOException { + Pair<Path, List<IChunkMetadata>> nextPair = iterator.next(); + return new Pair<>( + nextPair.left.getFullPath(), + constructOneTimeseriesMetadata(nextPair.left.getMeasurement(), nextPair.right)); + } + + public static TimeseriesMetadata constructOneTimeseriesMetadata( + String measurementId, List<IChunkMetadata> chunkMetadataList) throws IOException { + // create TimeseriesMetaData + PublicBAOS publicBAOS = new PublicBAOS(); + TSDataType dataType = chunkMetadataList.get(chunkMetadataList.size() - 1).getDataType(); + Statistics seriesStatistics = Statistics.getStatsByType(dataType); + + int chunkMetadataListLength = 0; + boolean serializeStatistic = (chunkMetadataList.size() > 1); + // flush chunkMetadataList one by one + for (IChunkMetadata 
chunkMetadata : chunkMetadataList) { + if (!chunkMetadata.getDataType().equals(dataType)) { + continue; + } + chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic); + seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); + } + + TimeseriesMetadata timeseriesMetadata = + new TimeseriesMetadata( + (byte) + ((serializeStatistic ? (byte) 1 : (byte) 0) | chunkMetadataList.get(0).getMask()), + chunkMetadataListLength, + measurementId, + dataType, + seriesStatistics, + publicBAOS); + return timeseriesMetadata; + } + + public static List<Pair<Path, List<IChunkMetadata>>> sortChunkMetadata( + List<ChunkGroupMetadata> chunkGroupMetadataList, + String currentDevice, + List<ChunkMetadata> chunkMetadataList) { + Map<String, Map<Path, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>(); + List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = new LinkedList<>(); + for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) { + chunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>()); + for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) { + chunkMetadataMap + .get(chunkGroupMetadata.getDevice()) + .computeIfAbsent( + new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid()), + x -> new ArrayList<>()) + .add(chunkMetadata); + } + } + if (currentDevice != null) { + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + chunkMetadataMap + .computeIfAbsent(currentDevice, x -> new TreeMap<>()) + .computeIfAbsent( + new Path(currentDevice, chunkMetadata.getMeasurementUid()), x -> new ArrayList<>()) + .add(chunkMetadata); + } + } + + for (Map.Entry<String, Map<Path, List<IChunkMetadata>>> entry : chunkMetadataMap.entrySet()) { + Map<Path, List<IChunkMetadata>> seriesChunkMetadataMap = entry.getValue(); + for (Map.Entry<Path, List<IChunkMetadata>> seriesChunkMetadataEntry : + seriesChunkMetadataMap.entrySet()) { + sortedChunkMetadataList.add( + new Pair<>(seriesChunkMetadataEntry.getKey(), seriesChunkMetadataEntry.getValue())); + } + } + return sortedChunkMetadataList; + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java index 5e518d4280..f7137e95be 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/MetadataIndexConstructorTest.java @@ -234,7 +234,7 @@ public class MetadataIndexConstructorTest { assertFalse(iterator.hasNext()); Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata = - reader.getAllTimeseriesMetadata(); + reader.getAllTimeseriesMetadata(false); for (int j = 0; j < actualDevices.size(); j++) { for (int i = 0; i < actualMeasurements.get(j).size(); i++) { assertEquals( diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java index 4c2321928a..ba955912f1 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java @@ -147,7 +147,7 @@ public class TsFileIOWriterTest { // make sure timeseriesMetadata is only Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = - reader.getAllTimeseriesMetadata(); + reader.getAllTimeseriesMetadata(false); Set<String> pathSet = new HashSet<>(); for 
(Map.Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
new file mode 100644
index 0000000000..c97a9a0774
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIntegrityCheckingTool.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/** This class provides some static methods to check the integrity of a TsFile. */
+public class TsFileIntegrityCheckingTool {
+  private static final Logger LOG = LoggerFactory.getLogger(TsFileIntegrityCheckingTool.class);
+
+  /**
+   * This method checks the integrity of the file by reading it from the start to the end.
It mainly + * checks the integrity of the chunks. + * + * @param filename + */ + public static void checkIntegrityBySequenceRead(String filename) { + try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) { + String headMagicString = reader.readHeadMagic(); + Assert.assertEquals(TSFileConfig.MAGIC_STRING, headMagicString); + String tailMagicString = reader.readTailMagic(); + Assert.assertEquals(TSFileConfig.MAGIC_STRING, tailMagicString); + reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); + List<long[]> timeBatch = new ArrayList<>(); + int pageIndex = 0; + byte marker; + while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + case MetaMarker.TIME_CHUNK_HEADER: + case MetaMarker.VALUE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: + ChunkHeader header = reader.readChunkHeader(marker); + if (header.getDataSize() == 0) { + // empty value chunk + break; + } + Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + Decoder valueDecoder = + Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); + int dataSize = header.getDataSize(); + pageIndex = 0; + if (header.getDataType() == TSDataType.VECTOR) { + timeBatch.clear(); + } + while (dataSize > 0) { + valueDecoder.reset(); + PageHeader pageHeader = + reader.readPageHeader( + header.getDataType(), + (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); + ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); + if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK) + == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk + TimePageReader timePageReader = + new TimePageReader(pageHeader, pageData, defaultTimeDecoder); + timeBatch.add(timePageReader.getNextTimeBatch()); + } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK) + == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk + ValuePageReader valuePageReader = + new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder); + TsPrimitiveType[] valueBatch = + valuePageReader.nextValueBatch(timeBatch.get(pageIndex)); + } else { // NonAligned Chunk + PageReader pageReader = + new PageReader( + pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null); + BatchData batchData = pageReader.getAllSatisfiedPageData(); + } + pageIndex++; + dataSize -= pageHeader.getSerializedPageSize(); + } + break; + case MetaMarker.CHUNK_GROUP_HEADER: + ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); + break; + case MetaMarker.OPERATION_INDEX_RANGE: + reader.readPlanIndex(); + break; + default: + MetaMarker.handleUnexpectedMarker(marker); + } + } + } catch (IOException e) { + LOG.error("Meet exception when checking integrity of tsfile", e); + Assert.fail(); + } + } + + /** + * This method checks the integrity of the file by mimicking the process of the query, which reads + * the metadata index tree first, and get the timeseries metadata list and chunk metadata list. + * After that, this method acquires single chunk according to chunk metadata, then it deserializes + * the chunk, and verifies the correctness of the data. 
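+   *
+   * <p>For example, the expected content of one chunk of a non-aligned series could be
+   * registered as follows (illustrative values):
+   *
+   * <pre>
+   *   originData
+   *       .computeIfAbsent("root.sg.d0", x -> new HashMap<>())
+   *       .computeIfAbsent("s0", x -> new ArrayList<>())
+   *       .add(Collections.singletonList(new Pair<>(0L, new TsPrimitiveType.TsDouble(1.0))));
+   * </pre>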
+ * + * @param filename File to be check + * @param originData The origin data in a map format, Device -> SeriesId -> List<List<Time,Val>>, + * each inner list stands for a chunk. + */ + public static void checkIntegrityByQuery( + String filename, + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData) { + try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) { + Map<String, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(true); + Assert.assertEquals(originData.size(), allTimeseriesMetadata.size()); + // check each series + for (Map.Entry<String, List<TimeseriesMetadata>> entry : allTimeseriesMetadata.entrySet()) { + String deviceId = entry.getKey(); + List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue(); + boolean vectorMode = false; + if (timeseriesMetadataList.size() > 0 + && timeseriesMetadataList.get(0).getTSDataType() != TSDataType.VECTOR) { + Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size()); + } else { + vectorMode = true; + Assert.assertEquals(originData.get(deviceId).size(), timeseriesMetadataList.size() - 1); + } + + if (!vectorMode) { + // check integrity of not aligned series + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) { + // get its chunk metadata list, and read the chunk + String measurementId = timeseriesMetadata.getMeasurementId(); + List<List<Pair<Long, TsPrimitiveType>>> originChunks = + originData.get(deviceId).get(measurementId); + List<IChunkMetadata> chunkMetadataList = timeseriesMetadata.getChunkMetadataList(); + Assert.assertEquals(originChunks.size(), chunkMetadataList.size()); + chunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime)); + for (int i = 0; i < chunkMetadataList.size(); ++i) { + Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadataList.get(i)); + ChunkReader chunkReader = new ChunkReader(chunk, null); + List<Pair<Long, TsPrimitiveType>> originValue = originChunks.get(i); + // deserialize the chunk and verify it with origin data + for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) { + IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair pair = pointReader.nextTimeValuePair(); + Assert.assertEquals( + originValue.get(valIdx).left.longValue(), pair.getTimestamp()); + Assert.assertEquals(originValue.get(valIdx++).right, pair.getValue()); + } + } + } + } + } else { + // check integrity of vector type + // get the timeseries metadata of the time column + TimeseriesMetadata timeColumnMetadata = timeseriesMetadataList.get(0); + List<IChunkMetadata> timeChunkMetadataList = timeColumnMetadata.getChunkMetadataList(); + timeChunkMetadataList.sort(Comparator.comparing(IChunkMetadata::getStartTime)); + + for (int i = 1; i < timeseriesMetadataList.size(); ++i) { + // traverse each value column + List<IChunkMetadata> valueChunkMetadataList = + timeseriesMetadataList.get(i).getChunkMetadataList(); + Assert.assertEquals(timeChunkMetadataList.size(), valueChunkMetadataList.size()); + List<List<Pair<Long, TsPrimitiveType>>> originDataChunks = + originData.get(deviceId).get(timeseriesMetadataList.get(i).getMeasurementId()); + for (int chunkIdx = 0; chunkIdx < timeChunkMetadataList.size(); ++chunkIdx) { + Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) timeChunkMetadataList.get(chunkIdx)); + Chunk valueChunk = + reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(chunkIdx)); + // 
construct an aligned chunk reader using time chunk and value chunk + IChunkReader chunkReader = + new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk), null); + // verify the values + List<Pair<Long, TsPrimitiveType>> originValue = originDataChunks.get(chunkIdx); + for (int valIdx = 0; chunkReader.hasNextSatisfiedPage(); ) { + IBatchDataIterator pointReader = chunkReader.nextPageData().getBatchDataIterator(); + while (pointReader.hasNext()) { + long time = pointReader.currentTime(); + Assert.assertEquals(originValue.get(valIdx).left.longValue(), time); + Assert.assertEquals( + originValue.get(valIdx++).right.getValue(), + ((TsPrimitiveType[]) pointReader.currentValue())[0].getValue()); + pointReader.next(); + } + } + } + } + } + } + + } catch (IOException e) { + LOG.error("Meet exception when checking integrity of tsfile", e); + Assert.fail(); + } + } +} diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java new file mode 100644 index 0000000000..b7c6ff84db --- /dev/null +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java @@ -0,0 +1,1303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.tsfile.write.writer; + +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.encoding.encoder.Encoder; +import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.write.TsFileIntegrityCheckingTool; +import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.TimeChunkWriter; +import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +public class TsFileIOWriterMemoryControlTest { + private static File testFile = new File("target", "1-1-0-0.tsfile"); + private static File emptyFile = new File("target", "temp"); + private long TEST_CHUNK_SIZE = 1000; + private List<String> sortedSeriesId = new ArrayList<>(); + private List<String> sortedDeviceId = new ArrayList<>(); + private boolean init = false; + + @Before + public void setUp() throws IOException { + if (!init) { + init = true; + for (int i = 0; i < 2048; ++i) { + sortedSeriesId.add("s" + i); + sortedDeviceId.add("root.sg.d" + i); + } + sortedSeriesId.sort((String::compareTo)); + sortedDeviceId.sort((String::compareTo)); + } + TEST_CHUNK_SIZE = 1000; + } + + @After + public void tearDown() throws IOException { + if (testFile.exists()) { + FileUtils.delete(testFile); + } + if (new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX).exists()) { + FileUtils.delete( + new File(testFile.getPath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)); + } + if (emptyFile.exists()) { + FileUtils.delete(emptyFile); + } + } + + /** The following tests is for ChunkMetadata serialization and deserialization. 
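+   * <p>They share this round-trip shape, condensed from the test bodies below (assertions
+   * omitted):
+   *
+   * <pre>
+   *   writer.sortAndFlushChunkMetadata(); // spill sorted chunk metadata to the .cmt temp file
+   *   writer.tempOutput.flush();
+   *   TSMIterator iterator =
+   *       TSMIterator.getTSMIteratorInDisk(
+   *           writer.chunkMetadataTempFile,
+   *           writer.chunkGroupMetadataList,
+   *           writer.endPosInCMTForDevice);
+   *   // iterator.next() yields (full path, TimeseriesMetadata) pairs in lexicographical order
+   * </pre>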
*/ + @Test + public void testSerializeAndDeserializeChunkMetadata() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + List<ChunkMetadata> originChunkMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + chunkWriter = generateIntData(j, 0L, new ArrayList<>()); + break; + case 1: + chunkWriter = generateBooleanData(j, 0, new ArrayList<>()); + break; + case 2: + chunkWriter = generateFloatData(j, 0L, new ArrayList<>()); + break; + case 3: + chunkWriter = generateDoubleData(j, 0L, new ArrayList<>()); + break; + case 4: + default: + chunkWriter = generateTextData(j, 0L, new ArrayList<>()); + break; + } + chunkWriter.writeToFileWriter(writer); + } + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, + writer.chunkGroupMetadataList, + writer.endPosInCMTForDevice); + for (int i = 0; iterator.hasNext(); ++i) { + Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next(); + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId()); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics()); + } + } + } + + @Test + public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + List<ChunkMetadata> originChunkMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6); + chunkWriter.writeToFileWriter(writer); + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + List<String> measurementIds = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + measurementIds.add(sortedDeviceId.get(i) + "."); + for (int j = 1; j <= 6; ++j) { + measurementIds.add(sortedDeviceId.get(i) + ".s" + j); + } + } + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice); + for (int i = 0; iterator.hasNext(); ++i) { + Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next(); + String fullPath = timeseriesMetadataPair.left; + TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right; + Assert.assertEquals(measurementIds.get(i), fullPath); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTSDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics()); + } + } + } + + @Test + public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + List<IChunkMetadata> originChunkMetadataList = new ArrayList<>(); + List<String> seriesIds = new 
ArrayList<>(); + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + if (i % 2 == 0) { + // write normal series + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + chunkWriter = generateIntData(j, 0L, new ArrayList<>()); + break; + case 1: + chunkWriter = generateBooleanData(j, 0L, new ArrayList<>()); + break; + case 2: + chunkWriter = generateFloatData(j, 0L, new ArrayList<>()); + break; + case 3: + chunkWriter = generateDoubleData(j, 0L, new ArrayList<>()); + break; + case 4: + default: + chunkWriter = generateTextData(j, 0L, new ArrayList<>()); + break; + } + chunkWriter.writeToFileWriter(writer); + seriesIds.add(deviceId + "." + sortedSeriesId.get(j)); + } + } else { + // write vector + AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, new ArrayList<>(), 6); + chunkWriter.writeToFileWriter(writer); + seriesIds.add(deviceId + "."); + for (int l = 1; l <= 6; ++l) { + seriesIds.add(deviceId + ".s" + l); + } + } + originChunkMetadataList.addAll(writer.chunkMetadataList); + writer.endChunkGroup(); + } + writer.sortAndFlushChunkMetadata(); + writer.tempOutput.flush(); + + TSMIterator iterator = + TSMIterator.getTSMIteratorInDisk( + writer.chunkMetadataTempFile, new ArrayList<>(), writer.endPosInCMTForDevice); + for (int i = 0; i < originChunkMetadataList.size(); ++i) { + Pair<String, TimeseriesMetadata> timeseriesMetadataPair = iterator.next(); + Assert.assertEquals(seriesIds.get(i), timeseriesMetadataPair.left); + Assert.assertEquals( + originChunkMetadataList.get(i).getDataType(), + timeseriesMetadataPair.right.getTSDataType()); + Assert.assertEquals( + originChunkMetadataList.get(i).getStatistics(), + timeseriesMetadataPair.right.getStatistics()); + } + } + } + + /** The following tests is for writing normal series in different nums. */ + + /** + * Write a file with 10 devices and 5 series in each device. For each series, we write one chunk + * for it. 
This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithNormalChunk() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + chunkWriter = generateIntData(j, 0L, valList); + break; + case 1: + chunkWriter = generateBooleanData(j, 0L, valList); + break; + case 2: + chunkWriter = generateFloatData(j, 0L, valList); + break; + case 3: + chunkWriter = generateDoubleData(j, 0L, valList); + break; + case 4: + default: + chunkWriter = generateTextData(j, 0L, valList); + break; + } + chunkWriter.writeToFileWriter(writer); + writer.checkMetadataSizeAndMayFlush(); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 10 devices and 5 series in each device. For each series, we write 100 chunks + * for it. This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + 
chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 10 devices and 5 series in each device. For each series, we write 100 chunks + * for it. We maintain some chunk metadata in memory when calling endFile(). + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new 
HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + if (i < 9) { + writer.checkMetadataSizeAndMayFlush(); + } + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + Assert.assertFalse(writer.chunkGroupMetadataList.isEmpty()); + writer.endFile(); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 2 devices and 5 series in each device. For each series, we write 1024 chunks + * for it. This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + long originTestChunkSize = TEST_CHUNK_SIZE; + TEST_CHUNK_SIZE = 10; + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 2; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + for (int k = 0; k < 1024; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 1024; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 1024; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 1024; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 1024; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } finally { + TEST_CHUNK_SIZE = originTestChunkSize; + } + Assert.assertFalse( + new 
File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Write a file with 2 devices and 1024 series in each device. For each series, we write 50 chunks + * for it. This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>(); + long originTestChunkSize = TEST_CHUNK_SIZE; + TEST_CHUNK_SIZE = 1; + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 2; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 1024; ++j) { + ChunkWriterImpl chunkWriter; + switch (j % 5) { + case 0: + for (int k = 0; k < 50; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 50; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 50; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 50; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 50; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } finally { + TEST_CHUNK_SIZE = originTestChunkSize; + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes); + } + + /** + * Write a file with 1024 devices and 5 series in each device. For each series, we write 10 chunks + * for it. 
This test make sure that each chunk + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>(); + long originTestChunkSize = TEST_CHUNK_SIZE; + TEST_CHUNK_SIZE = 10; + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 1024; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j % 5) { + case 0: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originTimes + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endChunkGroup(); + } + Assert.assertTrue(writer.hasChunkMetadataInDisk); + writer.endFile(); + } finally { + TEST_CHUNK_SIZE = originTestChunkSize; + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originTimes); + } + + /** The following tests is for writing aligned series. */ + + /** + * Test writing 10 align series, 6 in a group. 
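+   * <p>Condensed shape of each loop iteration below (generateVectorData is a helper defined in
+   * this test class; the flush check runs only after the whole aligned device has been written):
+   *
+   * <pre>
+   *   writer.startChunkGroup(deviceId);
+   *   AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6);
+   *   chunkWriter.writeToFileWriter(writer);
+   *   writer.endChunkGroup();
+   *   writer.checkMetadataSizeAndMayFlush();
+   * </pre>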
+ * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>(); + AlignedChunkWriterImpl chunkWriter = generateVectorData(0L, valList, 6); + for (int j = 1; j <= 6; ++j) { + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent("s" + j, x -> new ArrayList<>()) + .add(valList.get(j - 1)); + } + + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + writer.checkMetadataSizeAndMayFlush(); + } + writer.endFile(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Test writing 1 aligned series, for each series we write 512 chunks + * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + int chunkNum = 512, seriesNum = 6; + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 1; ++i) { + String deviceId = sortedDeviceId.get(i); + for (int k = 0; k < chunkNum; ++k) { + writer.startChunkGroup(deviceId); + List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>(); + AlignedChunkWriterImpl chunkWriter = + generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum); + for (int j = 1; j <= seriesNum; ++j) { + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent("s" + j, x -> new ArrayList<>()) + .add(valList.get(j - 1)); + } + + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endFile(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + /** + * Test write aligned chunk metadata, for each aligned series, we write 1024 components. 
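+   * <p>These large cases shrink TEST_CHUNK_SIZE for speed and restore it afterwards:
+   *
+   * <pre>
+   *   long originTestPointNum = TEST_CHUNK_SIZE;
+   *   TEST_CHUNK_SIZE = 10;
+   *   try {
+   *     // write the 1024-component aligned chunks, then endFile() and verify
+   *   } finally {
+   *     TEST_CHUNK_SIZE = originTestPointNum;
+   *   }
+   * </pre>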
+ * + * @throws IOException + */ + @Test + public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + int chunkNum = 5, seriesNum = 1024; + long originTestPointNum = TEST_CHUNK_SIZE; + TEST_CHUNK_SIZE = 10; + try { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + for (int k = 0; k < chunkNum; ++k) { + writer.startChunkGroup(deviceId); + List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>(); + AlignedChunkWriterImpl chunkWriter = + generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum); + for (int j = 1; j <= seriesNum; ++j) { + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent("s" + j, x -> new ArrayList<>()) + .add(valList.get(j - 1)); + } + + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endFile(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + } + } finally { + TEST_CHUNK_SIZE = originTestPointNum; + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + @Test + public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + int chunkNum = 5, seriesNum = 12; + long originTestPointNum = TEST_CHUNK_SIZE; + TEST_CHUNK_SIZE = 10; + int deviceNum = 1024; + try { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < deviceNum; ++i) { + String deviceId = sortedDeviceId.get(i); + for (int k = 0; k < chunkNum; ++k) { + writer.startChunkGroup(deviceId); + List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>(); + AlignedChunkWriterImpl chunkWriter = + generateVectorData(k * TEST_CHUNK_SIZE, valList, seriesNum); + for (int j = 1; j <= seriesNum; ++j) { + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent("s" + j, x -> new ArrayList<>()) + .add(valList.get(j - 1)); + } + + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endFile(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + } + } finally { + TEST_CHUNK_SIZE = originTestPointNum; + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + @Test + public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>(); + TEST_CHUNK_SIZE = 10; + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 5; i++) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + TSEncoding timeEncoding = + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); + TSDataType timeType = 
TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); + Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType); + for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) { + TimeChunkWriter timeChunkWriter = + new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder); + for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) { + timeChunkWriter.write(j); + } + timeChunkWriter.writeToFileWriter(writer); + } + writer.sortAndFlushChunkMetadata(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + for (int k = 0; k < 1024; ++k) { + TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN); + builder.initFromProps(null); + for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + sortedSeriesId.get(k), + CompressionType.SNAPPY, + TSDataType.DOUBLE, + TSEncoding.PLAIN, + builder.getEncoder(TSDataType.DOUBLE)); + Random random = new Random(); + List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>(); + for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) { + double val = random.nextDouble(); + chunkWriter.write(j, val, false); + valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val))); + } + chunkWriter.writeToFileWriter(writer); + originValue + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>()) + .add(valueList); + } + writer.sortAndFlushChunkMetadata(); + } + writer.endChunkGroup(); + } + writer.endFile(); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue); + } + + @Test + public void testWritingCompleteMixedFiles() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 5; ++i) { + String deviceId = sortedDeviceId.get(i); + for (int k = 0; k < 10; ++k) { + writer.startChunkGroup(deviceId); + List<List<Pair<Long, TsPrimitiveType>>> valList = new ArrayList<>(); + AlignedChunkWriterImpl chunkWriter = generateVectorData(k * TEST_CHUNK_SIZE, valList, 6); + for (int j = 1; j <= 6; ++j) { + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent("s" + j, x -> new ArrayList<>()) + .add(valList.get(j - 1)); + } + + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + } + writer.checkMetadataSizeAndMayFlush(); + } + for (int i = 5; i < 10; ++i) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + for (int j = 0; j < 5; ++j) { + ChunkWriterImpl chunkWriter; + switch (j) { + case 0: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateIntData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 1: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateBooleanData(j, (long) TEST_CHUNK_SIZE * k, valList); + 
chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 2: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateFloatData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 3: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateDoubleData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + case 4: + default: + for (int k = 0; k < 10; ++k) { + List<Pair<Long, TsPrimitiveType>> valList = new ArrayList<>(); + chunkWriter = generateTextData(j, (long) TEST_CHUNK_SIZE * k, valList); + chunkWriter.writeToFileWriter(writer); + originData + .computeIfAbsent(deviceId, x -> new HashMap<>()) + .computeIfAbsent(sortedSeriesId.get(j), x -> new ArrayList<>()) + .add(valList); + } + break; + } + writer.checkMetadataSizeAndMayFlush(); + } + writer.endChunkGroup(); + } + writer.endFile(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + } + Assert.assertFalse( + new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX) + .exists()); + TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath()); + TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originData); + } + + @Test + public void testWritingAlignedSeriesByColumn() throws IOException { + Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>(); + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + for (int i = 0; i < 5; i++) { + String deviceId = sortedDeviceId.get(i); + writer.startChunkGroup(deviceId); + TSEncoding timeEncoding = + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); + TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); + Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType); + TimeChunkWriter timeChunkWriter = + new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder); + for (int j = 0; j < TEST_CHUNK_SIZE; ++j) { + timeChunkWriter.write(j); + } + timeChunkWriter.writeToFileWriter(writer); + writer.sortAndFlushChunkMetadata(); + Assert.assertTrue(writer.hasChunkMetadataInDisk); + for (int k = 0; k < 5; ++k) { + TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN); + builder.initFromProps(null); + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + sortedSeriesId.get(k), + CompressionType.SNAPPY, + TSDataType.DOUBLE, + TSEncoding.PLAIN, + builder.getEncoder(TSDataType.DOUBLE)); + Random random = new Random(); + List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>(); + for (int j = 0; j < TEST_CHUNK_SIZE; ++j) { + double val = random.nextDouble(); + chunkWriter.write(j, val, false); + valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val))); + } + chunkWriter.writeToFileWriter(writer); + originValue + .computeIfAbsent(deviceId, x -> new HashMap<>()) + 
.computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ @Test
+ public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
+ Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
+ try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
+ for (int i = 0; i < 5; i++) {
+ String deviceId = sortedDeviceId.get(i);
+ writer.startChunkGroup(deviceId);
+ TSEncoding timeEncoding =
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
+ TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
+ Encoder encoder = TSEncodingBuilder.getEncodingBuilder(timeEncoding).getEncoder(timeType);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ TimeChunkWriter timeChunkWriter =
+ new TimeChunkWriter("", CompressionType.SNAPPY, TSEncoding.PLAIN, encoder);
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ timeChunkWriter.write(j);
+ }
+ timeChunkWriter.writeToFileWriter(writer);
+ }
+ writer.sortAndFlushChunkMetadata();
+ Assert.assertTrue(writer.hasChunkMetadataInDisk);
+ for (int k = 0; k < 5; ++k) {
+ TSEncodingBuilder builder = TSEncodingBuilder.getEncodingBuilder(TSEncoding.PLAIN);
+ builder.initFromProps(null);
+ for (int chunkIdx = 0; chunkIdx < 10; ++chunkIdx) {
+ ValueChunkWriter chunkWriter =
+ new ValueChunkWriter(
+ sortedSeriesId.get(k),
+ CompressionType.SNAPPY,
+ TSDataType.DOUBLE,
+ TSEncoding.PLAIN,
+ builder.getEncoder(TSDataType.DOUBLE));
+ Random random = new Random();
+ List<Pair<Long, TsPrimitiveType>> valueList = new ArrayList<>();
+ for (long j = TEST_CHUNK_SIZE * chunkIdx; j < TEST_CHUNK_SIZE * (chunkIdx + 1); ++j) {
+ double val = random.nextDouble();
+ chunkWriter.write(j, val, false);
+ valueList.add(new Pair<>((long) j, new TsPrimitiveType.TsDouble(val)));
+ }
+ chunkWriter.writeToFileWriter(writer);
+ originValue
+ .computeIfAbsent(deviceId, x -> new HashMap<>())
+ .computeIfAbsent(sortedSeriesId.get(k), x -> new ArrayList<>())
+ .add(valueList);
+ }
+ writer.sortAndFlushChunkMetadata();
+ }
+ writer.endChunkGroup();
+ }
+ writer.endFile();
+ }
+ Assert.assertFalse(
+ new File(testFile.getAbsolutePath() + TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
+ .exists());
+ TsFileIntegrityCheckingTool.checkIntegrityBySequenceRead(testFile.getPath());
+ TsFileIntegrityCheckingTool.checkIntegrityByQuery(testFile.getPath(), originValue);
+ }
+
+ /** The following methods generate data for the tests that mix normal series and aligned series. */
+ private ChunkWriterImpl generateIntData(
+ int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) {
+ ChunkWriterImpl chunkWriter =
+ new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.INT64));
+ Random random = new Random();
+ for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) {
+ long val = random.nextLong();
+ chunkWriter.write(i, val);
+ record.add(new Pair<>(i, new TsPrimitiveType.TsLong(val)));
+ }
+ return chunkWriter;
+ }
+
+ private ChunkWriterImpl generateFloatData(
+ int idx, long startTime, 
List<Pair<Long, TsPrimitiveType>> record) { + ChunkWriterImpl chunkWriter = + new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.FLOAT)); + Random random = new Random(); + for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) { + float val = random.nextFloat(); + chunkWriter.write(i, val); + record.add(new Pair<>(i, new TsPrimitiveType.TsFloat(val))); + } + return chunkWriter; + } + + private ChunkWriterImpl generateDoubleData( + int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) { + ChunkWriterImpl chunkWriter = + new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.DOUBLE)); + Random random = new Random(); + for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) { + double val = random.nextDouble(); + chunkWriter.write(i, val); + record.add(new Pair<>(i, new TsPrimitiveType.TsDouble(val))); + } + return chunkWriter; + } + + private ChunkWriterImpl generateBooleanData( + int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) { + ChunkWriterImpl chunkWriter = + new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.BOOLEAN)); + Random random = new Random(); + for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) { + boolean val = random.nextBoolean(); + chunkWriter.write(i, val); + record.add(new Pair<>(i, new TsPrimitiveType.TsBoolean(val))); + } + return chunkWriter; + } + + private AlignedChunkWriterImpl generateVectorData( + long startTime, List<List<Pair<Long, TsPrimitiveType>>> record, int seriesNum) { + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); + TSDataType[] dataTypes = + new TSDataType[] { + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.BOOLEAN, + TSDataType.TEXT + }; + for (int i = 0; i < seriesNum; ++i) { + measurementSchemas.add(new MeasurementSchema("s" + (i + 1), dataTypes[i % 6])); + } + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchemas); + Random random = new Random(); + for (int i = 0; i < seriesNum; ++i) { + record.add(new ArrayList<>()); + } + for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) { + TsPrimitiveType[] points = new TsPrimitiveType[seriesNum]; + for (int j = 0; j < seriesNum; ++j) { + switch (j % 6) { + case 0: + points[j] = new TsPrimitiveType.TsInt(random.nextInt()); + break; + case 1: + points[j] = new TsPrimitiveType.TsLong(random.nextLong()); + break; + case 2: + points[j] = new TsPrimitiveType.TsFloat(random.nextFloat()); + break; + case 3: + points[j] = new TsPrimitiveType.TsDouble(random.nextDouble()); + break; + case 4: + points[j] = new TsPrimitiveType.TsBoolean(random.nextBoolean()); + break; + case 5: + points[j] = + new TsPrimitiveType.TsBinary(new Binary(String.valueOf(random.nextDouble()))); + break; + } + } + for (int j = 0; j < seriesNum; ++j) { + record.get(j).add(new Pair<>(i, points[j])); + } + chunkWriter.write(i, points); + } + return chunkWriter; + } + + private ChunkWriterImpl generateTextData( + int idx, long startTime, List<Pair<Long, TsPrimitiveType>> record) { + ChunkWriterImpl chunkWriter = + new ChunkWriterImpl(new MeasurementSchema(sortedSeriesId.get(idx), TSDataType.TEXT)); + Random random = new Random(); + for (long i = startTime; i < startTime + TEST_CHUNK_SIZE; ++i) { + Binary val = new Binary(String.valueOf(random.nextDouble())); + chunkWriter.write(i, val); + record.add(new Pair<>(i, new TsPrimitiveType.TsBinary(val))); + } + return chunkWriter; + } +}
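
A note for readers following this test class: the memory-control flow it exercises can be summarized in a few lines. The sketch below is not part of the commit; it drives TsFileIOWriter the same way the tests do, using only APIs that appear in this diff (the three-argument constructor with a small metadata-size threshold, checkMetadataSizeAndMayFlush() after each chunk group, and endFile(), after which the CHUNK_METADATA_TEMP_FILE_SUFFIX temp file is expected to be gone). The file path, device id, series id, and class name are illustrative only.

import java.io.File;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

public class MetadataSpillSketch {
  public static void main(String[] args) throws Exception {
    File file = new File("sketch.tsfile"); // illustrative path
    // 1024 bytes is a deliberately tiny threshold so chunk metadata spills early,
    // mirroring new TsFileIOWriter(testFile, true, 1024) in the tests above.
    try (TsFileIOWriter writer = new TsFileIOWriter(file, true, 1024)) {
      for (int d = 0; d < 100; ++d) {
        writer.startChunkGroup("root.sg.d" + d); // illustrative device id
        ChunkWriterImpl chunkWriter =
            new ChunkWriterImpl(new MeasurementSchema("s1", TSDataType.INT64));
        for (long t = 0; t < 100; ++t) {
          chunkWriter.write(t, t); // (timestamp, value)
        }
        chunkWriter.writeToFileWriter(writer);
        writer.endChunkGroup();
        // May serialize buffered chunk metadata to the temp file
        // (suffix TsFileIOWriter.CHUNK_METADATA_TEMP_FILE_SUFFIX)
        // once it exceeds the threshold passed to the constructor.
        writer.checkMetadataSizeAndMayFlush();
      }
      // Merges in-memory and spilled chunk metadata into the file-level index;
      // the tests assert the temp file no longer exists after this call.
      writer.endFile();
    }
  }
}

The pattern matters because, before this change, all chunk metadata stayed in memory until endFile(), so files with many devices or series could hold a large metadata footprint for the whole write; spilling after each chunk group bounds that footprint.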
