This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 88ffa48e320 Fix ThreadPoolMetrics concurrent NPE bug & Fix metric
leaks when frequently creating and deleting database (#14388)
88ffa48e320 is described below
commit 88ffa48e320740b6b8bbf77c5bf4348302fde3c4
Author: Potato <[email protected]>
AuthorDate: Thu Dec 12 18:18:18 2024 +0800
Fix ThreadPoolMetrics concurrent NPE bug & Fix metric leaks when frequently
creating and deleting database (#14388)
* fix concurrent bug
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix threadpoolmetric leak & points asyncreporter metric leak
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix compile
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix gile_global_count & dataregion mem leak
Signed-off-by: OneSizeFitQuorum <[email protected]>
---------
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../PipeDataNodeRemainingEventAndTimeOperator.java | 2 +-
.../db/service/metrics/file/TsFileMetrics.java | 33 +++++
.../db/storageengine/dataregion/DataRegion.java | 7 +-
.../dataregion/DataRegionMetrics.java | 14 +-
.../memtable/TsFileProcessorInfoMetrics.java | 8 +-
.../iotdb/metrics/core/IoTDBMetricManager.java | 2 +-
.../iotdb/metrics/AbstractMetricManager.java | 12 +-
.../iotdb/metrics/AbstractMetricService.java | 40 +++--
.../iotdb/metrics/impl/DoNothingMetricManager.java | 2 +-
.../commons/concurrent/ThreadPoolMetrics.java | 164 +++++++++------------
.../WrappedScheduledExecutorService.java | 2 +
.../WrappedSingleThreadExecutorService.java | 2 +
.../WrappedSingleThreadScheduledExecutor.java | 2 +
.../threadpool/WrappedThreadPoolExecutor.java | 2 +
14 files changed, 161 insertions(+), 131 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index 4194acc9a03..10e605bba08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -51,7 +51,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
private final AtomicReference<Meter> dataRegionCommitMeter = new
AtomicReference<>(null);
private final AtomicReference<Meter> schemaRegionCommitMeter = new
AtomicReference<>(null);
private final IoTDBHistogram collectInvocationHistogram =
- (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);
+ (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
index 7423f0afa03..5496a417b94 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -120,6 +121,10 @@ public class TsFileMetrics implements IMetricSet {
.forEach(map -> deleteRegionFromMap(map, database, regionId));
Arrays.asList(seqFileSizeMap, unseqFileSizeMap)
.forEach(map -> deleteRegionFromMap(map, database, regionId));
+ Arrays.asList(SEQUENCE, UNSEQUENCE)
+ .forEach(orderStr -> deleteGlobalTsFileCountGauge(orderStr, database,
regionId));
+ Arrays.asList(SEQUENCE, UNSEQUENCE)
+ .forEach(orderStr -> deleteGlobalTsFileSizeGauge(orderStr, database,
regionId));
}
private <T> void deleteRegionFromMap(
@@ -199,6 +204,20 @@ public class TsFileMetrics implements IMetricSet {
regionId);
}
+ public void deleteGlobalTsFileCountGauge(String orderStr, String database,
String regionId) {
+ metricService
+ .get()
+ .remove(
+ MetricType.GAUGE,
+ FILE_GLOBAL_COUNT,
+ Tag.NAME.toString(),
+ orderStr,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ regionId);
+ }
+
private void updateGlobalTsFileSizeMap(
Map<String, Map<String, Pair<Long, Gauge>>> map,
String orderStr,
@@ -246,6 +265,20 @@ public class TsFileMetrics implements IMetricSet {
regionId);
}
+ public void deleteGlobalTsFileSizeGauge(String orderStr, String database,
String regionId) {
+ metricService
+ .get()
+ .remove(
+ MetricType.GAUGE,
+ FILE_GLOBAL_SIZE,
+ Tag.NAME.toString(),
+ orderStr,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ regionId);
+ }
+
// endregion
// region update level tsfile value map and gauge map
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d583eafd9d9..165baa6bdaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -322,6 +322,8 @@ public class DataRegion implements IDataRegionForQuery {
PerformanceOverviewMetrics.getInstance();
private final ExecutorService upgradeModFileThreadPool;
+ private final DataRegionMetrics metrics;
+
/**
* Construct a database processor.
*
@@ -385,7 +387,8 @@ public class DataRegion implements IDataRegionForQuery {
recover();
}
- MetricService.getInstance().addMetricSet(new DataRegionMetrics(this));
+ this.metrics = new DataRegionMetrics(this);
+ MetricService.getInstance().addMetricSet(metrics);
}
@TestOnly
@@ -396,6 +399,7 @@ public class DataRegion implements IDataRegionForQuery {
this.partitionMaxFileVersions = new HashMap<>();
partitionMaxFileVersions.put(0L, 0L);
upgradeModFileThreadPool = null;
+ this.metrics = new DataRegionMetrics(this);
}
@Override
@@ -3691,6 +3695,7 @@ public class DataRegion implements IDataRegionForQuery {
deletedCondition.await();
}
FileMetrics.getInstance().deleteRegion(databaseName, dataRegionId);
+ MetricService.getInstance().removeMetricSet(metrics);
} catch (InterruptedException e) {
logger.error("Interrupted When waiting for data region deleted.");
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionMetrics.java
index 87915c17238..7fc0f6b8c8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionMetrics.java
@@ -29,12 +29,12 @@ import org.apache.iotdb.metrics.utils.MetricType;
import java.util.Objects;
public class DataRegionMetrics implements IMetricSet {
- private DataRegion dataRegion;
- private String storageGroupName;
+ private final DataRegion dataRegion;
+ private final String databaseName;
public DataRegionMetrics(DataRegion dataRegion) {
this.dataRegion = dataRegion;
- this.storageGroupName = dataRegion.getDatabaseName();
+ this.databaseName = dataRegion.getDatabaseName();
}
@Override
@@ -45,7 +45,7 @@ public class DataRegionMetrics implements IMetricSet {
dataRegion,
DataRegion::getMemCost,
Tag.NAME.toString(),
- "database_" + storageGroupName);
+ "database_" + databaseName);
}
@Override
@@ -54,7 +54,7 @@ public class DataRegionMetrics implements IMetricSet {
MetricType.AUTO_GAUGE,
Metric.MEM.toString(),
Tag.NAME.toString(),
- "database_" + storageGroupName);
+ "database_" + databaseName);
}
@Override
@@ -67,11 +67,11 @@ public class DataRegionMetrics implements IMetricSet {
}
DataRegionMetrics that = (DataRegionMetrics) o;
return Objects.equals(dataRegion, that.dataRegion)
- && Objects.equals(storageGroupName, that.storageGroupName);
+ && Objects.equals(databaseName, that.databaseName);
}
@Override
public int hashCode() {
- return Objects.hash(dataRegion, storageGroupName);
+ return Objects.hash(dataRegion, databaseName);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfoMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfoMetrics.java
index ff7ca4dc62c..fb106a9bac7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfoMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfoMetrics.java
@@ -27,12 +27,12 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
public class TsFileProcessorInfoMetrics implements IMetricSet {
- private final String storageGroupName;
+ private final String databaseName;
private final TsFileProcessorInfo tsFileProcessorInfo;
public TsFileProcessorInfoMetrics(
String storageGroupName, TsFileProcessorInfo tsFileProcessorInfo) {
- this.storageGroupName = storageGroupName;
+ this.databaseName = storageGroupName;
this.tsFileProcessorInfo = tsFileProcessorInfo;
}
@@ -45,7 +45,7 @@ public class TsFileProcessorInfoMetrics implements IMetricSet
{
tsFileProcessorInfo,
TsFileProcessorInfo::getMemCost,
Tag.NAME.toString(),
- "chunkMetaData_" + storageGroupName);
+ "chunkMetaData_" + databaseName);
}
@Override
@@ -55,6 +55,6 @@ public class TsFileProcessorInfoMetrics implements IMetricSet
{
MetricType.AUTO_GAUGE,
Metric.MEM.toString(),
Tag.NAME.toString(),
- "chunkMetaData_" + storageGroupName);
+ "chunkMetaData_" + databaseName);
}
}
diff --git
a/iotdb-core/metrics/core/src/main/java/org/apache/iotdb/metrics/core/IoTDBMetricManager.java
b/iotdb-core/metrics/core/src/main/java/org/apache/iotdb/metrics/core/IoTDBMetricManager.java
index f5197dce1ce..a4a7f376bbd 100644
---
a/iotdb-core/metrics/core/src/main/java/org/apache/iotdb/metrics/core/IoTDBMetricManager.java
+++
b/iotdb-core/metrics/core/src/main/java/org/apache/iotdb/metrics/core/IoTDBMetricManager.java
@@ -93,7 +93,7 @@ public class IoTDBMetricManager extends AbstractMetricManager
{
}
@Override
- public Histogram createHistogram(MetricInfo metricInfo) {
+ public Histogram createHistogram() {
// create distributionSummary
io.micrometer.core.instrument.DistributionSummary distributionSummary =
new CumulativeDistributionSummary(
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricManager.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricManager.java
index 0d531a0fddd..fcbb8266251 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricManager.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricManager.java
@@ -252,7 +252,7 @@ public abstract class AbstractMetricManager {
metrics.computeIfAbsent(
metricInfo,
key -> {
- Histogram histogram = createHistogram(metricInfo);
+ Histogram histogram = createHistogram();
nameToMetaInfo.put(name, metricInfo.getMetaInfo());
notifyReporterOnAdd(histogram, metricInfo);
return histogram;
@@ -263,12 +263,8 @@ public abstract class AbstractMetricManager {
throw new IllegalArgumentException(metricInfo + ALREADY_EXISTS);
}
- /**
- * Create histogram according to metric framework.
- *
- * @param metricInfo the metricInfo of metric
- */
- protected abstract Histogram createHistogram(MetricInfo metricInfo);
+ /** Create histogram according to metric framework. */
+ protected abstract Histogram createHistogram();
/**
* Get timer. return if exists, create if not.
@@ -466,7 +462,7 @@ public abstract class AbstractMetricManager {
protected abstract boolean stopFramework();
- private boolean invalid(MetricLevel metricLevel, String name, String...
tags) {
+ public boolean invalid(MetricLevel metricLevel, String name, String... tags)
{
if (!isEnableMetricInGivenLevel(metricLevel)) {
return true;
}
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
index 02ce273e5bc..bbb786a8133 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java
@@ -259,29 +259,45 @@ public abstract class AbstractMetricService {
/** Count with internal report. */
public void countWithInternalReportAsync(
long delta, String metric, MetricLevel metricLevel, long time, String...
tags) {
- internalReporter.writeMetricToIoTDB(
- metricManager.count(delta, metric, metricLevel, tags), metric, time,
tags);
+ if (metricManager.invalid(metricLevel, metric, tags)) {
+ return;
+ }
+ Counter counter = metricManager.createCounter();
+ counter.inc(delta);
+ internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
}
/** Gauge value with internal report. */
public void gaugeWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, long time, String...
tags) {
- internalReporter.writeMetricToIoTDB(
- metricManager.gauge(value, metric, metricLevel, tags), metric, time,
tags);
+ if (metricManager.invalid(metricLevel, metric, tags)) {
+ return;
+ }
+ Gauge gauge = metricManager.createGauge();
+ gauge.set(value);
+ internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
}
/** Rate with internal report. */
public void rateWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, long time, String...
tags) {
- internalReporter.writeMetricToIoTDB(
- metricManager.rate(value, metric, metricLevel, tags), metric, time,
tags);
+ if (metricManager.invalid(metricLevel, metric, tags)) {
+ return;
+ }
+ Rate rate = metricManager.createRate();
+ rate.mark(value);
+ internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
}
/** Histogram with internal report. */
public void histogramWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, long time, String...
tags) {
- internalReporter.writeMetricToIoTDB(
- metricManager.histogram(value, metric, metricLevel, tags), metric,
time, tags);
+ if (metricManager.invalid(metricLevel, metric, tags)) {
+ return;
+ }
+ Histogram histogram = metricManager.createHistogram();
+ histogram.update(value);
+ internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
}
/** Timer with internal report. */
@@ -292,8 +308,12 @@ public abstract class AbstractMetricService {
MetricLevel metricLevel,
long time,
String... tags) {
- internalReporter.writeMetricToIoTDB(
- metricManager.timer(delta, timeUnit, metric, metricLevel, tags),
metric, time, tags);
+ if (metricManager.invalid(metricLevel, metric, tags)) {
+ return;
+ }
+ Timer timer = metricManager.createTimer();
+ timer.update(delta, timeUnit);
+ internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
}
public List<Pair<String, String[]>> getAllMetricKeys() {
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/impl/DoNothingMetricManager.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/impl/DoNothingMetricManager.java
index d46b28a6d9c..d9521314da1 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/impl/DoNothingMetricManager.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/impl/DoNothingMetricManager.java
@@ -57,7 +57,7 @@ public class DoNothingMetricManager extends
AbstractMetricManager {
}
@Override
- public Histogram createHistogram(MetricInfo metricInfo) {
+ public Histogram createHistogram() {
return DO_NOTHING_HISTOGRAM;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
index 67bc150664f..05b6c63b0a5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadPoolMetrics.java
@@ -32,9 +32,10 @@ import java.util.Map;
@SuppressWarnings("java:S6548")
public class ThreadPoolMetrics implements IMetricSet {
+
private AbstractMetricService metricService;
- private Map<String, IThreadPoolMBean> notRegisteredPoolMap = new HashMap<>();
- private Map<String, IThreadPoolMBean> registeredPoolMap = new HashMap<>();
+ private final Map<String, IThreadPoolMBean> notRegisteredPoolMap = new
HashMap<>();
+ private final Map<String, IThreadPoolMBean> registeredPoolMap = new
HashMap<>();
public static ThreadPoolMetrics getInstance() {
return ThreadPoolMetricsHolder.INSTANCE;
@@ -42,129 +43,96 @@ public class ThreadPoolMetrics implements IMetricSet {
private ThreadPoolMetrics() {}
- public void registerThreadPool(IThreadPoolMBean pool, String name) {
- synchronized (this) {
- if (metricService == null) {
- notRegisteredPoolMap.put(name, pool);
- } else {
- registeredPoolMap.put(name, pool);
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_ACTIVE_THREAD_COUNT.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> registeredPoolMap.get(name).getActiveCount(),
- SystemTag.POOL_NAME.toString(),
- name);
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_CORE_SIZE.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> registeredPoolMap.get(name).getCorePoolSize(),
- SystemTag.POOL_NAME.toString(),
- name);
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_WAITING_TASK_COUNT.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> registeredPoolMap.get(name).getQueueLength(),
- SystemTag.POOL_NAME.toString(),
- name);
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_DONE_TASK_COUNT.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> registeredPoolMap.get(name).getCompletedTaskCount(),
- SystemTag.POOL_NAME.toString(),
- name);
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_LARGEST_POOL_SIZE.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> registeredPoolMap.get(name).getLargestPoolSize(),
- SystemTag.POOL_NAME.toString(),
- name);
- }
- }
- }
-
- @Override
- public void bindTo(AbstractMetricService metricService) {
- synchronized (this) {
- this.metricService = metricService;
- for (Map.Entry<String, IThreadPoolMBean> entry :
notRegisteredPoolMap.entrySet()) {
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_ACTIVE_THREAD_COUNT.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> entry.getValue().getActiveCount(),
- SystemTag.POOL_NAME.toString(),
- entry.getKey());
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_CORE_SIZE.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> entry.getValue().getCorePoolSize(),
- SystemTag.POOL_NAME.toString(),
- entry.getKey());
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_WAITING_TASK_COUNT.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> entry.getValue().getQueue().size(),
- SystemTag.POOL_NAME.toString(),
- entry.getKey());
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_DONE_TASK_COUNT.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> entry.getValue().getCompletedTaskCount(),
- SystemTag.POOL_NAME.toString(),
- entry.getKey());
- metricService.createAutoGauge(
- SystemMetric.THREAD_POOL_LARGEST_POOL_SIZE.toString(),
- MetricLevel.IMPORTANT,
- registeredPoolMap,
- map -> entry.getValue().getLargestPoolSize(),
- SystemTag.POOL_NAME.toString(),
- entry.getKey());
- }
- registeredPoolMap.putAll(notRegisteredPoolMap);
- notRegisteredPoolMap.clear();
+ public synchronized void registerThreadPool(IThreadPoolMBean pool, String
name) {
+ if (metricService == null) {
+ notRegisteredPoolMap.put(name, pool);
+ } else {
+ registeredPoolMap.put(name, pool);
+ metricService.createAutoGauge(
+ SystemMetric.THREAD_POOL_ACTIVE_THREAD_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ registeredPoolMap,
+ map -> registeredPoolMap.get(name).getActiveCount(),
+ SystemTag.POOL_NAME.toString(),
+ name);
+ metricService.createAutoGauge(
+ SystemMetric.THREAD_POOL_CORE_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ registeredPoolMap,
+ map -> registeredPoolMap.get(name).getCorePoolSize(),
+ SystemTag.POOL_NAME.toString(),
+ name);
+ metricService.createAutoGauge(
+ SystemMetric.THREAD_POOL_WAITING_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ registeredPoolMap,
+ map -> registeredPoolMap.get(name).getQueueLength(),
+ SystemTag.POOL_NAME.toString(),
+ name);
+ metricService.createAutoGauge(
+ SystemMetric.THREAD_POOL_DONE_TASK_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ registeredPoolMap,
+ map -> registeredPoolMap.get(name).getCompletedTaskCount(),
+ SystemTag.POOL_NAME.toString(),
+ name);
+ metricService.createAutoGauge(
+ SystemMetric.THREAD_POOL_LARGEST_POOL_SIZE.toString(),
+ MetricLevel.IMPORTANT,
+ registeredPoolMap,
+ map -> registeredPoolMap.get(name).getLargestPoolSize(),
+ SystemTag.POOL_NAME.toString(),
+ name);
}
}
- @Override
- public void unbindFrom(AbstractMetricService metricService) {
- for (Map.Entry<String, IThreadPoolMBean> entry :
registeredPoolMap.entrySet()) {
+ public synchronized void unRegisterThreadPool(String name) {
+ if (metricService == null) {
+ notRegisteredPoolMap.remove(name);
+ } else {
+ registeredPoolMap.remove(name);
metricService.remove(
MetricType.GAUGE,
SystemMetric.THREAD_POOL_ACTIVE_THREAD_COUNT.toString(),
SystemTag.POOL_NAME.toString(),
- entry.getKey());
+ name);
metricService.remove(
MetricType.GAUGE,
SystemMetric.THREAD_POOL_CORE_SIZE.toString(),
SystemTag.POOL_NAME.toString(),
- entry.getKey());
+ name);
metricService.remove(
MetricType.GAUGE,
SystemMetric.THREAD_POOL_WAITING_TASK_COUNT.toString(),
SystemTag.POOL_NAME.toString(),
- entry.getKey());
+ name);
metricService.remove(
MetricType.GAUGE,
SystemMetric.THREAD_POOL_DONE_TASK_COUNT.toString(),
SystemTag.POOL_NAME.toString(),
- entry.getKey());
+ name);
metricService.remove(
MetricType.GAUGE,
SystemMetric.THREAD_POOL_LARGEST_POOL_SIZE.toString(),
SystemTag.POOL_NAME.toString(),
- entry.getKey());
+ name);
}
}
+ @Override
+ public synchronized void bindTo(AbstractMetricService metricService) {
+ this.metricService = metricService;
+ notRegisteredPoolMap.forEach((name, pool) -> registerThreadPool(pool,
name));
+ notRegisteredPoolMap.clear();
+ }
+
+ @Override
+ public synchronized void unbindFrom(AbstractMetricService metricService) {
+ registeredPoolMap.forEach((name, pool) -> unRegisterThreadPool(name));
+ }
+
private static class ThreadPoolMetricsHolder {
+
private static final ThreadPoolMetrics INSTANCE = new ThreadPoolMetrics();
private ThreadPoolMetricsHolder() {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedScheduledExecutorService.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedScheduledExecutorService.java
index 9ea0059a1f4..05aa8047739 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedScheduledExecutorService.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedScheduledExecutorService.java
@@ -80,11 +80,13 @@ public class WrappedScheduledExecutorService
public void shutdown() {
service.shutdown();
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
}
@Override
public List<Runnable> shutdownNow() {
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
return service.shutdownNow();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
index 7e2663722f1..604a2ae438b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadExecutorService.java
@@ -56,11 +56,13 @@ public class WrappedSingleThreadExecutorService
public void shutdown() {
service.shutdown();
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
}
@Override
public List<Runnable> shutdownNow() {
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
return service.shutdownNow();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
index 082bce0b194..406a00371f4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java
@@ -80,11 +80,13 @@ public class WrappedSingleThreadScheduledExecutor
public void shutdown() {
service.shutdown();
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
}
@Override
public List<Runnable> shutdownNow() {
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
return service.shutdownNow();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index 2c24f045ca5..85e18e8ffdd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -96,11 +96,13 @@ public class WrappedThreadPoolExecutor extends
ThreadPoolExecutor
public void shutdown() {
super.shutdown();
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
}
@Override
public List<Runnable> shutdownNow() {
JMXService.deregisterMBean(mbeanName);
+ ThreadPoolMetrics.getInstance().unRegisterThreadPool(this.mbeanName);
return super.shutdownNow();
}