This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55bcfeb439f09f99b69d66d9686c8de7d57ac873 Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jun 6 22:49:53 2023 +0800 refactor the read process --- .../impl/ReadChunkCompactionPerformer.java | 6 ---- .../execute/utils/MultiTsFileDeviceIterator.java | 13 ++++++-- .../utils/writer/AbstractCompactionWriter.java | 9 ++++-- .../writer/AbstractCrossCompactionWriter.java | 1 - .../writer/AbstractInnerCompactionWriter.java | 1 - .../compaction/io/CompactionTsFileReader.java | 36 +++++++++++++++++++++- .../compaction/io/CompactionTsFileWriter.java | 3 +- 7 files changed, 53 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index fff1aec7e6e..27d4d48afc7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -18,7 +18,6 @@ */ package org.apache.iotdb.db.engine.compaction.execute.performer.impl; -import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -37,16 +36,11 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.LinkedList; import java.util.List; public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { - private static final Logger LOGGER = - LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private TsFileResource targetResource; private List<TsFileResource> seqFiles; private CompactionTaskSummary summary; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java index 7e2484e0d71..0cb49de129e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -70,7 +72,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); try { for (TsFileResource tsFileResource : this.tsFileResourcesSortedByDesc) { - TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath()); + CompactionTsFileReader reader = + new CompactionTsFileReader( + tsFileResource.getTsFilePath(), CompactionType.INNER_SEQ_COMPACTION); readerMap.put(tsFileResource, reader); deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); } @@ -112,8 +116,13 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { Collections.sort( this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc); this.readerMap = readerMap; + CompactionType type = + seqResources.size() > 0 && unseqResources.size() > 0 + ? CompactionType.CROSS_COMPACTION + : CompactionType.INNER_UNSEQ_COMPACTION; for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) { - TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath()); + TsFileSequenceReader reader = + new CompactionTsFileReader(tsFileResource.getTsFilePath(), type); readerMap.put(tsFileResource, reader); deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned()); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 6297b3b8ef5..95e1f3a7e4c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -40,7 +40,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import com.google.common.util.concurrent.RateLimiter; @@ -167,7 +166,8 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } } - protected void sealChunk(CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) + protected void sealChunk( + CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int subTaskId) throws IOException { CompactionTaskManager.mergeRateLimiterAcquire( compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize()); @@ -293,7 +293,10 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } protected void checkChunkSizeAndMayOpenANewChunk( - CompactionTsFileWriter fileWriter, IChunkWriter iChunkWriter, int subTaskId, boolean isCrossSpace) + CompactionTsFileWriter fileWriter, + IChunkWriter iChunkWriter, + int subTaskId, + boolean isCrossSpace) throws IOException { if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) { // if chunk point num reaches the check point, then check if the chunk size over threshold diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 975377264e8..334d994eb95 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -31,7 +31,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; import java.util.ArrayList; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index b22d8d8689e..cd764f82452 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -28,7 +28,6 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java index 7c9e06789d3..fcc752108da 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java @@ -19,12 +19,46 @@ package org.apache.iotdb.db.engine.compaction.io; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.TsFileDeviceIterator; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; import java.io.IOException; +import java.nio.ByteBuffer; public class CompactionTsFileReader extends TsFileSequenceReader { - public CompactionTsFileReader(String file) throws IOException { + long readDataSize = 0L; + CompactionType compactionType; + + public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException { super(file); + this.compactionType = compactionType; + } + + @Override + protected ByteBuffer readData(long position, int totalSize) throws IOException { + ByteBuffer buffer = super.readData(position, totalSize); + readDataSize += totalSize; + return buffer; + } + + @Override + public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { + long before = readDataSize; + Chunk chunk = super.readMemChunk(metaData); + long dataSize = readDataSize - before; + // TODO: record metrics size + return chunk; + } + + @Override + public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException { + long before = readDataSize; + TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned(); + long dataSize = readDataSize - before; + // TODO: record metrics size + return iterator; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java index f23adc332d9..a74405ee839 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java @@ -71,8 +71,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { super.writeEmptyValueChunk( measurementId, compressionType, tsDataType, encodingType, statistics); long writtenDataSize = this.getPos() - beforeOffset; - CompactionMetrics.getInstance() - .recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize); + CompactionMetrics.getInstance().recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize); } public void writeChunk(IChunkWriter chunkWriter) throws IOException {
