This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5e0ac213b59c5272a9fec402eb74c667ba951086 Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jun 6 15:45:20 2023 +0800 refactor the write process of compaction --- .../impl/ReadChunkCompactionPerformer.java | 15 +++-- .../readchunk/AlignedSeriesCompactionExecutor.java | 20 ++---- .../readchunk/SingleSeriesCompactionExecutor.java | 24 ++----- .../utils/writer/AbstractCompactionWriter.java | 3 +- .../compaction/io/CompactionTsFileReader.java | 30 +++++++++ .../compaction/io/CompactionTsFileWriter.java | 77 ++++++++++++++++++++++ .../schedule/constant/WrittenDataType.java | 36 ++++++++++ .../compaction/utils/CompactionTsFileWriter.java | 46 ------------- .../db/service/metrics/CompactionMetrics.java | 48 ++++++++++++-- 9 files changed, 207 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index da0c9c91dd8..fff1aec7e6e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -27,6 +27,8 @@ import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.execute.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.AlignedSeriesCompactionExecutor; import org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; 
import org.apache.iotdb.db.rescon.SystemInfo; @@ -34,7 +36,6 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +72,12 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() * IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion()); try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles); - TsFileIOWriter writer = - new TsFileIOWriter(targetResource.getTsFile(), true, sizeForFileWriter)) { + CompactionTsFileWriter writer = + new CompactionTsFileWriter( + targetResource.getTsFile(), + true, + sizeForFileWriter, + CompactionType.INNER_SEQ_COMPACTION)) { while (deviceIterator.hasNextDevice()) { Pair<String, Boolean> deviceInfo = deviceIterator.nextDevice(); String device = deviceInfo.left; @@ -113,7 +118,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private void compactAlignedSeries( String device, TsFileResource targetResource, - TsFileIOWriter writer, + CompactionTsFileWriter writer, MultiTsFileDeviceIterator deviceIterator) throws IOException, InterruptedException { checkThreadInterrupted(); @@ -153,7 +158,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private void compactNotAlignedSeries( String device, TsFileResource targetResource, - TsFileIOWriter writer, + CompactionTsFileWriter writer, MultiTsFileDeviceIterator deviceIterator) throws IOException, MetadataException, InterruptedException { writer.startChunkGroup(device); diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index c529ca05208..a4df7677b29 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -38,7 +39,6 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import com.google.common.util.concurrent.RateLimiter; @@ -55,7 +55,7 @@ public class AlignedSeriesCompactionExecutor { private final LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList; private final TsFileResource targetResource; - private final TsFileIOWriter writer; + private final CompactionTsFileWriter writer; private final AlignedChunkWriterImpl chunkWriter; private final List<IMeasurementSchema> schemaList; @@ -73,7 +73,7 @@ public class AlignedSeriesCompactionExecutor { String device, TsFileResource targetResource, 
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter writer, + CompactionTsFileWriter writer, CompactionTaskSummary summary) throws IOException { this.device = device; @@ -150,12 +150,7 @@ public class AlignedSeriesCompactionExecutor { } if (remainingPointInChunkWriter != 0L) { - CompactionTaskManager.mergeRateLimiterAcquire( - rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, true, chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(writer); + writer.writeChunk(chunkWriter); } writer.checkMetadataSizeAndMayFlush(); } @@ -189,12 +184,7 @@ public class AlignedSeriesCompactionExecutor { private void flushChunkWriterIfLargeEnough() throws IOException { if (remainingPointInChunkWriter >= chunkPointNumThreshold || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) { - CompactionTaskManager.mergeRateLimiterAcquire( - rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, true, chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(writer); + writer.writeChunk(chunkWriter); remainingPointInChunkWriter = 0L; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index 11a68cdecf0..454564f3643 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -38,7 +39,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import com.google.common.util.concurrent.RateLimiter; @@ -51,7 +51,7 @@ public class SingleSeriesCompactionExecutor { private String device; private PartialPath series; private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList; - private TsFileIOWriter fileWriter; + private CompactionTsFileWriter fileWriter; private TsFileResource targetResource; private IMeasurementSchema schema; @@ -79,7 +79,7 @@ public class SingleSeriesCompactionExecutor { PartialPath series, IMeasurementSchema measurementSchema, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter fileWriter, + CompactionTsFileWriter fileWriter, TsFileResource targetResource) { this.device = series.getDevice(); this.series = series; @@ -96,7 +96,7 @@ public class SingleSeriesCompactionExecutor { public SingleSeriesCompactionExecutor( PartialPath series, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter fileWriter, + CompactionTsFileWriter fileWriter, TsFileResource targetResource, CompactionTaskSummary summary) { this.device = 
series.getDevice(); @@ -337,20 +337,13 @@ public class SingleSeriesCompactionExecutor { if (chunkMetadata.getEndTime() > maxEndTimestamp) { maxEndTimestamp = chunkMetadata.getEndTime(); } - CompactionMetrics.getInstance() - .recordWriteInfo(CompactionType.INNER_SEQ_COMPACTION, false, getChunkSize(chunk)); fileWriter.writeChunk(chunk, chunkMetadata); } private void flushChunkWriterIfLargeEnough() throws IOException { if (pointCountInChunkWriter >= targetChunkPointNum || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, false, chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriter); + fileWriter.writeChunk(chunkWriter); pointCountInChunkWriter = 0L; } } @@ -365,12 +358,7 @@ public class SingleSeriesCompactionExecutor { } private void flushChunkWriter() throws IOException { - CompactionTaskManager.mergeRateLimiterAcquire( - compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetrics.getInstance() - .recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, false, chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriter); + fileWriter.writeChunk(chunkWriter); pointCountInChunkWriter = 0L; } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 37b4c731226..9a26c4de327 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer; 
import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.PageException; @@ -304,7 +305,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, - isAlign, + isAlign ? WrittenDataType.ALIGNED : WrittenDataType.NOT_ALIGNED, iChunkWriter.estimateMaxSeriesMemSize()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java new file mode 100644 index 00000000000..7c9e06789d3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.compaction.io; + +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; + +import java.io.IOException; + +public class CompactionTsFileReader extends TsFileSequenceReader { + public CompactionTsFileReader(String file) throws IOException { + super(file); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java new file mode 100644 index 00000000000..62f3b65a0a2 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.compaction.io; + +import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; +import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + +import java.io.File; +import java.io.IOException; + +public class CompactionTsFileWriter extends TsFileIOWriter { + CompactionType type; + + public CompactionTsFileWriter( + File file, boolean enableMemoryControl, long maxMetadataSize, CompactionType type) + throws IOException { + super(file, enableMemoryControl, maxMetadataSize); + this.type = type; + } + + @Override + public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException { + long beforeOffset = this.getPos(); + CompactionTaskManager.getInstance() + .getMergeWriteRateLimiter() + .acquire(chunk.getHeader().getDataSize() + chunk.getHeader().getSerializedSize()); + super.writeChunk(chunk, chunkMetadata); + long writtenDataSize = this.getPos() - beforeOffset; + CompactionMetrics.getInstance() + .recordWriteInfo(type, WrittenDataType.NOT_ALIGNED, writtenDataSize); + } + + public void writeChunk(IChunkWriter chunkWriter) throws IOException { + boolean isAligned = chunkWriter instanceof AlignedChunkWriterImpl; + long beforeOffset = this.getPos(); + chunkWriter.writeToFileWriter(this); + long writtenDataSize = this.getPos() - beforeOffset; + CompactionMetrics.getInstance() + .recordWriteInfo( + type, + isAligned ? 
WrittenDataType.ALIGNED : WrittenDataType.NOT_ALIGNED, + writtenDataSize); + } + + @Override + public void endFile() throws IOException { + long beforeSize = this.getPos(); + super.endFile(); + long writtenDataSize = this.getPos() - beforeSize; + CompactionMetrics.getInstance() + .recordWriteInfo(type, WrittenDataType.METADATA, writtenDataSize); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java new file mode 100644 index 00000000000..6c522026bc2 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/constant/WrittenDataType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.compaction.schedule.constant; + +public enum WrittenDataType { + NOT_ALIGNED(0), + ALIGNED(1), + METADATA(2); + + int value; + + WrittenDataType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java deleted file mode 100644 index eaa3dd3d0d0..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionTsFileWriter.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.db.engine.compaction.utils; - -import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.service.metrics.CompactionMetrics; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -import org.apache.iotdb.tsfile.read.common.Chunk; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - -import java.io.File; -import java.io.IOException; - -public class CompactionTsFileWriter extends TsFileIOWriter { - public CompactionTsFileWriter(File file, boolean enableMemoryControl, long maxMetadataSize) - throws IOException { - super(file, enableMemoryControl, maxMetadataSize); - } - - public void writeChunk( - CompactionType type, Chunk chunk, ChunkMetadata chunkMetadata, boolean aligned) - throws IOException { - int dataSize = chunk.getHeader().getSerializedSize() + chunk.getHeader().getDataSize(); - CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(dataSize); - super.writeChunk(chunk, chunkMetadata); - CompactionMetrics.getInstance().recordWriteInfo(type, aligned, dataSize); - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java index 082ea55b956..9123306137f 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; +import org.apache.iotdb.db.engine.compaction.schedule.constant.WrittenDataType; import 
org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; @@ -57,7 +58,7 @@ public class CompactionMetrics implements IMetricSet { private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0); private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0); private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0); - // compaction type -> Counter[ Not-Aligned, Aligned] + // compaction type -> Counter[ Not-Aligned, Aligned, Metadata] private final Map<String, Counter[]> writeCounters = new ConcurrentHashMap<>(); private final Map<String, Counter[]> readCounters = new ConcurrentHashMap<>(); @@ -66,12 +67,16 @@ public class CompactionMetrics implements IMetricSet { readCounters.put( type, new Counter[] { - DoNothingMetricManager.DO_NOTHING_COUNTER, DoNothingMetricManager.DO_NOTHING_COUNTER + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER }); writeCounters.put( type, new Counter[] { - DoNothingMetricManager.DO_NOTHING_COUNTER, DoNothingMetricManager.DO_NOTHING_COUNTER + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER, + DoNothingMetricManager.DO_NOTHING_COUNTER }); } } @@ -99,7 +104,14 @@ public class CompactionMetrics implements IMetricSet { Tag.TYPE.toString(), compactionType.toString(), Tag.NAME.toString(), - "aligned") + "aligned"), + metricService.getOrCreateCounter( + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "metadata") }); } totalCompactionWriteInfoCounter = @@ -128,6 +140,13 @@ public class CompactionMetrics implements IMetricSet { compactionType.toString(), Tag.NAME.toString(), "aligned"); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_WRITTEN.toString(), + 
Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "metadata"); } metricService.remove( MetricType.COUNTER, @@ -138,9 +157,10 @@ public class CompactionMetrics implements IMetricSet { "total"); } - public void recordWriteInfo(CompactionType compactionType, boolean aligned, long byteNum) { + public void recordWriteInfo( + CompactionType compactionType, WrittenDataType dataType, long byteNum) { Counter[] counters = writeCounters.get(compactionType.toString()); - counters[aligned ? 1 : 0].inc(byteNum); + counters[dataType.getValue()].inc(byteNum); totalCompactionWriteInfoCounter.inc(byteNum); } @@ -167,7 +187,14 @@ public class CompactionMetrics implements IMetricSet { Tag.TYPE.toString(), compactionType.toString(), Tag.NAME.toString(), - "aligned") + "aligned"), + metricService.getOrCreateCounter( + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "metadata") }); } totalCompactionReadInfoCounter = @@ -191,6 +218,13 @@ public class CompactionMetrics implements IMetricSet { compactionType.toString(), Tag.NAME.toString(), "aligned"); + metricService.remove( + MetricType.COUNTER, + Metric.DATA_READ.toString(), + Tag.TYPE.toString(), + compactionType.toString(), + Tag.NAME.toString(), + "metadata"); } metricService.remove( MetricType.COUNTER, Metric.DATA_READ.toString(), Tag.NAME.toString(), "compaction");
