This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch active-load-metric-1.3.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1b439b1436b0a93009faec55cbda33236caa3bc8 Author: YC27 <[email protected]> AuthorDate: Fri Aug 30 18:19:35 2024 +0800 Active Load: Add metrics for file size and refactor the code (#13329) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit c5b8a59c477fe0bf39a3a8359752f295cdb449c9) --- .../db/service/metrics/DataNodeMetricsHelper.java | 6 +- .../load/active/ActiveLoadDirScanner.java | 39 ++++- .../load/active/ActiveLoadPendingQueue.java | 10 +- .../load/active/ActiveLoadTsFileLoader.java | 20 ++- .../load/metrics/ActiveLoadingFilesMetricsSet.java | 175 +++++++++++++-------- ...ava => ActiveLoadingFilesNumberMetricsSet.java} | 84 +++++----- .../metrics/ActiveLoadingFilesSizeMetricsSet.java | 69 ++++++++ .../iotdb/commons/service/metric/enums/Metric.java | 3 +- 8 files changed, 281 insertions(+), 125 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java index d9d86001b9a..17463aa357f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java @@ -39,7 +39,8 @@ import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; -import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet; import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics; @@ -98,7 +99,8 @@ public class DataNodeMetricsHelper { // bind load related metrics MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance()); - MetricService.getInstance().addMetricSet(ActiveLoadingFilesMetricsSet.getInstance()); + MetricService.getInstance().addMetricSet(ActiveLoadingFilesNumberMetricsSet.getInstance()); + MetricService.getInstance().addMetricSet(ActiveLoadingFilesSizeMetricsSet.getInstance()); } private static void initSystemMetrics() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java index 22375261e13..259ee7079c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java @@ -20,7 +20,8 @@ package org.apache.iotdb.db.storageengine.load.active; import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet; import org.apache.commons.io.FileUtils; import org.apache.tsfile.common.conf.TSFileConfig; @@ -113,13 +114,15 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { } } } - // Hot reload active load listening dir for pipe data sync // Active load is always enabled for pipe data sync listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir()); // Create directories if not exists listeningDirs.forEach(this::createDirectoriesIfNotExists); + + ActiveLoadingFilesNumberMetricsSet.getInstance().updatePendingDirList(listeningDirs); + ActiveLoadingFilesSizeMetricsSet.getInstance().updatePendingDirList(listeningDirs); } catch (final Exception e) { LOGGER.warn( "Error occurred during hot reload active load dirs. " @@ -147,23 +150,45 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { // Metrics public long countAndReportActiveListeningDirsFileNumber() { - final long[] fileCount = {0}; + long totalFileCount = 0; + long totalFileSize = 0; + try { - for (String dir : listeningDirs) { + for (final String dir : listeningDirs) { + final long[] fileCountInDir = {0}; + final long[] fileSizeInDir = {0}; + Files.walkFileTree( new File(dir).toPath(), new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - fileCount[0]++; + fileCountInDir[0]++; + try { + fileSizeInDir[0] += file.toFile().length(); + } catch (Exception e) { + LOGGER.debug("Failed to count active listening dirs file number.", e); + } return FileVisitResult.CONTINUE; } }); + + ActiveLoadingFilesNumberMetricsSet.getInstance() + .updatePendingFileCounterInDir(dir, fileCountInDir[0]); + ActiveLoadingFilesSizeMetricsSet.getInstance() + .updatePendingFileCounterInDir(dir, fileSizeInDir[0]); + + totalFileCount += fileCountInDir[0]; + totalFileSize += fileSizeInDir[0]; } - ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]); + + ActiveLoadingFilesNumberMetricsSet.getInstance() + .updateTotalPendingFileCounter(totalFileCount); + ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalPendingFileCounter(totalFileSize); } catch (final IOException e) { LOGGER.debug("Failed to count active listening dirs file number.", e); } - return fileCount[0]; + + return totalFileCount; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java index f04d846c6ca..6c2b2cd41f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.storageengine.load.active; -import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet; import org.apache.tsfile.utils.Pair; @@ -39,7 +39,7 @@ public class ActiveLoadPendingQueue { if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) { pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe)); - ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(1); + ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1); return true; } return false; @@ -51,8 +51,8 @@ public class ActiveLoadPendingQueue { pendingFileSet.remove(pair.left); loadingFileSet.add(pair.left); - ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(1); - ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(-1); + ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1); + ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1); } return pair; } @@ -60,7 +60,7 @@ public class ActiveLoadPendingQueue { public synchronized void removeFromLoading(final String file) { loadingFileSet.remove(file); - ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1); + ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(-1); } public synchronized boolean isFilePendingOrLoading(final String file) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index e237dc2077f..61678779772 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java @@ -34,7 +34,8 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; -import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet; +import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; @@ -96,6 +97,9 @@ public class ActiveLoadTsFileLoader { e); } failDir.set(IOTDB_CONFIG.getLoadActiveListeningFailDir()); + + ActiveLoadingFilesSizeMetricsSet.getInstance().updateFailedDir(failDir.get()); + ActiveLoadingFilesNumberMetricsSet.getInstance().updateFailedDir(failDir.get()); } } } @@ -262,6 +266,8 @@ public class ActiveLoadTsFileLoader { // Metrics public long countAndReportFailedFileNumber() { final long[] fileCount = {0}; + final long[] fileSize = {0}; + try { initFailDirIfNecessary(); Files.walkFileTree( @@ -270,13 +276,21 @@ public class ActiveLoadTsFileLoader { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { fileCount[0]++; + try { + fileSize[0] += file.toFile().length(); + } catch (Exception e) { + LOGGER.debug("Failed to count failed files in fail directory.", e); + } return FileVisitResult.CONTINUE; } }); - ActiveLoadingFilesMetricsSet.getInstance().recordFailedFileCounter(fileCount[0]); + + ActiveLoadingFilesNumberMetricsSet.getInstance().updateTotalFailedFileCounter(fileCount[0]); + ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalFailedFileCounter(fileSize[0]); } catch (final IOException e) { - LOGGER.warn("Failed to count failed files in fail directory.", e); + LOGGER.debug("Failed to count failed files in fail directory.", e); } + return fileCount[0]; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java index aae166ef5d7..f93e8fc9299 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.load.metrics; -import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; @@ -28,86 +27,138 @@ import org.apache.iotdb.metrics.type.Counter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; -public class ActiveLoadingFilesMetricsSet implements IMetricSet { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private static final ActiveLoadingFilesMetricsSet INSTANCE = new ActiveLoadingFilesMetricsSet(); +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReference; - public static final String PENDING = "pending"; - public static final String QUEUING = "queuing"; - public static final String LOADING = "loading"; - public static final String FAILED = "failed"; +public abstract class ActiveLoadingFilesMetricsSet implements IMetricSet { - private ActiveLoadingFilesMetricsSet() { - // empty construct + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadingFilesMetricsSet.class); + + protected static final String FAILED_PREFIX = "failed - "; + protected static final String PENDING_PREFIX = "pending - "; + + protected AtomicReference<AbstractMetricService> metricService = new AtomicReference<>(); + + private final AtomicReference<String> failedDir = new AtomicReference<>(); + private final Set<String> pendingDirs = new CopyOnWriteArraySet<>(); + + protected Counter totalFailedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + protected Counter totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + protected Map<String, Counter> dir2PendingFileCounterMap = new ConcurrentHashMap<>(); + + public void updateTotalFailedFileCounter(final long number) { + totalFailedFileCounter.inc(number - totalFailedFileCounter.getCount()); + } + + public void updateTotalPendingFileCounter(final long number) { + totalPendingFileCounter.inc(number - totalPendingFileCounter.getCount()); + } + + public void updatePendingFileCounterInDir(final String dirName, final long number) { + final Counter counter = dir2PendingFileCounterMap.get(dirName); + if (counter == null) { + LOGGER.debug("Failed to update file counter, dir({}) does not exist", dirName); + return; + } + counter.inc(number - counter.getCount()); + } + + public void updatePendingDirList(final Set<String> givenListeningDirs) { + if (metricService.get() == null || Objects.equals(pendingDirs, givenListeningDirs)) { + return; + } + + pendingDirs.clear(); + pendingDirs.addAll(givenListeningDirs); + + unbindDir2PendingFileCounters(metricService.get()); + rebindDir2PendingFileCounters(); } - private Counter pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - private Counter queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - private Counter loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - private Counter failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + protected void unbindDir2PendingFileCounters(final AbstractMetricService metricService) { + dir2PendingFileCounterMap + .keySet() + .forEach( + dir -> + metricService.remove( + MetricType.COUNTER, + getMetricName(), + Tag.TYPE.toString(), + PENDING_PREFIX + dir)); + dir2PendingFileCounterMap.clear(); + } - public void recordPendingFileCounter(final long number) { - pendingFileCounter.inc(number - pendingFileCounter.getCount()); + private void rebindDir2PendingFileCounters() { + dir2PendingFileCounterMap.clear(); + if (!pendingDirs.isEmpty()) { + for (String dir : pendingDirs) { + dir2PendingFileCounterMap.put( + dir, + metricService + .get() + .getOrCreateCounter( + getMetricName(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + PENDING_PREFIX + dir)); + } + } } - public void recordQueuingFileCounter(final long number) { - queuingFileCounter.inc(number); + public void updateFailedDir(final String dirName) { + if (metricService.get() == null || Objects.equals(failedDir.get(), dirName)) { + return; + } + + failedDir.set(dirName); + + unbindFailedDirCounter(metricService.get()); + rebindFailedDirCounter(); } - public void recordLoadingFileCounter(final long number) { - loadingFileCounter.inc(number); + protected void unbindFailedDirCounter(final AbstractMetricService metricService) { + totalFailedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + metricService.remove( + MetricType.COUNTER, getMetricName(), Tag.TYPE.toString(), FAILED_PREFIX + failedDir.get()); } - public void recordFailedFileCounter(final long number) { - failedFileCounter.inc(number - failedFileCounter.getCount()); + private void rebindFailedDirCounter() { + totalFailedFileCounter = + metricService + .get() + .getOrCreateCounter( + getMetricName(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + FAILED_PREFIX + failedDir.get()); } @Override public void bindTo(final AbstractMetricService metricService) { - pendingFileCounter = - metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), - MetricLevel.IMPORTANT, - Tag.TYPE.toString(), - PENDING); - queuingFileCounter = - metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), - MetricLevel.IMPORTANT, - Tag.TYPE.toString(), - QUEUING); - loadingFileCounter = - metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), - MetricLevel.IMPORTANT, - Tag.TYPE.toString(), - LOADING); - failedFileCounter = - metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), - MetricLevel.IMPORTANT, - Tag.TYPE.toString(), - FAILED); + this.metricService.set(metricService); + + // Dir2PendingFileCounters' binding is triggered by updatePendingDirList + // FailedDirCounter's binding is triggered by updateFailedDir + bindOtherCounters(metricService); } + protected abstract void bindOtherCounters(final AbstractMetricService metricService); + @Override public void unbindFrom(final AbstractMetricService metricService) { - pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - - metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), PENDING); - metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), QUEUING); - metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), LOADING); - metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), FAILED); + unbindDir2PendingFileCounters(metricService); + unbindFailedDirCounter(metricService); + unbindOtherCounters(metricService); } - public static ActiveLoadingFilesMetricsSet getInstance() { - return ActiveLoadingFilesMetricsSet.INSTANCE; - } + protected abstract void unbindOtherCounters(final AbstractMetricService metricService); + + protected abstract String getMetricName(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java similarity index 50% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java index aae166ef5d7..11105ed14c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesMetricsSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesNumberMetricsSet.java @@ -23,91 +23,85 @@ import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; -import org.apache.iotdb.metrics.metricsets.IMetricSet; import org.apache.iotdb.metrics.type.Counter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; -public class ActiveLoadingFilesMetricsSet implements IMetricSet { +public class ActiveLoadingFilesNumberMetricsSet extends ActiveLoadingFilesMetricsSet { - private static final ActiveLoadingFilesMetricsSet INSTANCE = new ActiveLoadingFilesMetricsSet(); + private static final String PENDING = "pending"; + private static final String QUEUING = "queuing"; + private static final String LOADING = "loading"; - public static final String PENDING = "pending"; - public static final String QUEUING = "queuing"; - public static final String LOADING = "loading"; - public static final String FAILED = "failed"; - - private ActiveLoadingFilesMetricsSet() { - // empty construct - } - - private Counter pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private Counter queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private Counter loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - private Counter failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - - public void recordPendingFileCounter(final long number) { - pendingFileCounter.inc(number - pendingFileCounter.getCount()); - } - public void recordQueuingFileCounter(final long number) { + public void increaseQueuingFileCounter(final long number) { queuingFileCounter.inc(number); } - public void recordLoadingFileCounter(final long number) { + public void increaseLoadingFileCounter(final long number) { loadingFileCounter.inc(number); } - public void recordFailedFileCounter(final long number) { - failedFileCounter.inc(number - failedFileCounter.getCount()); - } - @Override - public void bindTo(final AbstractMetricService metricService) { - pendingFileCounter = + protected void bindOtherCounters(final AbstractMetricService metricService) { + totalPendingFileCounter = metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), + Metric.ACTIVE_LOADING_FILES_NUMBER.toString(), MetricLevel.IMPORTANT, Tag.TYPE.toString(), PENDING); queuingFileCounter = metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), + Metric.ACTIVE_LOADING_FILES_NUMBER.toString(), MetricLevel.IMPORTANT, Tag.TYPE.toString(), QUEUING); loadingFileCounter = metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), + Metric.ACTIVE_LOADING_FILES_NUMBER.toString(), MetricLevel.IMPORTANT, Tag.TYPE.toString(), LOADING); - failedFileCounter = - metricService.getOrCreateCounter( - Metric.ACTIVE_LOADING_FILES.toString(), - MetricLevel.IMPORTANT, - Tag.TYPE.toString(), - FAILED); } @Override - public void unbindFrom(final AbstractMetricService metricService) { - pendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + protected void unbindOtherCounters(final AbstractMetricService metricService) { + totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; queuingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; loadingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; - failedFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), PENDING); - metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), QUEUING); + MetricType.COUNTER, + Metric.ACTIVE_LOADING_FILES_NUMBER.toString(), + Tag.TYPE.toString(), + PENDING); metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), LOADING); + MetricType.COUNTER, + Metric.ACTIVE_LOADING_FILES_NUMBER.toString(), + Tag.TYPE.toString(), + QUEUING); metricService.remove( - MetricType.COUNTER, Metric.ACTIVE_LOADING_FILES.toString(), Tag.TYPE.toString(), FAILED); + MetricType.COUNTER, + Metric.ACTIVE_LOADING_FILES_NUMBER.toString(), + Tag.TYPE.toString(), + LOADING); + } + + @Override + protected String getMetricName() { + return Metric.ACTIVE_LOADING_FILES_NUMBER.toString(); } - public static ActiveLoadingFilesMetricsSet getInstance() { - return ActiveLoadingFilesMetricsSet.INSTANCE; + public static ActiveLoadingFilesNumberMetricsSet getInstance() { + return ActiveLoadingFilesNumberMetricsSet.INSTANCE; + } + + private static final ActiveLoadingFilesNumberMetricsSet INSTANCE = + new ActiveLoadingFilesNumberMetricsSet(); + + private ActiveLoadingFilesNumberMetricsSet() { + // empty construct } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java new file mode 100644 index 00000000000..554918d981c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/ActiveLoadingFilesSizeMetricsSet.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.metrics; + +import org.apache.iotdb.commons.service.metric.enums.Metric; +import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.impl.DoNothingMetricManager; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.apache.iotdb.metrics.utils.MetricType; + +public class ActiveLoadingFilesSizeMetricsSet extends ActiveLoadingFilesMetricsSet { + + private static final String PENDING_SIZE = "pending (total)"; + + @Override + protected void bindOtherCounters(AbstractMetricService metricService) { + totalPendingFileCounter = + metricService.getOrCreateCounter( + Metric.ACTIVE_LOADING_FILES_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.TYPE.toString(), + PENDING_SIZE); + } + + @Override + protected void unbindOtherCounters(AbstractMetricService metricService) { + totalPendingFileCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + + metricService.remove( + MetricType.COUNTER, + Metric.ACTIVE_LOADING_FILES_SIZE.toString(), + Tag.TYPE.toString(), + PENDING_SIZE); + } + + @Override + protected String getMetricName() { + return Metric.ACTIVE_LOADING_FILES_SIZE.toString(); + } + + public static ActiveLoadingFilesSizeMetricsSet getInstance() { + return INSTANCE; + } + + private static final ActiveLoadingFilesSizeMetricsSet INSTANCE = + new ActiveLoadingFilesSizeMetricsSet(); + + private ActiveLoadingFilesSizeMetricsSet() { + // empty construct + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 707f46fa259..7c2f4ac1016 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -172,7 +172,8 @@ public enum Metric { SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"), SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"), // load related - ACTIVE_LOADING_FILES("active_loading_files"), + ACTIVE_LOADING_FILES_NUMBER("active_loading_files_number"), + ACTIVE_LOADING_FILES_SIZE("active_loading_files_size"), LOAD_MEM("load_mem"), LOAD_DISK_IO("load_disk_io"), LOAD_TIME_COST("load_time_cost"),
