This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch feature/disk-metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2217676d28aade3a5f14a5356461206d3352a437 Author: spricoder <[email protected]> AuthorDate: Fri Sep 22 16:57:21 2023 +0800 Add Gauge Map and simplify code --- .../iotdb/db/service/metrics/FileMetrics.java | 207 ++++++++++----------- 1 file changed, 99 insertions(+), 108 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java index f36431e4ec4..4aec979979f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/FileMetrics.java @@ -45,10 +45,10 @@ import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -67,10 +67,14 @@ public class FileMetrics implements IMetricSet { private static final String SEQUENCE = "sequence"; private static final String UNSEQUENCE = "unsequence"; private static final String LEVEL = "level"; - private final Map<String, Map<String, Long>> seqFileSizeGaugeMap = new ConcurrentHashMap<>(); - private final Map<String, Map<String, Long>> unseqFileSizeGaugeMap = new ConcurrentHashMap<>(); - private final Map<String, Map<String, Integer>> seqFileNumGaugeMap = new ConcurrentHashMap<>(); - private final Map<String, Map<String, Integer>> unseqFileNumGaugeMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Long>> seqFileSizeMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Long>> unseqFileSizeMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Integer>> seqFileNumMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Integer>> unseqFileNumMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Gauge>> seqFileSizeGaugeMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Gauge>> unseqFileSizeGaugeMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Gauge>> seqFileNumGaugeMap = new ConcurrentHashMap<>(); + private final Map<String, Map<String, Gauge>> unseqFileNumGaugeMap = new ConcurrentHashMap<>(); private final AtomicInteger modFileNum = new AtomicInteger(0); @@ -79,6 +83,10 @@ public class FileMetrics implements IMetricSet { private final Map<Integer, Integer> unseqLevelTsFileCountMap = new ConcurrentHashMap<>(); private final Map<Integer, Long> seqLevelTsFileSizeMap = new ConcurrentHashMap<>(); private final Map<Integer, Long> unseqLevelTsFileSizeMap = new ConcurrentHashMap<>(); + private final Map<Integer, Gauge> seqLevelCountGaugeMap = new ConcurrentHashMap<>(); + private final Map<Integer, Gauge> seqLevelSizeGaugeMap = new ConcurrentHashMap<>(); + private final Map<Integer, Gauge> unseqLevelCountGaugeMap = new ConcurrentHashMap<>(); + private final Map<Integer, Gauge> unseqLevelSizeGaugeMap = new ConcurrentHashMap<>(); private long lastUpdateTime = 0; // compaction temporal files @@ -88,11 +96,6 @@ public class FileMetrics implements IMetricSet { private final AtomicInteger innerSeqCompactionTempFileNum = new AtomicInteger(0); private final AtomicInteger innerUnseqCompactionTempFileNum = new AtomicInteger(0); private final AtomicInteger crossCompactionTempFileNum = new AtomicInteger(0); - private AtomicBoolean hasRemainData = new AtomicBoolean(false); - private final Map<Integer, Gauge> seqLevelCountGaugeMap = new ConcurrentHashMap<>(); - private final Map<Integer, Gauge> seqLevelSizeGaugeMap = new ConcurrentHashMap<>(); - private final Map<Integer, Gauge> unseqLevelCountGaugeMap = new ConcurrentHashMap<>(); - private final Map<Integer, Gauge> unseqLevelSizeGaugeMap = new ConcurrentHashMap<>(); @SuppressWarnings("squid:S1075") private String fileHandlerCntPathInLinux = "/proc/%s/fd"; @@ -129,7 +132,6 @@ public class FileMetrics implements IMetricSet { FileMetrics::getModFileNum, Tag.NAME.toString(), "mods"); - checkIfThereRemainingData(); } private void bindWalFileMetrics(AbstractMetricService metricService) { @@ -321,12 +323,12 @@ public class FileMetrics implements IMetricSet { String database, String regionId, long sizeDelta, int countDelta, boolean seq) { if (seq) { // update sequence file size - seqFileSizeGaugeMap.compute( + seqFileSizeMap.compute( database, (k, v) -> { long size = 0; if (v == null) { - v = new ConcurrentHashMap<>(); + v = new HashMap<>(); } else if (v.containsKey(regionId)) { size = v.get(regionId); } @@ -334,52 +336,26 @@ public class FileMetrics implements IMetricSet { return v; }); // update sequence file number - seqFileNumGaugeMap.compute( + seqFileNumMap.compute( database, (k, v) -> { int count = 0; if (v == null) { - v = new ConcurrentHashMap<>(); + v = new HashMap<>(); } else if (v.containsKey(regionId)) { count = v.get(regionId); } v.put(regionId, count + countDelta); return v; }); - if (metricService != null) { - // update sequence file size metric - metricService - .getOrCreateGauge( - Metric.FILE_SIZE.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - "seq", - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - regionId) - .set(seqFileSizeGaugeMap.get(database).get(regionId)); - // update sequence file number metric - metricService - .getOrCreateGauge( - Metric.FILE_COUNT.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - "seq", - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - regionId) - .set(seqFileNumGaugeMap.get(database).get(regionId)); - } } else { // update unsequence file size - unseqFileSizeGaugeMap.compute( + unseqFileSizeMap.compute( database, (k, v) -> { long size = 0; if (v == null) { - v = new ConcurrentHashMap<>(); + v = new HashMap<>(); } else if (v.containsKey(regionId)) { size = v.get(regionId); } @@ -387,12 +363,12 @@ public class FileMetrics implements IMetricSet { return v; }); // update unsequence file number - unseqFileNumGaugeMap.compute( + unseqFileNumMap.compute( database, (k, v) -> { int count = 0; if (v == null) { - v = new ConcurrentHashMap<>(); + v = new HashMap<>(); } else if (v.containsKey(regionId)) { count = v.get(regionId); } @@ -411,7 +387,7 @@ public class FileMetrics implements IMetricSet { database, Tag.REGION.toString(), regionId) - .set(unseqFileSizeGaugeMap.get(database).get(regionId)); + .set(unseqFileSizeMap.get(database).get(regionId)); // update unsequence file number metric metricService .getOrCreateGauge( @@ -423,11 +399,75 @@ public class FileMetrics implements IMetricSet { database, Tag.REGION.toString(), regionId) - .set(unseqFileNumGaugeMap.get(database).get(regionId)); + .set(unseqFileNumMap.get(database).get(regionId)); + } + } + if (metricService != null) { + if (seq) { + updateGlobalGauge( + database, + regionId, + seqFileSizeMap.get(database).get(regionId), + seqFileSizeGaugeMap, + SEQUENCE, + Metric.FILE_SIZE.toString()); + updateGlobalGauge( + database, + regionId, + seqFileNumMap.get(database).get(regionId), + seqFileNumGaugeMap, + SEQUENCE, + Metric.FILE_COUNT.toString()); + } else { + updateGlobalGauge( + database, + regionId, + unseqFileSizeMap.get(database).get(regionId), + unseqFileSizeGaugeMap, + UNSEQUENCE, + Metric.FILE_SIZE.toString()); + updateGlobalGauge( + database, + regionId, + unseqFileNumMap.get(database).get(regionId), + unseqFileNumGaugeMap, + UNSEQUENCE, + Metric.FILE_COUNT.toString()); } } } + private void updateGlobalGauge( + String database, + String regionId, + long value, + Map<String, Map<String, Gauge>> gaugeMap, + String orderStr, + String gaugeName) { + gaugeMap.compute( + database, + (k, v) -> { + if (v == null) { + v = new HashMap<>(); + } + if (!v.containsKey(regionId)) { + v.put( + regionId, + metricService.getOrCreateGauge( + gaugeName, + MetricLevel.CORE, + Tag.NAME.toString(), + orderStr, + Tag.DATABASE.toString(), + database, + Tag.REGION.toString(), + regionId)); + } + v.get(regionId).set(value); + return v; + }); + } + private void updateLevelCountAndSize(long sizeDelta, int countDelta, boolean seq, int level) { int count = 0; long totalSize = 0; @@ -443,70 +483,36 @@ public class FileMetrics implements IMetricSet { totalSize = unseqLevelTsFileSizeMap.compute(level, (k, v) -> v == null ? sizeDelta : v + sizeDelta); } - updateLevelFileInfoInMetricService(totalSize, count, seq, level); - } - - private void updateLevelFileInfoInMetricService( - long totalSize, int count, boolean seq, int level) { if (metricService != null) { - updateCountGauge( + updateLevelGauge( level, count, seq ? seqLevelCountGaugeMap : unseqLevelCountGaugeMap, - seq ? SEQUENCE : UNSEQUENCE); - updateSizeGauge( + seq ? SEQUENCE : UNSEQUENCE, + FILE_LEVEL_COUNT); + updateLevelGauge( level, totalSize, seq ? seqLevelSizeGaugeMap : unseqLevelSizeGaugeMap, - seq ? SEQUENCE : UNSEQUENCE); - checkIfThereRemainingData(); - } else { - // the metric service has not been set yet - hasRemainData.set(true); - } - } - - private void checkIfThereRemainingData() { - if (hasRemainData.get()) { - synchronized (this) { - if (hasRemainData.get()) { - hasRemainData.set(false); - updateRemainData(); - } - } + seq ? SEQUENCE : UNSEQUENCE, + FILE_LEVEL_SIZE); } } - private void updateCountGauge( - int level, int count, Map<Integer, Gauge> countGaugeMap, String orderStr) { - countGaugeMap - .computeIfAbsent( - level, - l -> - metricService.getOrCreateGauge( - FILE_LEVEL_COUNT, - MetricLevel.CORE, - Tag.TYPE.toString(), - orderStr, - LEVEL, - String.valueOf(level))) - .set(count); - } - - private void updateSizeGauge( - int level, long size, Map<Integer, Gauge> sizeGaugeMap, String orderStr) { - sizeGaugeMap + private void updateLevelGauge( + int level, long value, Map<Integer, Gauge> gaugeMap, String orderStr, String gaugeName) { + gaugeMap .computeIfAbsent( level, l -> metricService.getOrCreateGauge( - FILE_LEVEL_SIZE, + gaugeName, MetricLevel.CORE, Tag.TYPE.toString(), orderStr, LEVEL, String.valueOf(level))) - .set(size); + .set(value); } public void deleteFile(boolean seq, List<TsFileResource> tsFileResourceList) { @@ -525,25 +531,10 @@ public class FileMetrics implements IMetricSet { } } - private void updateRemainData() { - for (Map.Entry<Integer, Integer> entry : seqLevelTsFileCountMap.entrySet()) { - updateCountGauge(entry.getKey(), entry.getValue(), seqLevelCountGaugeMap, SEQUENCE); - } - for (Map.Entry<Integer, Long> entry : seqLevelTsFileSizeMap.entrySet()) { - updateSizeGauge(entry.getKey(), entry.getValue(), seqLevelSizeGaugeMap, SEQUENCE); - } - for (Map.Entry<Integer, Integer> entry : unseqLevelTsFileCountMap.entrySet()) { - updateCountGauge(entry.getKey(), entry.getValue(), unseqLevelCountGaugeMap, UNSEQUENCE); - } - for (Map.Entry<Integer, Long> entry : unseqLevelTsFileSizeMap.entrySet()) { - updateSizeGauge(entry.getKey(), entry.getValue(), unseqLevelSizeGaugeMap, UNSEQUENCE); - } - } - public long getFileNum(boolean seq) { long fileNum = 0; for (Map.Entry<String, Map<String, Integer>> entry : - (seq ? seqFileNumGaugeMap : unseqFileNumGaugeMap).entrySet()) { + (seq ? seqFileNumMap : unseqFileNumMap).entrySet()) { for (Map.Entry<String, Integer> regionEntry : entry.getValue().entrySet()) { fileNum += regionEntry.getValue(); }
