This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-5470 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0c722e45c36f4c7f639f9a4072dfb441d784eef5 Author: Liu Xuxin <[email protected]> AuthorDate: Fri Feb 10 10:06:18 2023 +0800 cherry pick IOTDB-5470 and fix conflict --- .../execute/task/AbstractCompactionTask.java | 14 ++ .../execute/task/CrossSpaceCompactionTask.java | 6 +- .../execute/task/InnerSpaceCompactionTask.java | 6 +- .../readchunk/AlignedSeriesCompactionExecutor.java | 26 +-- .../readchunk/SingleSeriesCompactionExecutor.java | 42 ++-- .../utils/writer/AbstractCompactionWriter.java | 15 +- .../compaction/schedule/CompactionTaskManager.java | 15 +- .../compaction/schedule/CompactionWorker.java | 7 +- .../db/service/metrics/CompactionMetrics.java | 232 +++++++++++++++++++++ .../db/service/metrics/DataNodeMetricsHelper.java | 1 + .../metrics/recorder/CompactionMetricsManager.java | 225 ++++++++++++++++++++ .../recorder/CompactionMetricsRecorder.java | 190 ----------------- 12 files changed, 535 insertions(+), 244 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java index 0583811d6e..482194705d 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.storagegroup.TsFileManager; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,8 @@ public abstract class AbstractCompactionTask { protected int hashCode = -1; protected CompactionTaskSummary summary; protected long serialId; + protected boolean crossTask; + protected boolean innerSeqTask; public AbstractCompactionTask( String storageGroupName, @@ -71,6 +74,7 @@ public abstract class AbstractCompactionTask { public void start() { currentTaskNum.incrementAndGet(); boolean isSuccess = false; + CompactionMetricsManager.getInstance().reportTaskStartRunning(crossTask, innerSeqTask); try { summary.start(); doCompaction(); @@ -79,6 +83,8 @@ public abstract class AbstractCompactionTask { this.currentTaskNum.decrementAndGet(); summary.finish(isSuccess); CompactionTaskManager.getInstance().removeRunningTaskFuture(this); + CompactionMetricsManager.getInstance() + .reportTaskFinishOrAbort(crossTask, innerSeqTask, summary.getTimeCost()); } } @@ -150,4 +156,12 @@ public abstract class AbstractCompactionTask { } protected abstract void createSummary(); + + public boolean isCrossTask() { + return crossTask; + } + + public boolean isInnerSeqTask() { + return innerSeqTask; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java index a87cdd3d29..59efe6c880 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java @@ -34,7 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; import org.apache.iotdb.db.rescon.SystemInfo; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -90,6 +90,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { this.performer = performer; this.hashCode = this.toString().hashCode(); this.memoryCost = memoryCost; + this.crossTask = true; + this.innerSeqTask = false; createSummary(); } @@ -234,7 +236,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { TsFileMetricManager.getInstance() .deleteFile(unsequenceFileSize, false, selectedUnsequenceFiles.size()); - CompactionMetricsRecorder.updateSummary(summary); + CompactionMetricsManager.getInstance().updateSummary(summary); long costTime = (System.currentTimeMillis() - startTime) / 1000; LOGGER.info( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java index a5c4b9fbd5..c1bb3eb505 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList; import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; @@ -95,6 +95,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { tsFileResourceList = tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition); } this.hashCode = this.toString().hashCode(); + this.innerSeqTask = sequence; + this.crossTask = false; collectSelectedFilesInfo(); createSummary(); } @@ -246,7 +248,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { TsFileMetricManager.getInstance() .deleteFile(totalSizeOfDeletedFile, sequence, selectedTsFileResourceList.size()); - CompactionMetricsRecorder.updateSummary(summary); + CompactionMetricsManager.getInstance().updateSummary(summary); double costTime = (System.currentTimeMillis() - startTime) / 1000.0d; LOGGER.info( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index af90e81cd2..9e292a45af 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -145,7 +145,7 @@ public class AlignedSeriesCompactionExecutor { readerIterator.nextReader(); summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum()); summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum()); - CompactionMetricsRecorder.recordReadInfo(nextAlignedChunkInfo.getTotalSize()); + CompactionMetricsManager.getInstance().recordReadInfo(nextAlignedChunkInfo.getTotalSize()); compactOneAlignedChunk( nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum()); } @@ -154,11 +154,12 @@ public class AlignedSeriesCompactionExecutor { if (remainingPointInChunkWriter != 0L) { CompactionTaskManager.mergeRateLimiterAcquire( rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetricsRecorder.recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - true, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionMetricsManager.getInstance() + .recordWriteInfo( + CompactionType.INNER_SEQ_COMPACTION, + ProcessChunkType.DESERIALIZE_CHUNK, + true, + chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(writer); } writer.checkMetadataSizeAndMayFlush(); @@ -199,11 +200,12 @@ public class AlignedSeriesCompactionExecutor { || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * schemaList.size()) { CompactionTaskManager.mergeRateLimiterAcquire( rateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetricsRecorder.recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - true, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionMetricsManager.getInstance() + .recordWriteInfo( + CompactionType.INNER_SEQ_COMPACTION, + ProcessChunkType.DESERIALIZE_CHUNK, + true, + chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(writer); remainingPointInChunkWriter = 0L; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index d5184dcea4..d8d3d71172 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -25,7 +25,7 @@ import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -128,9 +128,10 @@ public class SingleSeriesCompactionExecutor { if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); } - CompactionMetricsRecorder.recordReadInfo( - (long) currentChunk.getHeader().getSerializedSize() - + currentChunk.getHeader().getDataSize()); + CompactionMetricsManager.getInstance() + .recordReadInfo( + (long) currentChunk.getHeader().getSerializedSize() + + currentChunk.getHeader().getDataSize()); // if this chunk is modified, deserialize it into points if (chunkMetadata.getDeleteIntervalList() != null) { @@ -321,11 +322,12 @@ public class SingleSeriesCompactionExecutor { if (chunkMetadata.getEndTime() > maxEndTimestamp) { maxEndTimestamp = chunkMetadata.getEndTime(); } - CompactionMetricsRecorder.recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK, - false, - getChunkSize(chunk)); + CompactionMetricsManager.getInstance() + .recordWriteInfo( + CompactionType.INNER_SEQ_COMPACTION, + isCachedChunk ? ProcessChunkType.MERGE_CHUNK : ProcessChunkType.FLUSH_CHUNK, + false, + getChunkSize(chunk)); fileWriter.writeChunk(chunk, chunkMetadata); } @@ -334,11 +336,12 @@ public class SingleSeriesCompactionExecutor { || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) { CompactionTaskManager.mergeRateLimiterAcquire( compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetricsRecorder.recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - false, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionMetricsManager.getInstance() + .recordWriteInfo( + CompactionType.INNER_SEQ_COMPACTION, + ProcessChunkType.DESERIALIZE_CHUNK, + false, + chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(fileWriter); pointCountInChunkWriter = 0L; } @@ -356,11 +359,12 @@ public class SingleSeriesCompactionExecutor { private void flushChunkWriter() throws IOException { CompactionTaskManager.mergeRateLimiterAcquire( compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize()); - CompactionMetricsRecorder.recordWriteInfo( - CompactionType.INNER_SEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - false, - chunkWriter.estimateMaxSeriesMemSize()); + CompactionMetricsManager.getInstance() + .recordWriteInfo( + CompactionType.INNER_SEQ_COMPACTION, + ProcessChunkType.DESERIALIZE_CHUNK, + false, + chunkWriter.estimateMaxSeriesMemSize()); chunkWriter.writeToFileWriter(fileWriter); pointCountInChunkWriter = 0L; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java index 92ff3008e5..553167b1c9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -22,7 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.tsfile.exception.write.PageException; import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -296,11 +296,14 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { sealChunk(fileWriter, iChunkWriter, subTaskId); lastCheckIndex = 0; - CompactionMetricsRecorder.recordWriteInfo( - isCrossSpace ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, - ProcessChunkType.DESERIALIZE_CHUNK, - isAlign, - iChunkWriter.estimateMaxSeriesMemSize()); + CompactionMetricsManager.getInstance() + .recordWriteInfo( + isCrossSpace + ? CompactionType.CROSS_COMPACTION + : CompactionType.INNER_UNSEQ_COMPACTION, + ProcessChunkType.DESERIALIZE_CHUNK, + isAlign, + iChunkWriter.estimateMaxSeriesMemSize()); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java index 0e51e046a1..76bf9db322 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java @@ -31,8 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import com.google.common.util.concurrent.RateLimiter; @@ -97,8 +96,8 @@ public class CompactionTaskManager implements IService { AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles); candidateCompactionTaskQueue.regsitPollLastHook( x -> - CompactionMetricsRecorder.recordTaskInfo( - x, CompactionTaskStatus.POLL_FROM_QUEUE, candidateCompactionTaskQueue.size())); + CompactionMetricsManager.getInstance() + .reportPollTaskFromWaitingQueue(x.isCrossTask(), x.isInnerSeqTask())); init = true; } logger.info("Compaction task manager started."); @@ -225,8 +224,9 @@ public class CompactionTaskManager implements IService { candidateCompactionTaskQueue.put(compactionTask); // add metrics - CompactionMetricsRecorder.recordTaskInfo( - compactionTask, CompactionTaskStatus.ADD_TO_QUEUE, candidateCompactionTaskQueue.size()); + CompactionMetricsManager.getInstance() + .reportAddTaskToWaitingQueue( + compactionTask.isCrossTask(), compactionTask.isInnerSeqTask()); return true; } @@ -272,9 +272,6 @@ public class CompactionTaskManager implements IService { if (storageGroupTasks.containsKey(regionWithSG)) { storageGroupTasks.get(regionWithSG).remove(task); } - // add metrics - CompactionMetricsRecorder.recordTaskInfo( - task, CompactionTaskStatus.FINISHED, currentTaskNum.get()); finishedTaskNum.incrementAndGet(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java index 18e7875349..b38f247f01 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionWorker.java @@ -20,8 +20,7 @@ package org.apache.iotdb.db.engine.compaction.schedule; import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus; -import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsRecorder; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.jetbrains.annotations.NotNull; @@ -55,10 +54,10 @@ public class CompactionWorker implements Runnable { log.warn("CompactionThread-{} terminates because interruption", threadId); return; } + CompactionMetricsManager.getInstance() + .reportPollTaskFromWaitingQueue(task.isCrossTask(), task.isInnerSeqTask()); if (task != null) { // add metrics - CompactionMetricsRecorder.recordTaskInfo( - task, CompactionTaskStatus.POLL_FROM_QUEUE, compactionTaskQueue.size()); if (task.checkValidAndSetMerging()) { CompactionTaskSummary summary = task.getSummary(); CompactionTaskFuture future = new CompactionTaskFuture(summary); diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java new file mode 100644 index 0000000000..6ef7ad973a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/CompactionMetrics.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with COMPACTION_METRICS_MANAGER work for additional information + * regarding copyright ownership. The ASF licenses COMPACTION_METRICS_MANAGER file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use COMPACTION_METRICS_MANAGER file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.service.metrics; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +public class CompactionMetrics implements IMetricSet { + private final CompactionMetricsManager COMPACTION_METRICS_MANAGER = + CompactionMetricsManager.getInstance(); + + @Override + public void bindTo(AbstractMetricService metricService) { + bindTaskInfo(metricService); + bindPerformanceInfo(metricService); + } + + @Override + public void unbindFrom(AbstractMetricService metricService) { + unbindTaskInfo(metricService); + unbindPerformanceInfo(metricService); + } + + private void bindTaskInfo(AbstractMetricService metricService) { + metricService.createAutoGauge( + Metric.QUEUE.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getWaitingCrossCompactionTaskNum, + Tag.NAME.toString(), + "compaction_cross", + Tag.STATUS.toString(), + "waiting"); + metricService.createAutoGauge( + Metric.QUEUE.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getWaitingSeqInnerCompactionTaskNum, + Tag.NAME.toString(), + "compaction_inner_seq", + Tag.STATUS.toString(), + "waiting"); + metricService.createAutoGauge( + Metric.QUEUE.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getWaitingUnseqInnerCompactionTaskNum, + Tag.NAME.toString(), + "compaction_inner_unseq", + Tag.STATUS.toString(), + "waiting"); + metricService.createAutoGauge( + Metric.QUEUE.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getRunningCrossCompactionTaskNum, + Tag.NAME.toString(), + "compaction_cross", + Tag.STATUS.toString(), + "running"); + metricService.createAutoGauge( + Metric.QUEUE.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getRunningSeqInnerCompactionTaskNum, + Tag.NAME.toString(), + "compaction_inner_seq", + Tag.STATUS.toString(), + "running"); + metricService.createAutoGauge( + Metric.QUEUE.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getRunningUnseqInnerCompactionTaskNum, + Tag.NAME.toString(), + "compaction_inner_unseq", + Tag.STATUS.toString(), + "running"); + metricService.createAutoGauge( + Metric.COMPACTION_TASK_COUNT.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getFinishSeqInnerCompactionTaskNum, + Tag.NAME.toString(), + "inner_seq"); + metricService.createAutoGauge( + Metric.COMPACTION_TASK_COUNT.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getFinishUnseqInnerCompactionTaskNum, + Tag.NAME.toString(), + "inner_unseq"); + metricService.createAutoGauge( + Metric.COMPACTION_TASK_COUNT.toString(), + MetricLevel.IMPORTANT, + COMPACTION_METRICS_MANAGER, + CompactionMetricsManager::getFinishCrossCompactionTaskNum, + Tag.NAME.toString(), + "cross"); + metricService.getOrCreateTimer( + Metric.COST_TASK.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "inner_seq_compaction"); + metricService.getOrCreateTimer( + Metric.COST_TASK.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "inner_unseq_compaction"); + metricService.getOrCreateTimer( + Metric.COST_TASK.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "cross_compaction"); + } + + private void unbindTaskInfo(AbstractMetricService metricService) { + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.QUEUE.toString(), + Tag.NAME.toString(), + "compaction_cross", + Tag.STATUS.toString(), + "waiting"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.QUEUE.toString(), + Tag.NAME.toString(), + "compaction_inner_seq", + Tag.STATUS.toString(), + "waiting"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.QUEUE.toString(), + Tag.NAME.toString(), + "compaction_inner_unseq", + Tag.STATUS.toString(), + "waiting"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.QUEUE.toString(), + Tag.NAME.toString(), + "compaction_cross", + Tag.STATUS.toString(), + "running"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.QUEUE.toString(), + Tag.NAME.toString(), + "compaction_inner_seq", + Tag.STATUS.toString(), + "running"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.QUEUE.toString(), + Tag.NAME.toString(), + "compaction_inner_unseq", + Tag.STATUS.toString(), + "running"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.COMPACTION_TASK_COUNT.toString(), + Tag.NAME.toString(), + "inner_seq"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.COMPACTION_TASK_COUNT.toString(), + Tag.NAME.toString(), + "inner_unseq"); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.COMPACTION_TASK_COUNT.toString(), + Tag.NAME.toString(), + "cross"); + metricService.remove( + MetricType.TIMER, Metric.COST_TASK.toString(), Tag.NAME.toString(), "inner_seq_compaction"); + metricService.remove( + MetricType.TIMER, + Metric.COST_TASK.toString(), + Tag.NAME.toString(), + "inner_unseq_compaction"); + metricService.remove( + MetricType.TIMER, Metric.COST_TASK.toString(), Tag.NAME.toString(), "cross_compaction"); + } + + private void bindPerformanceInfo(AbstractMetricService metricService) { + metricService.getOrCreateCounter( + "Compacted_Point_Num", MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction"); + metricService.getOrCreateCounter( + "Compacted_Chunk_Num", MetricLevel.IMPORTANT, Tag.NAME.toString(), "compaction"); + metricService.getOrCreateCounter( + "Directly_Flush_Chunk_Num", MetricLevel.NORMAL, Tag.NAME.toString(), "compaction"); + metricService.getOrCreateCounter( + "Deserialized_Chunk_Num", MetricLevel.NORMAL, Tag.NAME.toString(), "compaction"); + metricService.getOrCreateCounter( + "Merged_Chunk_Num", MetricLevel.NORMAL, Tag.NAME.toString(), "compaction"); + } + + private void unbindPerformanceInfo(AbstractMetricService metricService) { + metricService.remove( + MetricType.COUNTER, "Compacted_Point_Num", Tag.NAME.toString(), "compaction"); + metricService.remove( + MetricType.COUNTER, "Compacted_Chunk_Num", Tag.NAME.toString(), "compaction"); + metricService.remove( + MetricType.COUNTER, "Directly_Flush_Chunk_Num", Tag.NAME.toString(), "compaction"); + metricService.remove( + MetricType.COUNTER, "Deserialized_Chunk_Num", Tag.NAME.toString(), "compaction"); + metricService.remove(MetricType.COUNTER, "Merged_Chunk_Num", Tag.NAME.toString(), "compaction"); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java index 1674b22eb4..7efd413183 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java @@ -29,6 +29,7 @@ public class DataNodeMetricsHelper { MetricService.getInstance().addMetricSet(new JvmMetrics()); MetricService.getInstance().addMetricSet(new LogbackMetrics()); MetricService.getInstance().addMetricSet(new FileMetrics()); + MetricService.getInstance().addMetricSet(new CompactionMetrics()); MetricService.getInstance().addMetricSet(new ProcessMetrics()); MetricService.getInstance().addMetricSet(new SystemMetrics(true)); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java new file mode 100644 index 0000000000..6f531373aa --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsManager.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.service.metrics.recorder; + +import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; +import org.apache.iotdb.metrics.utils.MetricLevel; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class CompactionMetricsManager { + private static final CompactionMetricsManager INSTANCE = new CompactionMetricsManager(); + private final AtomicInteger waitingSeqInnerCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger waitingUnseqInnerCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger waitingCrossCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger runningSeqInnerCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger runningUnseqInnerCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger runningCrossCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger finishSeqInnerCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger finishUnseqInnerCompactionTaskNum = new AtomicInteger(0); + private final AtomicInteger finishCrossCompactionTaskNum = new AtomicInteger(0); + + private CompactionMetricsManager() {} + + public static CompactionMetricsManager getInstance() { + return INSTANCE; + } + + public void recordWriteInfo( + CompactionType compactionType, + ProcessChunkType processChunkType, + boolean aligned, + long byteNum) { + MetricService.getInstance() + .count( + byteNum / 1024L, + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "compaction_" + compactionType.toString(), + Tag.TYPE.toString(), + (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString()); + MetricService.getInstance() + .count( + byteNum / 1024L, + Metric.DATA_WRITTEN.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "compaction", + Tag.TYPE.toString(), + "total"); + } + + public void recordReadInfo(long byteNum) { + MetricService.getInstance() + .count( + byteNum, + Metric.DATA_READ.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "compaction"); + } + + public void updateSummary(CompactionTaskSummary summary) { + MetricService.getInstance() + .count( + summary.getProcessPointNum(), + "Compacted_Point_Num", + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "compaction"); + MetricService.getInstance() + .count( + summary.getProcessChunkNum(), + "Compacted_Chunk_Num", + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "compaction"); + MetricService.getInstance() + .count( + summary.getDirectlyFlushChunkNum(), + "Directly_Flush_Chunk_Num", + MetricLevel.NORMAL, + Tag.NAME.toString(), + "compaction"); + MetricService.getInstance() + .count( + summary.getDeserializeChunkCount(), + "Deserialized_Chunk_Num", + MetricLevel.NORMAL, + Tag.NAME.toString(), + "compaction"); + MetricService.getInstance() + .count( + summary.getMergedChunkNum(), + "Merged_Chunk_Num", + MetricLevel.NORMAL, + Tag.NAME.toString(), + "compaction"); + } + + public void reportAddTaskToWaitingQueue(boolean isCrossTask, boolean isSeq) { + if (isCrossTask) { + waitingCrossCompactionTaskNum.incrementAndGet(); + } else if (isSeq) { + waitingSeqInnerCompactionTaskNum.incrementAndGet(); + } else { + waitingUnseqInnerCompactionTaskNum.incrementAndGet(); + } + } + + public void reportPollTaskFromWaitingQueue(boolean isCrossTask, boolean isSeq) { + if (isCrossTask) { + waitingCrossCompactionTaskNum.decrementAndGet(); + } else if (isSeq) { + waitingSeqInnerCompactionTaskNum.decrementAndGet(); + } else { + waitingUnseqInnerCompactionTaskNum.decrementAndGet(); + } + } + + public void reportTaskStartRunning(boolean isCrossTask, boolean isSeq) { + if (isCrossTask) { + runningCrossCompactionTaskNum.incrementAndGet(); + } else if (isSeq) { + runningSeqInnerCompactionTaskNum.incrementAndGet(); + } else { + runningUnseqInnerCompactionTaskNum.incrementAndGet(); + } + } + + public void reportTaskFinishOrAbort(boolean isCrossTask, boolean isSeq, long timeCost) { + if (isCrossTask) { + runningCrossCompactionTaskNum.decrementAndGet(); + finishCrossCompactionTaskNum.incrementAndGet(); + MetricService.getInstance() + .timer( + timeCost, + TimeUnit.MILLISECONDS, + Metric.COST_TASK.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "cross_compaction"); + } else if (isSeq) { + runningSeqInnerCompactionTaskNum.decrementAndGet(); + finishSeqInnerCompactionTaskNum.incrementAndGet(); + MetricService.getInstance() + .timer( + timeCost, + TimeUnit.MILLISECONDS, + Metric.COST_TASK.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "inner_seq_compaction"); + } else { + runningUnseqInnerCompactionTaskNum.decrementAndGet(); + finishUnseqInnerCompactionTaskNum.incrementAndGet(); + MetricService.getInstance() + .timer( + timeCost, + TimeUnit.MILLISECONDS, + Metric.COST_TASK.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + "inner_unseq_compaction"); + } + } + + public int getWaitingSeqInnerCompactionTaskNum() { + return waitingSeqInnerCompactionTaskNum.get(); + } + + public int getWaitingUnseqInnerCompactionTaskNum() { + return waitingUnseqInnerCompactionTaskNum.get(); + } + + public int getWaitingCrossCompactionTaskNum() { + return waitingCrossCompactionTaskNum.get(); + } + + public int getRunningSeqInnerCompactionTaskNum() { + return runningSeqInnerCompactionTaskNum.get(); + } + + public int getRunningUnseqInnerCompactionTaskNum() { + return runningUnseqInnerCompactionTaskNum.get(); + } + + public int getRunningCrossCompactionTaskNum() { + return runningCrossCompactionTaskNum.get(); + } + + public int getFinishSeqInnerCompactionTaskNum() { + return finishSeqInnerCompactionTaskNum.get(); + } + + public int getFinishUnseqInnerCompactionTaskNum() { + return finishUnseqInnerCompactionTaskNum.get(); + } + + public int getFinishCrossCompactionTaskNum() { + return finishCrossCompactionTaskNum.get(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java deleted file mode 100644 index 568fb76fb0..0000000000 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/recorder/CompactionMetricsRecorder.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.service.metrics.recorder; - -import org.apache.iotdb.commons.service.metric.MetricService; -import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.db.engine.compaction.execute.task.AbstractCompactionTask; -import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; -import org.apache.iotdb.db.engine.compaction.execute.task.CrossSpaceCompactionTask; -import org.apache.iotdb.db.engine.compaction.execute.task.InnerSpaceCompactionTask; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionTaskStatus; -import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; -import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; -import org.apache.iotdb.metrics.utils.MetricLevel; - -import java.util.concurrent.TimeUnit; - -public class CompactionMetricsRecorder { - - public static void recordWriteInfo( - CompactionType compactionType, - ProcessChunkType processChunkType, - boolean aligned, - long byteNum) { - MetricService.getInstance() - .count( - byteNum / 1024L, - Metric.DATA_WRITTEN.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction_" + compactionType.toString(), - Tag.TYPE.toString(), - (aligned ? "ALIGNED" : "NOT_ALIGNED") + "_" + processChunkType.toString()); - MetricService.getInstance() - .count( - byteNum / 1024L, - Metric.DATA_WRITTEN.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction", - Tag.TYPE.toString(), - "total"); - } - - public static void recordReadInfo(long byteNum) { - MetricService.getInstance() - .count( - byteNum, - Metric.DATA_READ.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction"); - } - - public static void updateSummary(CompactionTaskSummary summary) { - MetricService.getInstance() - .count( - summary.getProcessPointNum(), - "Compacted_Point_Num", - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction"); - MetricService.getInstance() - .count( - summary.getProcessChunkNum(), - "Compacted_Chunk_Num", - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction"); - MetricService.getInstance() - .count( - summary.getDirectlyFlushChunkNum(), - "Directly_Flush_Chunk_Num", - MetricLevel.NORMAL, - Tag.NAME.toString(), - "compaction"); - MetricService.getInstance() - .count( - summary.getDeserializeChunkCount(), - "Deserialized_Chunk_Num", - MetricLevel.NORMAL, - Tag.NAME.toString(), - "compaction"); - MetricService.getInstance() - .count( - summary.getMergedChunkNum(), - "Merged_Chunk_Num", - MetricLevel.NORMAL, - Tag.NAME.toString(), - "compaction"); - } - - public static void recordTaskInfo( - AbstractCompactionTask task, CompactionTaskStatus status, int size) { - String taskType = "unknown"; - boolean isInnerTask = false; - if (task instanceof InnerSpaceCompactionTask) { - isInnerTask = true; - taskType = "inner"; - } else if (task instanceof CrossSpaceCompactionTask) { - taskType = "cross"; - } - - switch (status) { - case ADD_TO_QUEUE: - case POLL_FROM_QUEUE: - MetricService.getInstance() - .getOrCreateGauge( - Metric.QUEUE.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction_" + taskType, - Tag.STATUS.toString(), - "waiting") - .set(size); - break; - case READY_TO_EXECUTE: - MetricService.getInstance() - .getOrCreateGauge( - Metric.QUEUE.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction_" + taskType, - Tag.STATUS.toString(), - "running") - .set(size); - break; - case FINISHED: - MetricService.getInstance() - .getOrCreateGauge( - Metric.QUEUE.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "compaction_" + taskType, - Tag.STATUS.toString(), - "running") - .set(size); - MetricService.getInstance() - .timer( - task.getTimeCost(), - TimeUnit.MILLISECONDS, - Metric.COST_TASK.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - isInnerTask ? "inner_compaction" : "cross_compaction"); - if (isInnerTask) { - MetricService.getInstance() - .count( - 1, - Metric.COMPACTION_TASK_COUNT.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "inner_compaction", - Tag.TYPE.toString(), - ((InnerSpaceCompactionTask) task).isSequence() ? "sequence" : "unsequence"); - } else { - MetricService.getInstance() - .count( - 1, - Metric.COMPACTION_TASK_COUNT.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - "cross_compaction", - Tag.TYPE.toString(), - "cross"); - } - break; - default: - // do nothing - break; - } - } -}
