This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d1c3b88fd80393b8567a71becb32a48a3cb59fa9 Author: Liu Xuxin <[email protected]> AuthorDate: Wed Jun 21 18:04:41 2023 +0800 [metrics] Refactor compaction read write throughput metrics (#10248) --- .../impl/ReadChunkCompactionPerformer.java | 21 ++- .../execute/utils/MultiTsFileDeviceIterator.java | 19 +- .../fast/AlignedSeriesCompactionExecutor.java | 21 +++ .../readchunk/AlignedSeriesCompactionExecutor.java | 99 ++++++----- .../readchunk/SingleSeriesCompactionExecutor.java | 56 ++---- .../utils/writer/AbstractCompactionWriter.java | 73 +++----- .../writer/AbstractCrossCompactionWriter.java | 24 +-- .../writer/AbstractInnerCompactionWriter.java | 13 +- .../utils/writer/FastCrossCompactionWriter.java | 4 +- .../writer/ReadPointCrossCompactionWriter.java | 2 +- .../writer/ReadPointInnerCompactionWriter.java | 2 +- .../compaction/io/CompactionTsFileReader.java | 178 +++++++++++++++++++ .../compaction/io/CompactionTsFileWriter.java | 130 ++++++++++++++ .../compaction/schedule/CompactionTaskManager.java | 10 -- .../schedule/constant/CompactionIoDataType.java | 36 ++++ .../iotdb/db/engine/storagegroup/DataRegion.java | 5 + .../db/service/metrics/CompactionMetrics.java | 192 ++++++++++++++------- .../iotdb/db/service/metrics/FileMetrics.java | 1 + .../iotdb/tsfile/read/TsFileDeviceIterator.java | 3 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 12 ++ .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 28 ++- 21 files changed, 677 insertions(+), 252 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index da0c9c91dd8..27d4d48afc7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.db.engine.compaction.execute.performer.impl; -import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -27,6 +26,8 @@ import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.execute.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.AlignedSeriesCompactionExecutor; import org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.rescon.SystemInfo; @@ -34,18 +35,12 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.LinkedList; import java.util.List; public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { - private static final Logger LOGGER = - LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private TsFileResource targetResource; private List<TsFileResource> seqFiles; private CompactionTaskSummary summary; @@ -71,8 +66,12 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles); - TsFileIOWriter writer = - new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) { + CompactionTsFileWriter writer = + new CompactionTsFileWriter( + targetResource.getTsFile(), + true, + sizeForFileWriter, + CompactionType.INNER_SEQ_COMPACTION)) { while (deviceIterator.hasNextDevice()) { Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice(); String device = deviceInfo.left; @@ -113,7 +112,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private void compactAlignedSeries( String device, TsFileResource targetResource, - TsFileIOWriter writer, + CompactionTsFileWriter writer, MultiTsFileDeviceIterator deviceIterator) throws IOException, InterruptedException { checkThreadInterrupted(); @@ -153,7 +152,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private void compactNotAlignedSeries( String device, TsFileResource targetResource, - TsFileIOWriter writer, + CompactionTsFileWriter writer, MultiTsFileDeviceIterator deviceIterator) throws IOException, MetadataException, InterruptedException { writer.startChunkGroup(device); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java index 7e2484e0d71..28329e0d5cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -70,7 +72,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); try { for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) { - TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath()); + CompactionTsFileReader reader = + new CompactionTsFileReader( + tsFileResource.getTsFilePath(), CompactionType.INNER_SEQ_COMPACTION); readerMap.put(tsFileResource, reader); deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); } @@ -112,8 +116,19 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); this.readerMap = readerMap; + + CompactionType type = null; + if (!seqResources.isEmpty() && !unseqResources.isEmpty()) { + type = CompactionType.CROSS_COMPACTION; + } else if (seqResources.isEmpty()) { + type = CompactionType.INNER_UNSEQ_COMPACTION; + } else { + type = CompactionType.INNER_SEQ_COMPACTION; + } + for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) { - TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath()); + TsFileSequenceReader reader = + new CompactionTsFileReader(tsFileResource.getTsFilePath(), type); readerMap.put(tsFileResource, reader); deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java index cd7adbb1adc..4d68016a978 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.FileElement; import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.PageElement; import org.apache.iotdb.db.engine.compaction.execute.utils.writer.AbstractCompactionWriter; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.WriteProcessException; @@ -88,6 +89,8 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { @Override protected void compactFiles() throws PageException, IOException, WriteProcessException, IllegalPathException { + markStartOfAlignedSeries(); + while (!fileList.isEmpty()) { List<FileElement> overlappedFiles = findOverlapFiles(fileList.get(0)); @@ -96,6 +99,24 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { compactChunks(); } + + markEndOfAlignedSeries(); + } + + private void markStartOfAlignedSeries() { + for (TsFileSequenceReader reader : readerCacheMap.values()) { + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); + } + } + } + + private void markEndOfAlignedSeries() { + for (TsFileSequenceReader reader : readerCacheMap.values()) { + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); + } + } } /** Deserialize files into chunk metadatas and put them into the chunk metadata queue. */ diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index 6690d133ac8..08eeaba45a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -20,11 +20,9 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -39,9 +37,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - -import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; import java.util.ArrayList; @@ -56,14 +51,12 @@ public class AlignedSeriesCompactionExecutor { private final LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList; private final TsFileResource targetResource; - private final TsFileIOWriter writer; + private final CompactionTsFileWriter writer; private final AlignedChunkWriterImpl chunkWriter; private final List<IMeasurementSchema> schemaList; private long remainingPointInChunkWriter = 0L; private final CompactionTaskSummary summary; - private final RateLimiter rateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); private final long chunkSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -74,7 +67,7 @@ public class AlignedSeriesCompactionExecutor { String device, TsFileResource targetResource, LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter writer, + CompactionTsFileWriter writer, CompactionTaskSummary summary) throws IOException { this.device = device; @@ -101,27 +94,13 @@ public class AlignedSeriesCompactionExecutor { for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair : readerAndChunkMetadataList) { TsFileSequenceReader reader = readerListPair.left; + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); + } List<AlignedChunkMetadata> alignedChunkMetadataList = readerListPair.right; - for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { - List<IChunkMetadata> valueChunkMetadataList = - alignedChunkMetadata.getValueChunkMetadataList(); - for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { - if (chunkMetadata == null) { - continue; - } - if (measurementSet.contains(chunkMetadata.getMeasurementUid())) { - continue; - } - measurementSet.add(chunkMetadata.getMeasurementUid()); - Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata); - ChunkHeader header = chunk.getHeader(); - schemaSet.add( - new MeasurementSchema( - header.getMeasurementID(), - header.getDataType(), - header.getEncodingType(), - header.getCompressionType())); - } + collectSchemaFromOneFile(alignedChunkMetadataList, reader, schemaSet, measurementSet); + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); } } List<IMeasurementSchema> schemaList = new ArrayList<>(schemaSet); @@ -129,12 +108,46 @@ public class AlignedSeriesCompactionExecutor { return schemaList; } + private void collectSchemaFromOneFile( + List<AlignedChunkMetadata> alignedChunkMetadataList, + TsFileSequenceReader reader, + Set<MeasurementSchema> schemaSet, + Set<String> measurementSet) + throws IOException { + for (AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { + List<IChunkMetadata> valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { + if (chunkMetadata == null) { + continue; + } + if (measurementSet.contains(chunkMetadata.getMeasurementUid())) { + continue; + } + measurementSet.add(chunkMetadata.getMeasurementUid()); + Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata); + ChunkHeader header = chunk.getHeader(); + schemaSet.add( + new MeasurementSchema( + header.getMeasurementID(), + header.getDataType(), + header.getEncodingType(), + header.getCompressionType())); + } + } + } + public void execute() throws IOException { while (readerAndChunkMetadataList.size() > 0) { Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair = readerAndChunkMetadataList.removeFirst(); TsFileSequenceReader reader = readerListPair.left; List<AlignedChunkMetadata> alignedChunkMetadataList = readerListPair.right; + + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); + } + TsFileAlignedSeriesReaderIterator readerIterator = new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList); while (readerIterator.hasNext()) { @@ -142,22 +155,16 @@ public class AlignedSeriesCompactionExecutor { readerIterator.nextReader(); summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum()); summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum()); - CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize()); compactOneAlignedChunk( nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum()); } + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); + } } if (remainingPointInChunkWriter != 0L) { - CompactionTaskManager.mergeRateLimiterAcquire( - rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - true, - chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(writer); + writer.writeChunk(chunkWriter); } writer.checkMetadataSizeAndMayFlush(); } @@ -191,15 +198,7 @@ public class AlignedSeriesCompactionExecutor { private void flushChunkWriterIfLargeEnough() throws IOException { if (remainingPointInChunkWriter >= chunkPointNumThreshold || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) { - CompactionTaskManager.mergeRateLimiterAcquire( - rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - true, - chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(writer); + writer.writeChunk(chunkWriter); remainingPointInChunkWriter = 0L; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index 47d80ccf4ea..0c695eac833 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -22,11 +22,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -39,9 +36,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - -import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; import java.util.LinkedList; @@ -52,15 +46,13 @@ public class SingleSeriesCompactionExecutor { private String device; private PartialPath series; private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList; - private TsFileIOWriter fileWriter; + private CompactionTsFileWriter fileWriter; private TsFileResource targetResource; private IMeasurementSchema schema; private ChunkWriterImpl chunkWriter; private Chunk cachedChunk; private ChunkMetadata cachedChunkMetadata; - private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); // record the min time and max time to update the target resource private long minStartTimestamp = Long.MAX_VALUE; private long maxEndTimestamp = Long.MIN_VALUE; @@ -80,7 +72,7 @@ public class SingleSeriesCompactionExecutor { PartialPath series, IMeasurementSchema measurementSchema, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter fileWriter, + CompactionTsFileWriter fileWriter, TsFileResource targetResource) { this.device = series.getDevice(); this.series = series; @@ -97,7 +89,7 @@ public class SingleSeriesCompactionExecutor { public SingleSeriesCompactionExecutor( PartialPath series, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter fileWriter, + CompactionTsFileWriter fileWriter, TsFileResource targetResource, CompactionTaskSummary summary) { this.device = series.getDevice(); @@ -129,10 +121,6 @@ public class SingleSeriesCompactionExecutor { if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); } - CompactionMetrics.getInstance() - .recordReadInfo( - (long) currentChunk.getHeader().getSerializedSize() - + currentChunk.getHeader().getDataSize()); // if this chunk is modified, deserialize it into points if (chunkMetadata.getDeleteIntervalList() != null) { @@ -155,7 +143,7 @@ public class SingleSeriesCompactionExecutor { // after all the chunk of this sensor is read, flush the remaining data if (cachedChunk != null) { - flushChunkToFileWriter(cachedChunk, cachedChunkMetadata, true); + flushChunkToFileWriter(cachedChunk, cachedChunkMetadata); cachedChunk = null; cachedChunkMetadata = null; } else if (pointCountInChunkWriter != 0L) { @@ -211,7 +199,7 @@ public class SingleSeriesCompactionExecutor { // there is no points remaining in ChunkWriter and no cached chunk // flush it to file directly summary.increaseDirectlyFlushChunkNum(1); - flushChunkToFileWriter(chunk, chunkMetadata, false); + flushChunkToFileWriter(chunk, chunkMetadata); } } @@ -327,36 +315,20 @@ public class SingleSeriesCompactionExecutor { } } - private void flushChunkToFileWriter( - Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); + private void flushChunkToFileWriter(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException { if (chunkMetadata.getStartTime() < minStartTimestamp) { minStartTimestamp = chunkMetadata.getStartTime(); } if (chunkMetadata.getEndTime() > maxEndTimestamp) { maxEndTimestamp = chunkMetadata.getEndTime(); } - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK, - false, - getChunkSize(chunk)); fileWriter.writeChunk(chunk, chunkMetadata); } private void flushChunkWriterIfLargeEnough() throws IOException { if (pointCountInChunkWriter >= targetChunkPointNum || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - false, - chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriter); + fileWriter.writeChunk(chunkWriter); pointCountInChunkWriter = 0L; } } @@ -364,22 +336,14 @@ public class SingleSeriesCompactionExecutor { private void flushCachedChunkIfLargeEnough() throws IOException { if (cachedChunk.getChunkStatistic().getCount() >= targetChunkPointNum || getChunkSize(cachedChunk) >= targetChunkSize) { - flushChunkToFileWriter(cachedChunk, cachedChunkMetadata, true); + flushChunkToFileWriter(cachedChunk, cachedChunkMetadata); cachedChunk = null; cachedChunkMetadata = null; } } private void flushChunkWriter() throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - false, - chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriter); + fileWriter.writeChunk(chunkWriter); pointCountInChunkWriter = 0L; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 1f4b13b9a14..7b7fbc2ee1a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -19,10 +19,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.PageException; import org.apache.iotdb.tsfile.file.header.PageHeader; @@ -39,9 +36,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - -import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; import java.nio.ByteBuffer; @@ -50,9 +44,6 @@ import java.util.List; public abstract class AbstractCompactionWriter implements AutoCloseable { protected int subTaskNum = IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); - private RateLimiter compactionRateLimiter = - CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); - // check if there is unseq error point during writing protected long[] lastTime = new long[subTaskNum]; @@ -135,43 +126,43 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { */ public abstract void checkAndMayFlushChunkMetadata() throws IOException; - protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter iChunkWriter) { - if (iChunkWriter instanceof ChunkWriterImpl) { - ChunkWriterImpl chunkWriter = (ChunkWriterImpl) iChunkWriter; - switch (chunkWriter.getDataType()) { + protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) { + if (chunkWriter instanceof ChunkWriterImpl) { + ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; + switch (chunkWriterImpl.getDataType()) { case TEXT: - chunkWriter.write(timestamp, value.getBinary()); + chunkWriterImpl.write(timestamp, value.getBinary()); break; case DOUBLE: - chunkWriter.write(timestamp, value.getDouble()); + chunkWriterImpl.write(timestamp, value.getDouble()); break; case BOOLEAN: - chunkWriter.write(timestamp, value.getBoolean()); + chunkWriterImpl.write(timestamp, value.getBoolean()); break; case INT64: - chunkWriter.write(timestamp, value.getLong()); + chunkWriterImpl.write(timestamp, value.getLong()); break; case INT32: - chunkWriter.write(timestamp, value.getInt()); + chunkWriterImpl.write(timestamp, value.getInt()); break; case FLOAT: - chunkWriter.write(timestamp, value.getFloat()); + chunkWriterImpl.write(timestamp, value.getFloat()); break; default: - throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); + throw new UnsupportedOperationException( + "Unknown data type " + chunkWriterImpl.getDataType()); } } else { - AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) iChunkWriter; + AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; alignedChunkWriter.write(timestamp, value.getVector()); } } - protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) + protected void sealChunk( + CompactionTsFileWriter targetWriter, IChunkWriter chunkWriter, int subTaskId) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize()); synchronized (targetWriter) { - iChunkWriter.writeToFileWriter(targetWriter); + targetWriter.writeChunk(chunkWriter); } chunkPointNumArray[subTaskId] = 0; } @@ -188,19 +179,18 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { throws IOException; protected void flushNonAlignedChunkToFileWriter( - TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) + CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(chunk)); synchronized (targetWriter) { // seal last chunk to file writer - chunkWriters[subTaskId].writeToFileWriter(targetWriter); + targetWriter.writeChunk(chunkWriters[subTaskId]); chunkPointNumArray[subTaskId] = 0; targetWriter.writeChunk(chunk, chunkMetadata); } } protected void flushAlignedChunkToFileWriter( - TsFileIOWriter targetWriter, + CompactionTsFileWriter targetWriter, Chunk timeChunk, IChunkMetadata timeChunkMetadata, List<Chunk> valueChunks, @@ -210,11 +200,12 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { synchronized (targetWriter) { AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId]; // seal last chunk to file writer - alignedChunkWriter.writeToFileWriter(targetWriter); + targetWriter.writeChunk(alignedChunkWriter); chunkPointNumArray[subTaskId] = 0; + targetWriter.markStartingWritingAligned(); + // flush time chunk - CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, getChunkSize(timeChunk)); targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata); // flush value chunks @@ -231,10 +222,10 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { Statistics.getStatsByType(valueChunkWriter.getDataType())); continue; } - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, getChunkSize(valueChunk)); targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i)); } + + targetWriter.markEndingWritingAligned(); } } @@ -292,22 +283,14 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } protected void checkChunkSizeAndMayOpenANewChunk( - TsFileIOWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, boolean isCrossSpace) + CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId) throws IOException { if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) { // if chunk point num reaches the check point, then check if the chunk size over threshold lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint; - if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { - sealChunk(fileWriter, iChunkWriter, subTaskId); + if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { + sealChunk(fileWriter, chunkWriter, subTaskId); lastCheckIndex = 0; - CompactionMetrics.getInstance() - .recordWriteInfo( - isCrossSpace - ? CompactionType.CROSS_COMPACTION - : CompactionType.INNER_UNSEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - isAlign, - iChunkWriter.estimateMaxSeriesMemSize()); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 44409808daf..277379cb9dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -21,6 +21,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -29,7 +31,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; import java.util.ArrayList; @@ -39,7 +40,7 @@ import java.util.Map; public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWriter { // target fileIOWriters - protected List<TsFileIOWriter> targetFileWriters = new ArrayList<>(); + protected List<CompactionTsFileWriter> targetFileWriters = new ArrayList<>(); // source tsfiles private List<TsFileResource> seqTsFileResources; @@ -77,8 +78,11 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl(); for (int i = 0; i < targetResources.size(); i++) { this.targetFileWriters.add( - new TsFileIOWriter( - targetResources.get(i).getTsFile(), enableMemoryControl, memorySizeForEachWriter)); + new CompactionTsFileWriter( + targetResources.get(i).getTsFile(), + enableMemoryControl, + memorySizeForEachWriter, + CompactionType.CROSS_COMPACTION)); isEmptyFile[i] = true; } this.seqTsFileResources = seqFileResources; @@ -99,7 +103,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public void endChunkGroup() throws IOException { for (int i = 0; i < seqTsFileResources.size(); i++) { - TsFileIOWriter targetFileWriter = targetFileWriters.get(i); + CompactionTsFileWriter targetFileWriter = targetFileWriters.get(i); if (isDeviceExistedInTargetFiles[i]) { // update resource CompactionUtils.updateResource(targetResources.get(i), targetFileWriter, deviceId); @@ -129,7 +133,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr writeDataPoint(timestamp, value, chunkWriters[subTaskId]); chunkPointNumArray[subTaskId]++; checkChunkSizeAndMayOpenANewChunk( - targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, true); + targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId); isDeviceExistedInTargetFiles[fileIndex] = true; isEmptyFile[fileIndex] = false; lastTime[subTaskId] = timestamp; @@ -153,7 +157,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public void close() throws IOException { - for (TsFileIOWriter targetWriter : targetFileWriters) { + for (CompactionTsFileWriter targetWriter : targetFileWriters) { if (targetWriter != null && targetWriter.canWrite()) { targetWriter.close(); } @@ -165,8 +169,8 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public void checkAndMayFlushChunkMetadata() throws IOException { for (int i = 0; i < targetFileWriters.size(); i++) { - TsFileIOWriter fileIOWriter = targetFileWriters.get(i); - fileIOWriter.checkMetadataSizeAndMayFlush(); + CompactionTsFileWriter fileIoWriter = targetFileWriters.get(i); + fileIoWriter.checkMetadataSizeAndMayFlush(); } } @@ -234,7 +238,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr @Override public long getWriterSize() throws IOException { long totalSize = 0; - for (TsFileIOWriter writer : targetFileWriters) { + for (CompactionTsFileWriter writer : targetFileWriters) { totalSize += writer.getPos(); } return totalSize; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index 6404730d0e4..3795cf1d71b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -20,18 +20,19 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWriter { - protected TsFileIOWriter fileWriter; + protected CompactionTsFileWriter fileWriter; protected boolean isEmptyFile; @@ -50,7 +51,11 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); boolean enableMemoryControl = IoTDBDescriptor.getInstance().getConfig().isEnableMemControl(); this.fileWriter = - new TsFileIOWriter(targetFileResource.getTsFile(), enableMemoryControl, sizeForFileWriter); + new CompactionTsFileWriter( + targetFileResource.getTsFile(), + enableMemoryControl, + sizeForFileWriter, + CompactionType.INNER_UNSEQ_COMPACTION); this.targetResource = targetFileResource; isEmptyFile = true; } @@ -77,7 +82,7 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException { writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]); chunkPointNumArray[subTaskId]++; - checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId, false); + checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); isEmptyFile = false; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java index 109d66c03b3..b3464080e76 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/FastCrossCompactionWriter.java @@ -150,7 +150,7 @@ public class FastCrossCompactionWriter extends AbstractCrossCompactionWriter { // check chunk size and may open a new chunk checkChunkSizeAndMayOpenANewChunk( - targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, true); + targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId); isDeviceExistedInTargetFiles[fileIndex] = true; isEmptyFile[fileIndex] = false; @@ -178,7 +178,7 @@ public class FastCrossCompactionWriter extends AbstractCrossCompactionWriter { // check chunk size and may open a new chunk checkChunkSizeAndMayOpenANewChunk( - targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId, true); + targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId); isDeviceExistedInTargetFiles[fileIndex] = true; isEmptyFile[fileIndex] = false; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java index 772a5b18bd9..497afa3d32e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java @@ -57,7 +57,7 @@ public class ReadPointCrossCompactionWriter extends AbstractCrossCompactionWrite } chunkPointNumArray[subTaskId] += timestamps.getTimes().length; checkChunkSizeAndMayOpenANewChunk( - targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId, true); + targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId); isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true; isEmptyFile[seqFileIndexArray[subTaskId]] = false; lastTime[subTaskId] = timestamps.getEndTime(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java index 0fdb712084d..f0dd4eef1f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java @@ -42,7 +42,7 @@ public class ReadPointInnerCompactionWriter extends AbstractInnerCompactionWrite AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); chunkPointNumArray[subTaskId] += timestamps.getTimes().length; - checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId, false); + checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId); isEmptyFile = false; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java new file mode 100644 index 00000000000..6e8b7b3d890 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.compaction.io; + +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.TsFileDeviceIterator; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class extends the TsFileSequenceReader class to read and manage TsFile with a focus on + * compaction-related operations. This includes functions for tracking and recording the amount of + * data read and distinguishing between aligned and not aligned series during compaction. + */ +public class CompactionTsFileReader extends TsFileSequenceReader { + /** Tracks the total amount of data (in bytes) that has been read. */ + private AtomicLong readDataSize = new AtomicLong(0L); + + /** The type of compaction running. */ + CompactionType compactionType; + + /** A flag that indicates if an aligned series is being read. */ + private volatile boolean readingAlignedSeries = false; + + /** + * Constructs a new instance of CompactionTsFileReader. + * + * @param file The file to be read. + * @param compactionType The type of compaction running. + * @throws IOException If an error occurs during file operations. + */ + public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException { + super(file); + this.compactionType = compactionType; + } + + @Override + protected ByteBuffer readData(long position, int totalSize) throws IOException { + ByteBuffer buffer = super.readData(position, totalSize); + readDataSize.addAndGet(totalSize); + return buffer; + } + + /** Marks the start of reading an aligned series. */ + public void markStartOfAlignedSeries() { + readingAlignedSeries = true; + } + + /** Marks the end of reading an aligned series. */ + public void markEndOfAlignedSeries() { + readingAlignedSeries = false; + } + + @Override + public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { + synchronized (this) { + // using synchronized to avoid concurrent read that makes readDataSize not correct + long before = readDataSize.get(); + Chunk chunk = super.readMemChunk(metaData); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo( + compactionType, + readingAlignedSeries + ? CompactionIoDataType.ALIGNED + : CompactionIoDataType.NOT_ALIGNED, + dataSize); + return chunk; + } + } + + @Override + public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException { + long before = readDataSize.get(); + TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned(); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + return iterator; + } + + @Override + public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset( + long startOffset, long endOffset) throws IOException { + long before = readDataSize.get(); + List<IChunkMetadata> chunkMetadataList = + super.getChunkMetadataListByTimeseriesMetadataOffset(startOffset, endOffset); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + return chunkMetadataList; + } + + @Override + public void getDevicesAndEntriesOfOneLeafNode( + Long startOffset, Long endOffset, Queue<Pair<String, long[]>> measurementNodeOffsetQueue) + throws IOException { + long before = readDataSize.get(); + super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset, measurementNodeOffsetQueue); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + } + + @Override + public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException { + long before = readDataSize.get(); + MetadataIndexNode metadataIndexNode = super.readMetadataIndexNode(start, end); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + return metadataIndexNode; + } + + @Override + public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> + getTimeseriesMetadataOffsetByDevice( + MetadataIndexNode measurementNode, + Set<String> excludedMeasurementIds, + boolean needChunkMetadata) + throws IOException { + long before = readDataSize.get(); + Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> result = + super.getTimeseriesMetadataOffsetByDevice( + measurementNode, excludedMeasurementIds, needChunkMetadata); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + return result; + } + + @Override + public void getDeviceTimeseriesMetadata( + List<TimeseriesMetadata> timeseriesMetadataList, + MetadataIndexNode measurementNode, + Set<String> excludedMeasurementIds, + boolean needChunkMetadata) + throws IOException { + long before = readDataSize.get(); + super.getDeviceTimeseriesMetadata( + timeseriesMetadataList, measurementNode, excludedMeasurementIds, needChunkMetadata); + long dataSize = readDataSize.get() - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java new file mode 100644 index 00000000000..4dac7843aae --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.compaction.io; + +import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; + +public class CompactionTsFileWriter extends TsFileIOWriter { + CompactionType type; + + private volatile boolean isWritingAligned = false; + + public CompactionTsFileWriter( + File file, boolean enableMemoryControl, long maxMetadataSize, CompactionType type) + throws IOException { + super(file, enableMemoryControl, maxMetadataSize); + this.type = type; + } + + public void markStartingWritingAligned() { + isWritingAligned = true; + } + + public void markEndingWritingAligned() { + isWritingAligned = false; + } + + public void writeChunk(IChunkWriter chunkWriter) throws IOException { + boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl; + long beforeOffset = this.getPos(); + chunkWriter.writeToFileWriter(this); + long writtenDataSize = this.getPos() - beforeOffset; + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } + CompactionMetrics.getInstance() + .recordWriteInfo( + type, + isAligned ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, + writtenDataSize); + } + + @Override + public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException { + long beforeOffset = this.getPos(); + super.writeChunk(chunk, chunkMetadata); + long writtenDataSize = this.getPos() - beforeOffset; + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } + CompactionMetrics.getInstance() + .recordWriteInfo( + type, + isWritingAligned ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, + writtenDataSize); + } + + @Override + public void writeEmptyValueChunk( + String measurementId, + CompressionType compressionType, + TSDataType tsDataType, + TSEncoding encodingType, + Statistics<? extends Serializable> statistics) + throws IOException { + long beforeOffset = this.getPos(); + super.writeEmptyValueChunk( + measurementId, compressionType, tsDataType, encodingType, statistics); + long writtenDataSize = this.getPos() - beforeOffset; + CompactionMetrics.getInstance() + .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize); + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } + } + + @Override + public int checkMetadataSizeAndMayFlush() throws IOException { + int size = super.checkMetadataSizeAndMayFlush(); + if (size > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(size); + } + CompactionMetrics.getInstance().recordWriteInfo(type, CompactionIoDataType.METADATA, size); + return size; + } + + @Override + public void endFile() throws IOException { + long beforeSize = this.getPos(); + super.endFile(); + long writtenDataSize = this.getPos() - beforeSize; + if (writtenDataSize > 0) { + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize); + } + CompactionMetrics.getInstance() + .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java index 6dd13065941..baad46f2a0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java @@ -252,16 +252,6 @@ public class CompactionTaskManager implements IService { mergeWriteRateLimiter.setRate(throughout); } } - /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */ - public static void mergeRateLimiterAcquire(RateLimiter limiter, long bytesLength) { - while (bytesLength >= Integer.MAX_VALUE) { - limiter.acquire(Integer.MAX_VALUE); - bytesLength -= Integer.MAX_VALUE; - } - if (bytesLength > 0) { - limiter.acquire((int) bytesLength); - } - } public synchronized void removeRunningTaskFuture(AbstractCompactionTask task) { String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), task.getDataRegionId()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java new file mode 100644 index 00000000000..be5f76eb290 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.compaction.schedule.constant; + +public enum CompactionIoDataType { + NOT_ALIGNED(0), + ALIGNED(1), + METADATA(2); + + int value; + + CompactionIoDataType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index de9a822facf..468619f8a69 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -1550,6 +1550,11 @@ public class DataRegion implements IDataRegionForQuery { tsFileResourceList.addAll(tsFileManager.getTsFileList(false)); tsFileResourceList.forEach( x -> { + FileMetrics.getInstance() + .deleteFile( + new long[] {x.getTsFileSize()}, + x.isSeq(), + Collections.singletonList(x.getTsFile().getName())); if (x.getModFile().exists()) { FileMetrics.getInstance().decreaseModFileNum(1); FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize()); diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java index d482be8c9ec..f80ff96e664 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java @@ -25,8 +25,8 @@ import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus; import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; @@ -44,7 +44,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class CompactionMetrics implements IMetricSet { - private static final List<String> TYPES = Arrays.asList("aligned", "not_aligned"); + private static final String NOT_ALIGNED = "not_aligned"; + private static final String ALIGNED = "aligned"; + private static final String METADATA = "metadata"; + private static final List<String> TYPES = Arrays.asList(ALIGNED, NOT_ALIGNED); private static final CompactionMetrics INSTANCE = new CompactionMetrics(); private long lastUpdateTime = 0L; private static final long UPDATE_INTERVAL = 10_000L; @@ -57,47 +60,58 @@ public class CompactionMetrics implements IMetricSet { private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0); private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0); private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0); + // compaction type -> Counter[ Not-Aligned, Aligned, Metadata] + private final Map<String, Counter[]> writeCounters = new ConcurrentHashMap<>(); + private final Map<String, Counter[]> readCounters = new ConcurrentHashMap<>(); private CompactionMetrics() { for (String type : TYPES) { - Map<CompactionType, Map<ProcessChunkType, Counter>> compactionTypeProcessChunkTypeMap = - writeInfoCounterMap.computeIfAbsent(type, k -> new ConcurrentHashMap<>()); - for (CompactionType compactionType : CompactionType.values()) { - Map<ProcessChunkType, Counter> counterMap = - compactionTypeProcessChunkTypeMap.computeIfAbsent( - compactionType, k -> new ConcurrentHashMap<>()); - for (ProcessChunkType processChunkType : ProcessChunkType.values()) { - counterMap.put(processChunkType, DoNothingMetricManager.DO_NOTHING_COUNTER); - } - } + readCounters.put( + type, + new Counter[] { + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER + }); + writeCounters.put( + type, + new Counter[] { + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER + }); } } - // region compaction write info - private Map<String, Map<CompactionType, Map<ProcessChunkType, Counter>>> writeInfoCounterMap = - new ConcurrentHashMap<>(); private Counter totalCompactionWriteInfoCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private void bindWriteInfo(AbstractMetricService metricService) { - for (String type : TYPES) { - Map<CompactionType, Map<ProcessChunkType, Counter>> compactionTypeProcessChunkTypeMap = - writeInfoCounterMap.computeIfAbsent(type, k -> new ConcurrentHashMap<>()); - for (CompactionType compactionType : CompactionType.values()) { - Map<ProcessChunkType, Counter> counterMap = - compactionTypeProcessChunkTypeMap.computeIfAbsent( - compactionType, k -> new ConcurrentHashMap<>()); - for (ProcessChunkType processChunkType : ProcessChunkType.values()) { - Counter counter = - metricService.getOrCreateCounter( - Metric.DATA_WRITTEN.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction_" + compactionType.toString(), - Tag.STATUS.toString(), - type + "_" + processChunkType.toString()); - counterMap.put(processChunkType, counter); - } - } + for (CompactionType compactionType : CompactionType.values()) { + writeCounters.put( + compactionType.toString(), + new Counter[] { + metricService.getOrCreateCounter( + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + NOT_ALIGNED), + metricService.getOrCreateCounter( + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + ALIGNED), + metricService.getOrCreateCounter( + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + METADATA) + }); } totalCompactionWriteInfoCounter = metricService.getOrCreateCounter( @@ -110,22 +124,28 @@ public class CompactionMetrics implements IMetricSet { } private void unbindWriteInfo(AbstractMetricService metricService) { - for (String type : TYPES) { - for (CompactionType compactionType : CompactionType.values()) { - for (ProcessChunkType processChunkType : ProcessChunkType.values()) { - metricService.remove( - MetricType.COUNTER, - Metric.DATA_WRITTEN.toString(), - Tag.NAME.toString(), - "compaction_" + compactionType.toString(), - Tag.STATUS.toString(), - type + "_" + processChunkType.toString()); - writeInfoCounterMap - .get(type) - .get(compactionType) - .put(processChunkType, DoNothingMetricManager.DO_NOTHING_COUNTER); - } - } + for (CompactionType compactionType : CompactionType.values()) { + metricService.remove( + MetricType.COUNTER, + Metric.DATA_WRITTEN.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + NOT_ALIGNED); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_WRITTEN.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + ALIGNED); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_WRITTEN.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + METADATA); } metricService.remove( MetricType.COUNTER, @@ -137,13 +157,12 @@ public class CompactionMetrics implements IMetricSet { } public void recordWriteInfo( - CompactionType compactionType, - ProcessChunkType processChunkType, - boolean aligned, - long byteNum) { - String type = aligned ? "aligned" : "not_aligned"; - writeInfoCounterMap.get(type).get(compactionType).get(processChunkType).inc(byteNum / 1024L); - totalCompactionWriteInfoCounter.inc(byteNum / 1024L); + CompactionType compactionType, CompactionIoDataType dataType, long byteNum) { + Counter[] counters = writeCounters.get(compactionType.toString()); + if (counters != null) { + counters[dataType.getValue()].inc(byteNum); + } + totalCompactionWriteInfoCounter.inc(byteNum); } // endregion @@ -152,18 +171,73 @@ public class CompactionMetrics implements IMetricSet { private Counter totalCompactionReadInfoCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private void bindReadInfo(AbstractMetricService metricService) { + for (CompactionType compactionType : CompactionType.values()) { + readCounters.put( + compactionType.toString(), + new Counter[] { + metricService.getOrCreateCounter( + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + NOT_ALIGNED), + metricService.getOrCreateCounter( + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + ALIGNED), + metricService.getOrCreateCounter( + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + METADATA) + }); + } totalCompactionReadInfoCounter = metricService.getOrCreateCounter( Metric.DATA_READ.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction"); } private void unbindReadInfo(AbstractMetricService metricService) { + for (CompactionType compactionType : CompactionType.values()) { + metricService.remove( + MetricType.COUNTER, + Metric.DATA_READ.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + NOT_ALIGNED); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_READ.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + ALIGNED); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_READ.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + METADATA); + } metricService.remove( MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), "compaction"); } - public void recordReadInfo(long byteNum) { - totalCompactionReadInfoCounter.inc(byteNum / 1024L); + public void recordReadInfo( + CompactionType compactionType, CompactionIoDataType dataType, long byteNum) { + Counter[] counters = readCounters.get(compactionType.toString()); + if (counters != null) { + counters[dataType.getValue()].inc(byteNum); + } + totalCompactionReadInfoCounter.inc(byteNum); } // endregion diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java index 1b82f2a512d..153d0388c2d 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java @@ -156,6 +156,7 @@ public class FileMetrics implements IMetricSet { FileMetrics::getModFileNum, Tag.NAME.toString(), "mods"); + checkIfThereRemainingData(); } private void bindWalFileMetrics(AbstractMetricService metricService) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java index 76609747c37..075c8da0eae 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java @@ -82,8 +82,7 @@ public class TsFileDeviceIterator implements Iterator<Pair<String, Boolean>> { try { // get the first measurement node of this device, to know if the device is aligned this.measurementNode = - MetadataIndexNode.deserializeFrom( - reader.readData(startEndPair.right[0], startEndPair.right[1])); + reader.readMetadataIndexNode(startEndPair.right[0], startEndPair.right[1]); boolean isAligned = reader.isAlignedDevice(measurementNode); currentDevice = new Pair<>(startEndPair.left, isAligned); return currentDevice; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 8e760edaef4..8a8516f7f0b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -2250,6 +2250,18 @@ public class TsFileSequenceReader implements AutoCloseable { } } + /** + * Read MetadataIndexNode by start and end offset. + * + * @param start the start offset of the MetadataIndexNode + * @param end the end offset of the MetadataIndexNode + * @return MetadataIndexNode + * @throws IOException IOException + */ + public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException { + return MetadataIndexNode.deserializeFrom(readData(start, end)); + } + @Override public int hashCode() { return file.hashCode(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index e62aeb40c7e..ab4ea6198f9 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -631,7 +631,7 @@ public class TsFileIOWriter implements AutoCloseable { * * @throws IOException */ - public void checkMetadataSizeAndMayFlush() throws IOException { + public int checkMetadataSizeAndMayFlush() throws IOException { // This function should be called after all data of an aligned device has been written if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) { try { @@ -642,11 +642,13 @@ public class TsFileIOWriter implements AutoCloseable { chunkMetadataCount, currentChunkMetadataSize / chunkMetadataCount); } - sortAndFlushChunkMetadata(); + return sortAndFlushChunkMetadata(); } catch (IOException e) { logger.error("Meets exception when flushing metadata to temp file for {}", file, e); throw e; } + } else { + return 0; } } @@ -656,7 +658,8 @@ public class TsFileIOWriter implements AutoCloseable { * * @throws IOException */ - protected void sortAndFlushChunkMetadata() throws IOException { + protected int sortAndFlushChunkMetadata() throws IOException { + int writtenSize = 0; // group by series List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList = TSMIterator.sortChunkMetadata( @@ -673,7 +676,7 @@ public class TsFileIOWriter implements AutoCloseable { pathCount++; } List<IChunkMetadata> iChunkMetadataList = pair.right; - writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath); + writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath); lastSerializePath = seriesPath; logger.debug("Flushing {}", seriesPath); } @@ -684,11 +687,13 @@ public class TsFileIOWriter implements AutoCloseable { } chunkMetadataCount = 0; currentChunkMetadataSize = 0; + return writtenSize; } - private void writeChunkMetadataToTempFile( + private int writeChunkMetadataToTempFile( List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath) throws IOException { + int writtenSize = 0; // [DeviceId] measurementId datatype size chunkMetadataBuffer if (lastSerializePath == null || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) { @@ -696,20 +701,25 @@ public class TsFileIOWriter implements AutoCloseable { endPosInCMTForDevice.add(tempOutput.getPosition()); // serialize the device // for each device, we only serialize it once, in order to save io - ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream()); + writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream()); } if (isNewPath && iChunkMetadataList.size() > 0) { // serialize the public info of this measurement - ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream()); - ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream()); + writtenSize += + ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream()); + writtenSize += + ReadWriteIOUtils.write( + iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream()); } PublicBAOS buffer = new PublicBAOS(); int totalSize = 0; for (IChunkMetadata chunkMetadata : iChunkMetadataList) { totalSize += chunkMetadata.serializeTo(buffer, true); } - ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream()); + writtenSize += ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream()); buffer.writeTo(tempOutput); + writtenSize += buffer.size(); + return writtenSize; } public String getCurrentChunkGroupDeviceId() {
