This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4ca4219d2c83ab2fed785b0dfcc17a21611dd64f Author: Liu Xuxin <[email protected]> AuthorDate: Mon Jun 5 21:04:52 2023 +0800 refactor compaction metrics --- .../readchunk/AlignedSeriesCompactionExecutor.java | 15 +-- .../readchunk/SingleSeriesCompactionExecutor.java | 19 +-- .../utils/writer/AbstractCompactionWriter.java | 2 - .../compaction/utils/CompactionTsFileWriter.java | 46 +++++++ .../db/service/metrics/CompactionMetrics.java | 146 +++++++++++++-------- 5 files changed, 147 insertions(+), 81 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index 6690d133ac8..c529ca05208 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.header.ChunkHeader; @@ -142,7 +141,9 @@ public class AlignedSeriesCompactionExecutor { readerIterator.nextReader(); summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum()); summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum()); - CompactionMetrics.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize()); + CompactionMetrics.getInstance() + .recordReadInfo( + CompactionType.INNER_SEQ_COMPACTION, true, nextAlignedChunkInfo.getTotalSize()); compactOneAlignedChunk( nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum()); } @@ -153,10 +154,7 @@ public class AlignedSeriesCompactionExecutor { rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetrics.getInstance() .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - true, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionType.INNER_SEQ_COMPACTION, true, chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(writer); } writer.checkMetadataSizeAndMayFlush(); @@ -195,10 +193,7 @@ public class AlignedSeriesCompactionExecutor { rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetrics.getInstance() .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - true, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionType.INNER_SEQ_COMPACTION, true, chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(writer); remainingPointInChunkWriter = 0L; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index 47d80ccf4ea..11a68cdecf0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -24,7 +24,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.header.ChunkHeader; @@ -131,6 +130,8 @@ public class SingleSeriesCompactionExecutor { } CompactionMetrics.getInstance() .recordReadInfo( + CompactionType.INNER_SEQ_COMPACTION, + false, (long) currentChunk.getHeader().getSerializedSize() + currentChunk.getHeader().getDataSize()); @@ -337,11 +338,7 @@ public class SingleSeriesCompactionExecutor { maxEndTimestamp = chunkMetadata.getEndTime(); } CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK, - false, - getChunkSize(chunk)); + .recordWriteInfo(CompactionType.INNER_SEQ_COMPACTION, false, getChunkSize(chunk)); fileWriter.writeChunk(chunk, chunkMetadata); } @@ -352,10 +349,7 @@ public class SingleSeriesCompactionExecutor { compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetrics.getInstance() .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - false, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionType.INNER_SEQ_COMPACTION, false, chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(fileWriter); pointCountInChunkWriter = 0L; } @@ -375,10 +369,7 @@ public class SingleSeriesCompactionExecutor { compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); CompactionMetrics.getInstance() .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - false, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionType.INNER_SEQ_COMPACTION, false, chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(fileWriter); pointCountInChunkWriter = 0L; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 1f4b13b9a14..37b4c731226 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.PageException; @@ -305,7 +304,6 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, isAlign, iChunkWriter.estimateMaxSeriesMemSize()); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java new file mode 100644 index 00000000000..eaa3dd3d0d0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.compaction.utils; + +import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.File; +import java.io.IOException; + +public class CompactionTsFileWriter extends TsFileIOWriter { + public CompactionTsFileWriter(File file, boolean enableMemoryControl, long maxMetadataSize) + throws IOException { + super(file, enableMemoryControl, maxMetadataSize); + } + + public void writeChunk( + CompactionType type, Chunk chunk, ChunkMetadata chunkMetadata, boolean aligned) + throws IOException { + int dataSize = chunk.getHeader().getSerializedSize() + chunk.getHeader().getDataSize(); + CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(dataSize); + super.writeChunk(chunk, chunkMetadata); + CompactionMetrics.getInstance().recordWriteInfo(type, aligned, dataSize); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java index d482be8c9ec..082ea55b956 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java @@ -57,19 +57,22 @@ public class CompactionMetrics implements IMetricSet { private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0); private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0); private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0); + // compaction type -> Counter[ Not-Aligned, Aligned] + private final Map<String, Counter[]> writeCounters = new ConcurrentHashMap<>(); + private final Map<String, Counter[]> readCounters = new ConcurrentHashMap<>(); private CompactionMetrics() { for (String type : TYPES) { - Map<CompactionType, Map<ProcessChunkType, Counter>> compactionTypeProcessChunkTypeMap = - writeInfoCounterMap.computeIfAbsent(type, k -> new ConcurrentHashMap<>()); - for (CompactionType compactionType : CompactionType.values()) { - Map<ProcessChunkType, Counter> counterMap = - compactionTypeProcessChunkTypeMap.computeIfAbsent( - compactionType, k -> new ConcurrentHashMap<>()); - for (ProcessChunkType processChunkType : ProcessChunkType.values()) { - counterMap.put(processChunkType, DoNothingMetricManager.DO_NOTHING_COUNTER); - } - } + readCounters.put( + type, + new Counter[] { + DoNothingMetricManager.DO_NOTHING_COUNTER, DoNothingMetricManager.DO_NOTHING_COUNTER + }); + writeCounters.put( + type, + new Counter[] { + DoNothingMetricManager.DO_NOTHING_COUNTER, DoNothingMetricManager.DO_NOTHING_COUNTER + }); } } @@ -79,25 +82,25 @@ public class CompactionMetrics implements IMetricSet { private Counter totalCompactionWriteInfoCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private void bindWriteInfo(AbstractMetricService metricService) { - for (String type : TYPES) { - Map<CompactionType, Map<ProcessChunkType, Counter>> compactionTypeProcessChunkTypeMap = - writeInfoCounterMap.computeIfAbsent(type, k -> new ConcurrentHashMap<>()); - for (CompactionType compactionType : CompactionType.values()) { - Map<ProcessChunkType, Counter> counterMap = - compactionTypeProcessChunkTypeMap.computeIfAbsent( - compactionType, k -> new ConcurrentHashMap<>()); - for (ProcessChunkType processChunkType : ProcessChunkType.values()) { - Counter counter = - metricService.getOrCreateCounter( - Metric.DATA_WRITTEN.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction_" + compactionType.toString(), - Tag.STATUS.toString(), - type + "_" + processChunkType.toString()); - counterMap.put(processChunkType, counter); - } - } + for (CompactionType compactionType : CompactionType.values()) { + writeCounters.put( + compactionType.toString(), + new Counter[] { + metricService.getOrCreateCounter( + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "not_aligned"), + metricService.getOrCreateCounter( + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "aligned") + }); } totalCompactionWriteInfoCounter = metricService.getOrCreateCounter( @@ -110,22 +113,21 @@ public class CompactionMetrics implements IMetricSet { } private void unbindWriteInfo(AbstractMetricService metricService) { - for (String type : TYPES) { - for (CompactionType compactionType : CompactionType.values()) { - for (ProcessChunkType processChunkType : ProcessChunkType.values()) { - metricService.remove( - MetricType.COUNTER, - Metric.DATA_WRITTEN.toString(), - Tag.NAME.toString(), - "compaction_" + compactionType.toString(), - Tag.STATUS.toString(), - type + "_" + processChunkType.toString()); - writeInfoCounterMap - .get(type) - .get(compactionType) - .put(processChunkType, DoNothingMetricManager.DO_NOTHING_COUNTER); - } - } + for (CompactionType compactionType : CompactionType.values()) { + metricService.remove( + MetricType.COUNTER, + Metric.DATA_WRITTEN.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "not_aligned"); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_WRITTEN.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "aligned"); } metricService.remove( MetricType.COUNTER, @@ -136,14 +138,10 @@ public class CompactionMetrics implements IMetricSet { "total"); } - public void recordWriteInfo( - CompactionType compactionType, - ProcessChunkType processChunkType, - boolean aligned, - long byteNum) { - String type = aligned ? "aligned" : "not_aligned"; - writeInfoCounterMap.get(type).get(compactionType).get(processChunkType).inc(byteNum / 1024L); - totalCompactionWriteInfoCounter.inc(byteNum / 1024L); + public void recordWriteInfo(CompactionType compactionType, boolean aligned, long byteNum) { + Counter[] counters = writeCounters.get(compactionType.toString()); + counters[aligned ? 1 : 0].inc(byteNum); + totalCompactionWriteInfoCounter.inc(byteNum); } // endregion @@ -152,18 +150,56 @@ public class CompactionMetrics implements IMetricSet { private Counter totalCompactionReadInfoCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private void bindReadInfo(AbstractMetricService metricService) { + for (CompactionType compactionType : CompactionType.values()) { + readCounters.put( + compactionType.toString(), + new Counter[] { + metricService.getOrCreateCounter( + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "not_aligned"), + metricService.getOrCreateCounter( + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "aligned") + }); + } totalCompactionReadInfoCounter = metricService.getOrCreateCounter( Metric.DATA_READ.toString(), MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction"); } private void unbindReadInfo(AbstractMetricService metricService) { + for (CompactionType compactionType : CompactionType.values()) { + metricService.remove( + MetricType.COUNTER, + Metric.DATA_READ.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "not_aligned"); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_READ.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "aligned"); + } metricService.remove( MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), "compaction"); } - public void recordReadInfo(long byteNum) { - totalCompactionReadInfoCounter.inc(byteNum / 1024L); + public void recordReadInfo(CompactionType compactionType, boolean aligned, long byteNum) { + Counter[] counters = readCounters.get(compactionType.toString()); + counters[aligned ? 1 : 0].inc(byteNum); + totalCompactionReadInfoCounter.inc(byteNum); } // endregion
