This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new cc32ebefa1 add thread pool metric (#11461)
cc32ebefa1 is described below
commit cc32ebefa1c9a96b03570073b7d6106ff1855740
Author: Webster <[email protected]>
AuthorDate: Sun Feb 12 08:57:35 2023 +0800
add thread pool metric (#11461)
* add thread pool metric, and run successfully
* add more thread pool metrics, and run successfully
* delete comment
* avoid star import
* avoid star import and unused import
* avoid check
* ut pass
* Resolve unit test failures
The previous unit test run failed due to the introduction of thread pool
metrics.
Unit tests now filter out thread pool metrics.
* Spin off the applicationModel from the DefaultMetricsCollector
---------
Co-authored-by: dongdong.yang <[email protected]>
---
.../dubbo/common/constants/MetricsConstants.java | 4 +
.../config/deploy/DefaultApplicationDeployer.java | 1 +
.../dubbo/metrics/model/MetricsCategory.java | 1 +
.../org/apache/dubbo/metrics/model/MetricsKey.java | 7 ++
.../dubbo/metrics/model/ThreadPoolMetric.java | 123 +++++++++++++++++++++
.../metrics/collector/DefaultMetricsCollector.java | 42 ++++++-
.../dubbo/metrics/filter/MetricsFilterTest.java | 12 +-
.../collector/DefaultMetricsCollectorTest.java | 27 ++++-
8 files changed, 209 insertions(+), 8 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
index 55e92df088..89e0333b36 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/MetricsConstants.java
@@ -22,6 +22,8 @@ public interface MetricsConstants {
String TAG_IP = "ip";
+ String TAG_PID = "pid";
+
String TAG_HOSTNAME = "hostname";
String TAG_APPLICATION_NAME = "application.name";
@@ -77,4 +79,6 @@ public interface MetricsConstants {
String PROMETHEUS_DEFAULT_JOB_NAME = "default_dubbo_job";
String METRIC_FILTER_START_TIME = "metric_filter_start_time";
+
+ String TAG_THREAD_NAME = "thread.pool.name";
}
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
index 25115c169d..21613c019e 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
@@ -366,6 +366,7 @@ public class DefaultApplicationDeployer extends
AbstractDeployer<ApplicationMode
if (metricsConfig != null &&
PROTOCOL_PROMETHEUS.equals(metricsConfig.getProtocol())) {
collector.setCollectEnabled(true);
collector.addApplicationInfo(applicationModel.getApplicationName(),
Version.getVersion());
+ collector.addThreadPool(applicationModel.getFrameworkModel(),
applicationModel.getApplicationName());
String protocol = metricsConfig.getProtocol();
if (StringUtils.isNotEmpty(protocol)) {
MetricsReporterFactory metricsReporterFactory =
getExtensionLoader(MetricsReporterFactory.class).getAdaptiveExtension();
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsCategory.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsCategory.java
index dd0df8da47..96de914503 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsCategory.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsCategory.java
@@ -24,5 +24,6 @@ public enum MetricsCategory {
RT,
QPS,
REQUESTS,
+ THREAD_POOL,
APPLICATION
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsKey.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsKey.java
index 147e08a2de..82cf70b0b3 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsKey.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/MetricsKey.java
@@ -48,6 +48,13 @@ public enum MetricsKey {
PROVIDER_METRIC_RT_P99("dubbo.provider.rt.seconds.p99", "Response Time
P99"),
PROVIDER_METRIC_RT_P95("dubbo.provider.rt.seconds.p95", "Response Time
P95"),
+ THREAD_POOL_CORE_SIZE("dubbo.thread.pool.core.size","Thread Pool Core
Size"),
+ THREAD_POOL_LARGEST_SIZE("dubbo.thread.pool.largest.size","Thread Pool
Largest Size"),
+ THREAD_POOL_MAX_SIZE("dubbo.thread.pool.max.size","Thread Pool Max Size"),
+ THREAD_POOL_ACTIVE_SIZE("dubbo.thread.pool.active.size","Thread Pool
Active Size"),
+ THREAD_POOL_THREAD_COUNT("dubbo.thread.pool.thread.count","Thread Pool
Thread Count"),
+ THREAD_POOL_QUEUE_SIZE("dubbo.thread.pool.queue.size","Thread Pool Queue
Size"),
+
// consumer metrics key
;
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/ThreadPoolMetric.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/ThreadPoolMetric.java
new file mode 100644
index 0000000000..162925e9fa
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/ThreadPoolMetric.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.model;
+
+import org.apache.dubbo.common.utils.ConfigUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_IP;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_PID;
+import static org.apache.dubbo.common.constants.MetricsConstants.TAG_HOSTNAME;
+import static
org.apache.dubbo.common.constants.MetricsConstants.TAG_APPLICATION_NAME;
+import static
org.apache.dubbo.common.constants.MetricsConstants.TAG_THREAD_NAME;
+import static org.apache.dubbo.common.utils.NetUtils.getLocalHost;
+import static org.apache.dubbo.common.utils.NetUtils.getLocalHostName;
+
+/**
+ * Pairs a {@link ThreadPoolExecutor} with the application name and thread pool
+ * name used to tag its metrics, and exposes the executor's current figures as
+ * {@code double} values.
+ *
+ * <p>Two instances are equal when their application name and thread pool name
+ * match; the wrapped executor takes no part in {@link #equals(Object)} or
+ * {@link #hashCode()}. Both names have setters, so changing them while an
+ * instance is stored in a hash-based collection changes its hash code.
+ */
+public class ThreadPoolMetric {
+
+    private String applicationName;
+
+    private String threadPoolName;
+
+    private ThreadPoolExecutor threadPoolExecutor;
+
+    /**
+     * @param applicationName    value reported under the application-name tag
+     * @param threadPoolName     value reported under the thread-pool-name tag
+     * @param threadPoolExecutor executor whose figures are read by the numeric getters
+     */
+    public ThreadPoolMetric(String applicationName, String threadPoolName, ThreadPoolExecutor threadPoolExecutor) {
+        this.applicationName = applicationName;
+        this.threadPoolName = threadPoolName;
+        this.threadPoolExecutor = threadPoolExecutor;
+    }
+
+    public String getApplicationName() {
+        return applicationName;
+    }
+
+    public void setApplicationName(String applicationName) {
+        this.applicationName = applicationName;
+    }
+
+    public String getThreadPoolName() {
+        return threadPoolName;
+    }
+
+    public void setThreadPoolName(String threadPoolName) {
+        this.threadPoolName = threadPoolName;
+    }
+
+    public ThreadPoolExecutor getThreadPoolExecutor() {
+        return threadPoolExecutor;
+    }
+
+    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
+        this.threadPoolExecutor = threadPoolExecutor;
+    }
+
+    /**
+     * Compares by application name and thread pool name only.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ThreadPoolMetric other = (ThreadPoolMetric) o;
+        return Objects.equals(applicationName, other.applicationName)
+            && Objects.equals(threadPoolName, other.threadPoolName);
+    }
+
+    /**
+     * Hashes the same two fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(applicationName, threadPoolName);
+    }
+
+    /**
+     * Builds a new tag map on every call.
+     *
+     * @return a mutable map holding the local host address, process id, local
+     *         host name, application name and thread pool name
+     */
+    public Map<String, String> getTags() {
+        Map<String, String> tags = new HashMap<>();
+        tags.put(TAG_IP, getLocalHost());
+        tags.put(TAG_PID, String.valueOf(ConfigUtils.getPid()));
+        tags.put(TAG_HOSTNAME, getLocalHostName());
+        tags.put(TAG_APPLICATION_NAME, applicationName);
+        tags.put(TAG_THREAD_NAME, threadPoolName);
+        return tags;
+    }
+
+    // The getters below read the executor at call time and widen its int
+    // results to double.
+
+    /** @return the executor's core pool size */
+    public double getCorePoolSize() {
+        return threadPoolExecutor.getCorePoolSize();
+    }
+
+    /** @return the executor's largest recorded pool size */
+    public double getLargestPoolSize() {
+        return threadPoolExecutor.getLargestPoolSize();
+    }
+
+    /** @return the executor's maximum pool size */
+    public double getMaximumPoolSize() {
+        return threadPoolExecutor.getMaximumPoolSize();
+    }
+
+    /** @return the executor's active thread count */
+    public double getActiveCount() {
+        return threadPoolExecutor.getActiveCount();
+    }
+
+    /** @return the executor's current pool size */
+    public double getPoolSize() {
+        return threadPoolExecutor.getPoolSize();
+    }
+
+    /** @return the number of elements in the executor's work queue */
+    public double getQueueSize() {
+        return threadPoolExecutor.getQueue().size();
+    }
+
+}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
index 5835b92051..ae7b2ebecd 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.metrics.collector;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.metrics.collector.stat.MetricsStatComposite;
import org.apache.dubbo.metrics.collector.stat.MetricsStatHandler;
import org.apache.dubbo.metrics.event.EmptyEvent;
@@ -24,17 +25,25 @@ import org.apache.dubbo.metrics.event.MetricsEvent;
import org.apache.dubbo.metrics.event.SimpleMetricsEventMulticaster;
import org.apache.dubbo.metrics.listener.MetricsListener;
import org.apache.dubbo.metrics.model.MetricsKey;
+import org.apache.dubbo.metrics.model.ThreadPoolMetric;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.Invocation;
import java.util.ArrayList;
import java.util.List;
+import java.util.HashSet;
+
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import static org.apache.dubbo.metrics.model.MetricsCategory.THREAD_POOL;
import static org.apache.dubbo.metrics.model.MetricsCategory.APPLICATION;
import static org.apache.dubbo.metrics.model.MetricsCategory.REQUESTS;
import static org.apache.dubbo.metrics.model.MetricsCategory.RT;
@@ -46,11 +55,12 @@ import static
org.apache.dubbo.metrics.model.MetricsCategory.RT;
public class DefaultMetricsCollector implements MetricsCollector {
private AtomicBoolean collectEnabled = new AtomicBoolean(false);
+ private final Set<ThreadPoolMetric> threadPoolMetricSet = new
HashSet<ThreadPoolMetric>();
private final MetricsStatComposite stats;
private final SimpleMetricsEventMulticaster eventMulticaster;
public DefaultMetricsCollector() {
- this.stats = new MetricsStatComposite(this);
+ this.stats = new MetricsStatComposite( this);
this.eventMulticaster = SimpleMetricsEventMulticaster.getInstance();
}
@@ -112,22 +122,46 @@ public class DefaultMetricsCollector implements
MetricsCollector {
public void addApplicationInfo(String applicationName, String version) {
doExecute(MetricsEvent.Type.APPLICATION_INFO, statHandler ->
statHandler.addApplication(applicationName,version));
}
+
+ /**
+  * Registers the shared, mapping-refreshing and pool-router executors of the
+  * framework for thread pool metric collection.
+  *
+  * @param frameworkModel  model whose bean factory supplies the {@link FrameworkExecutorRepository}
+  * @param applicationName application name attached to every registered pool
+  */
+ public void addThreadPool(FrameworkModel frameworkModel, String applicationName) {
+     FrameworkExecutorRepository executors = frameworkModel.getBeanFactory().getBean(FrameworkExecutorRepository.class);
+     addThreadPoolExecutor(applicationName, "SharedExecutor", executors.getSharedExecutor());
+     addThreadPoolExecutor(applicationName, "MappingRefreshingExecutor", executors.getMappingRefreshingExecutor());
+     addThreadPoolExecutor(applicationName, "PoolRouterExecutor", executors.getPoolRouterExecutor());
+ }
+
+ /**
+  * Adds a {@link ThreadPoolMetric} for the given executor when it is a
+  * {@link ThreadPoolExecutor}; any other executor, including {@code null}, is skipped.
+  *
+  * @param applicationName application name stored in the metric
+  * @param threadPoolName  pool name stored in the metric
+  * @param executorService executor to register, may be {@code null}
+  */
+ private void addThreadPoolExecutor(String applicationName, String threadPoolName, ExecutorService executorService) {
+     // instanceof is false for null, so a missing executor is skipped here too.
+     if (!(executorService instanceof ThreadPoolExecutor)) {
+         return;
+     }
+     ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;
+     threadPoolMetricSet.add(new ThreadPoolMetric(applicationName, threadPoolName, pool));
+ }
+
@Override
public List<MetricSample> collect() {
List<MetricSample> list = new ArrayList<>();
collectApplication(list);
collectRequests(list);
collectRT(list);
-
+ collectThreadPool(list);
return list;
}
+ /**
+  * Appends six gauge samples per registered thread pool: core size, largest
+  * size, max size, active size, thread count and queue size.
+  *
+  * <p>Samples are emitted one metric key at a time, so all core-size samples
+  * come first, then all largest-size samples, and so on.
+  *
+  * @param list destination the samples are appended to
+  */
+ private void collectThreadPool(List<MetricSample> list) {
+     for (ThreadPoolMetric metric : threadPoolMetricSet) {
+         list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_CORE_SIZE, metric.getTags(), THREAD_POOL, metric::getCorePoolSize));
+     }
+     for (ThreadPoolMetric metric : threadPoolMetricSet) {
+         list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_LARGEST_SIZE, metric.getTags(), THREAD_POOL, metric::getLargestPoolSize));
+     }
+     for (ThreadPoolMetric metric : threadPoolMetricSet) {
+         list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_MAX_SIZE, metric.getTags(), THREAD_POOL, metric::getMaximumPoolSize));
+     }
+     for (ThreadPoolMetric metric : threadPoolMetricSet) {
+         list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_ACTIVE_SIZE, metric.getTags(), THREAD_POOL, metric::getActiveCount));
+     }
+     for (ThreadPoolMetric metric : threadPoolMetricSet) {
+         list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_THREAD_COUNT, metric.getTags(), THREAD_POOL, metric::getPoolSize));
+     }
+     for (ThreadPoolMetric metric : threadPoolMetricSet) {
+         list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_QUEUE_SIZE, metric.getTags(), THREAD_POOL, metric::getQueueSize));
+     }
+ }
+
private void collectApplication(List<MetricSample> list) {
doCollect(MetricsEvent.Type.APPLICATION_INFO,
MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new
GaugeMetricSample(MetricsKey.APPLICATION_METRIC_INFO, k.getTags(),
APPLICATION, v::get))));
-
-
}
private void collectRequests(List<MetricSample> list) {
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
index 3f4236800e..88b2177313 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/filter/MetricsFilterTest.java
@@ -17,10 +17,13 @@
package org.apache.dubbo.metrics.filter;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+
+import org.apache.dubbo.common.constants.MetricsConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.model.MetricsKey;
@@ -322,6 +325,13 @@ class MetricsFilterTest {
private Map<String, MetricSample> getMetricsMap() {
List<MetricSample> samples = collector.collect();
- return
samples.stream().collect(Collectors.toMap(MetricSample::getName,
Function.identity()));
+ List<MetricSample> samples1 = new ArrayList<>();
+ for (MetricSample sample : samples) {
+ if (sample.getName().contains("dubbo.thread.pool")) {
+ continue;
+ }
+ samples1.add(sample);
+ }
+ return
samples1.stream().collect(Collectors.toMap(MetricSample::getName,
Function.identity()));
}
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/collector/DefaultMetricsCollectorTest.java
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/collector/DefaultMetricsCollectorTest.java
index 543791aa54..12d9571a37 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/collector/DefaultMetricsCollectorTest.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/collector/DefaultMetricsCollectorTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -55,6 +56,7 @@ class DefaultMetricsCollectorTest {
private String group;
private String version;
private RpcInvocation invocation;
+ public static final String DUBBO_THREAD_METRIC_MARK =
"dubbo.thread.pool";
@BeforeEach
public void setup() {
@@ -93,6 +95,9 @@ class DefaultMetricsCollectorTest {
List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
+ if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
+ continue;
+ }
Assertions.assertTrue(sample instanceof GaugeMetricSample);
GaugeMetricSample gaugeSample = (GaugeMetricSample) sample;
Map<String, String> tags = gaugeSample.getTags();
@@ -107,7 +112,14 @@ class DefaultMetricsCollectorTest {
collector.decreaseProcessingRequests(applicationName, invocation);
samples = collector.collect();
- Map<String, Long> sampleMap =
samples.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
+ List<MetricSample> samples1 = new ArrayList<>();
+ for (MetricSample sample : samples) {
+ if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
+ continue;
+ }
+ samples1.add(sample);
+ }
+ Map<String, Long> sampleMap =
samples1.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
Number number = ((GaugeMetricSample) k).getSupplier().get();
return number.longValue();
}));
@@ -125,6 +137,9 @@ class DefaultMetricsCollectorTest {
List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
+ if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
+ continue;
+ }
Map<String, String> tags = sample.getTags();
Assertions.assertEquals(tags.get(TAG_INTERFACE_KEY),
interfaceName);
@@ -132,8 +147,14 @@ class DefaultMetricsCollectorTest {
Assertions.assertEquals(tags.get(TAG_GROUP_KEY), group);
Assertions.assertEquals(tags.get(TAG_VERSION_KEY), version);
}
-
- Map<String, Long> sampleMap =
samples.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
+ List<MetricSample> samples1 = new ArrayList<>();
+ for (MetricSample sample : samples) {
+ if (sample.getName().contains(DUBBO_THREAD_METRIC_MARK)) {
+ continue;
+ }
+ samples1.add(sample);
+ }
+ Map<String, Long> sampleMap =
samples1.stream().collect(Collectors.toMap(MetricSample::getName, k -> {
Number number = ((GaugeMetricSample) k).getSupplier().get();
return number.longValue();
}));