This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b8090e8382a2c0a15503dd5d8e2645f6e3896d16 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Jun 8 10:26:59 2023 +0800 refactor read process --- .../readchunk/AlignedSeriesCompactionExecutor.java | 14 +++++++++----- .../readchunk/SingleSeriesCompactionExecutor.java | 8 -------- .../utils/writer/AbstractCompactionWriter.java | 4 ++-- .../compaction/io/CompactionTsFileReader.java | 21 +++++++++++++++++++-- .../compaction/io/CompactionTsFileWriter.java | 11 ++++++----- ...ittenDataType.java => CompactionIoDataType.java} | 4 ++-- .../iotdb/db/service/metrics/CompactionMetrics.java | 9 +++++---- 7 files changed, 43 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index a4df7677b29..849c3ef5742 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -20,11 +20,10 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -134,6 +133,11 @@ public class AlignedSeriesCompactionExecutor { readerAndChunkMetadataList.removeFirst(); TsFileSequenceReader reader = readerListPair.left; List<AlignedChunkMetadata> alignedChunkMetadataList = readerListPair.right; + + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markStartAlignedSeries(); + } + TsFileAlignedSeriesReaderIterator readerIterator = new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList); while (readerIterator.hasNext()) { @@ -141,12 +145,12 @@ public class AlignedSeriesCompactionExecutor { readerIterator.nextReader(); summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum()); summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum()); - CompactionMetrics.getInstance() - .recordReadInfo( - CompactionType.INNER_SEQ_COMPACTION, true, nextAlignedChunkInfo.getTotalSize()); compactOneAlignedChunk( nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum()); } + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markEndAlignedSeries(); + } } if (remainingPointInChunkWriter != 0L) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index 454564f3643..689f13aac20 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -24,9 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -128,12 +126,6 @@ public class SingleSeriesCompactionExecutor { if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); } - CompactionMetrics.getInstance() - .recordReadInfo( - CompactionType.INNER_SEQ_COMPACTION, - false, - (long) currentChunk.getHeader().getSerializedSize() - + currentChunk.getHeader().getDataSize()); // if this chunk is modified, deserialize it into points if (chunkMetadata.getDeleteIntervalList() != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 95e1f3a7e4c..75e3c76b9af 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -21,8 +21,8 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.PageException; @@ -309,7 +309,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, - isAlign ? WrittenDataType.ALIGNED : WrittenDataType.NOT_ALIGNED, + isAlign ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, iChunkWriter.estimateMaxSeriesMemSize()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java index fcc752108da..91a9c6d2108 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.engine.compaction.io; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileDeviceIterator; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -32,6 +34,8 @@ public class CompactionTsFileReader extends TsFileSequenceReader { long readDataSize = 0L; CompactionType compactionType; + boolean readingAlignedSeries = false; + public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException { super(file); this.compactionType = compactionType; @@ -44,12 +48,24 @@ public class CompactionTsFileReader extends TsFileSequenceReader { return buffer; } + public void markStartAlignedSeries() { + readingAlignedSeries = true; + } + + public void markEndAlignedSeries() { + readingAlignedSeries = false; + } + @Override public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { long before = readDataSize; Chunk chunk = super.readMemChunk(metaData); long dataSize = readDataSize - before; - // TODO: record metrics size + CompactionMetrics.getInstance() + .recordReadInfo( + compactionType, + readingAlignedSeries ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, + dataSize); return chunk; } @@ -58,7 +74,8 @@ public class CompactionTsFileReader extends TsFileSequenceReader { long before = readDataSize; TsFileDeviceIterator iterator = super.getAllDevicesIteratorWithIsAligned(); long dataSize = readDataSize - before; - // TODO: record metrics size + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); return iterator; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java index a74405ee839..f86f7a4391f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java @@ -20,8 +20,8 @@ package org.apache.iotdb.db.engine.compaction.io; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -56,7 +56,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { super.writeChunk(chunk, chunkMetadata); long writtenDataSize = this.getPos() - beforeOffset; CompactionMetrics.getInstance() - .recordWriteInfo(type, WrittenDataType.NOT_ALIGNED, writtenDataSize); + .recordWriteInfo(type, CompactionIoDataType.NOT_ALIGNED, writtenDataSize); } @Override @@ -71,7 +71,8 @@ public class CompactionTsFileWriter extends TsFileIOWriter { super.writeEmptyValueChunk( measurementId, compressionType, tsDataType, encodingType, statistics); long writtenDataSize = this.getPos() - beforeOffset; - CompactionMetrics.getInstance().recordWriteInfo(type, WrittenDataType.ALIGNED, writtenDataSize); + CompactionMetrics.getInstance() + .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize); } public void writeChunk(IChunkWriter chunkWriter) throws IOException { @@ -82,7 +83,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { CompactionMetrics.getInstance() .recordWriteInfo( type, - isAligned ? WrittenDataType.ALIGNED : WrittenDataType.NOT_ALIGNED, + isAligned ? CompactionIoDataType.ALIGNED : CompactionIoDataType.NOT_ALIGNED, writtenDataSize); } @@ -92,6 +93,6 @@ public class CompactionTsFileWriter extends TsFileIOWriter { super.endFile(); long writtenDataSize = this.getPos() - beforeSize; CompactionMetrics.getInstance() - .recordWriteInfo(type, WrittenDataType.METADATA, writtenDataSize); + .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java similarity index 93% rename from server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java index 6c522026bc2..be5f76eb290 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/CompactionIoDataType.java @@ -19,14 +19,14 @@ package org.apache.iotdb.db.engine.compaction.schedule.constant; -public enum WrittenDataType { +public enum CompactionIoDataType { NOT_ALIGNED(0), ALIGNED(1), METADATA(2); int value; - WrittenDataType(int value) { + CompactionIoDataType(int value) { this.value = value; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java index 9123306137f..a5ca2242e2c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java @@ -25,9 +25,9 @@ import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskStatus; import org.apache.iotdb.db.engine.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; @@ -158,7 +158,7 @@ public class CompactionMetrics implements IMetricSet { } public void recordWriteInfo( - CompactionType compactionType, WrittenDataType dataType, long byteNum) { + CompactionType compactionType, CompactionIoDataType dataType, long byteNum) { Counter[] counters = writeCounters.get(compactionType.toString()); counters[dataType.getValue()].inc(byteNum); totalCompactionWriteInfoCounter.inc(byteNum); @@ -230,9 +230,10 @@ public class CompactionMetrics implements IMetricSet { MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), "compaction"); } - public void recordReadInfo(CompactionType compactionType, boolean aligned, long byteNum) { + public void recordReadInfo( + CompactionType compactionType, CompactionIoDataType dataType, long byteNum) { Counter[] counters = readCounters.get(compactionType.toString()); - counters[aligned ? 1 : 0].inc(byteNum); + counters[dataType.getValue()].inc(byteNum); totalCompactionReadInfoCounter.inc(byteNum); } // endregion
