This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 70600eb632 Add metrics version compare (#13104)
70600eb632 is described below
commit 70600eb632776a72604a61aaa2d7a008a7f2c1bb
Author: Albumen Kevin <[email protected]>
AuthorDate: Sat Sep 30 10:06:21 2023 +0800
Add metrics version compare (#13104)
---
.../dubbo/metrics/collector/MetricsCollector.java | 8 +++
.../metrics/data/ApplicationStatComposite.java | 14 ++++-
.../dubbo/metrics/data/BaseStatComposite.java | 10 ++++
.../dubbo/metrics/data/MethodStatComposite.java | 17 +++++-
.../apache/dubbo/metrics/data/RtStatComposite.java | 18 +++++-
.../dubbo/metrics/data/ServiceStatComposite.java | 29 ++++++++-
.../apache/dubbo/metrics/report/MetricsExport.java | 7 +++
.../dubbo/metrics/report/MetricsReporter.java | 4 +-
.../collector/ConfigCenterMetricsCollector.java | 15 ++++-
.../collector/AggregateMetricsCollector.java | 66 +++++++++++++++------
.../metrics/collector/DefaultMetricsCollector.java | 20 +++++++
.../collector/HistogramMetricsCollector.java | 9 ++-
.../metrics/collector/sample/MetricsSampler.java | 9 +++
.../collector/sample/ThreadPoolMetricsSampler.java | 25 +++++---
.../sample/ThreadRejectMetricsCountSampler.java | 15 ++++-
.../metrics/report/AbstractMetricsReporter.java | 68 ++++++++++++++--------
.../metrics/report/nop/NopMetricsReporter.java | 2 +-
.../collector/MetadataMetricsCollector.java | 4 ++
.../prometheus/NopPrometheusMetricsReporter.java | 2 +-
.../prometheus/PrometheusMetricsReporter.java | 11 ++--
.../prometheus/PrometheusMetricsReporterCmd.java | 2 +-
.../prometheus/PrometheusMetricsReporterTest.java | 7 ++-
.../PrometheusMetricsThreadPoolTest.java | 5 +-
.../collector/RegistryMetricsCollector.java | 7 +++
.../registry/collector/RegistryStatComposite.java | 22 ++++++-
.../command/impl/DefaultMetricsReporterCmd.java | 2 +-
26 files changed, 316 insertions(+), 82 deletions(-)
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/MetricsCollector.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/MetricsCollector.java
index f8b676cd3b..19bd58bba9 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/MetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/collector/MetricsCollector.java
@@ -43,6 +43,14 @@ public interface MetricsCollector<E extends
TimeCounterEvent> extends MetricsLif
*/
List<MetricSample> collect();
+ /**
+ * Check if samples have been changed.
+ * Note that this method will reset the changed flag to false using CAS.
+ *
+ * @return true if samples have been changed
+ */
+ boolean calSamplesChanged();
+
default void initMetrics(MetricsEvent event) {};
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ApplicationStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ApplicationStatComposite.java
index 47a432e480..d0bfe24bb2 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ApplicationStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ApplicationStatComposite.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -46,11 +47,16 @@ public class ApplicationStatComposite extends
AbstractMetricsExport {
private final Map<MetricsKey, AtomicLong> applicationNumStats = new
ConcurrentHashMap<>();
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public void init(List<MetricsKey> appKeys) {
if (CollectionUtils.isEmpty(appKeys)) {
return;
}
- appKeys.forEach(appKey -> applicationNumStats.put(appKey, new
AtomicLong(0L)));
+ appKeys.forEach(appKey -> {
+ applicationNumStats.put(appKey, new AtomicLong(0L));
+ });
+ samplesChanged.set(true);
}
public void incrementSize(MetricsKey metricsKey, int size) {
@@ -78,4 +84,10 @@ public class ApplicationStatComposite extends
AbstractMetricsExport {
return applicationNumStats;
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/BaseStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/BaseStatComposite.java
index 6dde0939bf..f3fcd2d997 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/BaseStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/BaseStatComposite.java
@@ -133,4 +133,14 @@ public abstract class BaseStatComposite implements
MetricsExport {
public RtStatComposite getRtStatComposite() {
return rtStatComposite;
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // Call calSamplesChanged() on every composite so each flag is reset;
keep the call on the left of `||` so it is never short-circuited
+ boolean changed = applicationStatComposite.calSamplesChanged();
+ changed = rtStatComposite.calSamplesChanged() || changed;
+ changed = serviceStatComposite.calSamplesChanged() || changed;
+ changed = methodStatComposite.calSamplesChanged() || changed;
+ return changed;
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
index 06cd8ade6e..1e26b7e25c 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/MethodStatComposite.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -43,6 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class MethodStatComposite extends AbstractMetricsExport {
private boolean serviceLevel;
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public MethodStatComposite(ApplicationModel applicationModel) {
super(applicationModel);
this.serviceLevel = MethodMetric.isServiceLevel(getApplicationModel());
@@ -54,7 +57,10 @@ public class MethodStatComposite extends
AbstractMetricsExport {
if (CollectionUtils.isEmpty(metricsKeyWrappers)) {
return;
}
- metricsKeyWrappers.forEach(appKey -> methodNumStats.put(appKey, new
ConcurrentHashMap<>()));
+ metricsKeyWrappers.forEach(appKey -> {
+ methodNumStats.put(appKey, new ConcurrentHashMap<>());
+ });
+ samplesChanged.set(true);
}
public void initMethodKey(MetricsKeyWrapper wrapper, Invocation
invocation) {
@@ -63,6 +69,7 @@ public class MethodStatComposite extends
AbstractMetricsExport {
}
methodNumStats.get(wrapper).computeIfAbsent(new
MethodMetric(getApplicationModel(), invocation, serviceLevel), k -> new
AtomicLong(0L));
+ samplesChanged.set(true);
}
public void incrementMethodKey(MetricsKeyWrapper wrapper, MethodMetric
methodMetric, int size) {
@@ -71,7 +78,8 @@ public class MethodStatComposite extends
AbstractMetricsExport {
}
AtomicLong stat = methodNumStats.get(wrapper).get(methodMetric);
if (stat == null) {
- methodNumStats.get(wrapper).putIfAbsent(methodMetric, new
AtomicLong(0L));
+ methodNumStats.get(wrapper).computeIfAbsent(methodMetric, (k)->
new AtomicLong(0L));
+ samplesChanged.set(true);
stat = methodNumStats.get(wrapper).get(methodMetric);
}
stat.getAndAdd(size);
@@ -97,4 +105,9 @@ public class MethodStatComposite extends
AbstractMetricsExport {
return list;
}
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
index d335c0dfba..878349542b 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/RtStatComposite.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.BiConsumer;
@@ -52,6 +53,8 @@ import java.util.stream.Collectors;
public class RtStatComposite extends AbstractMetricsExport {
private boolean serviceLevel;
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public RtStatComposite(ApplicationModel applicationModel) {
super(applicationModel);
this.serviceLevel = MethodMetric.isServiceLevel(getApplicationModel());
@@ -66,10 +69,10 @@ public class RtStatComposite extends AbstractMetricsExport {
for (MetricsPlaceValue placeValue : placeValues) {
List<LongContainer<? extends Number>> containers =
initStats(placeValue);
for (LongContainer<? extends Number> container : containers) {
-
rtStats.computeIfAbsent(container.getMetricsKeyWrapper().getType(), k -> new
ArrayList<>())
- .add(container);
+
rtStats.computeIfAbsent(container.getMetricsKeyWrapper().getType(), k -> new
ArrayList<>()).add(container);
}
}
+ samplesChanged.set(true);
}
private List<LongContainer<? extends Number>> initStats(MetricsPlaceValue
placeValue) {
@@ -95,6 +98,7 @@ public class RtStatComposite extends AbstractMetricsExport {
Number current = (Number) container.get(key);
if (current == null) {
container.putIfAbsent(key, container.getInitFunc().apply(key));
+ samplesChanged.set(true);
current = (Number) container.get(key);
}
container.getConsumerFunc().accept(responseTime, current);
@@ -114,6 +118,7 @@ public class RtStatComposite extends AbstractMetricsExport {
if (actions == null) {
actions = calServiceRtActions(invocation, registryOpType);
cache.putIfAbsent(registryOpType, actions);
+ samplesChanged.set(true);
actions = cache.get(registryOpType);
}
} else {
@@ -134,6 +139,7 @@ public class RtStatComposite extends AbstractMetricsExport {
Number current = (Number) container.get(key);
if (current == null) {
container.putIfAbsent(key, container.getInitFunc().apply(key));
+ samplesChanged.set(true);
current = (Number) container.get(key);
}
actions.add(new Action(container.getConsumerFunc(), current));
@@ -155,6 +161,7 @@ public class RtStatComposite extends AbstractMetricsExport {
if (actions == null) {
actions = calMethodRtActions(invocation, registryOpType);
cache.putIfAbsent(registryOpType, actions);
+ samplesChanged.set(true);
actions = cache.get(registryOpType);
}
} else {
@@ -174,6 +181,7 @@ public class RtStatComposite extends AbstractMetricsExport {
Number current = (Number) container.get(key);
if (current == null) {
container.putIfAbsent(key, container.getInitFunc().apply(key));
+ samplesChanged.set(true);
current = (Number) container.get(key);
}
actions.add(new Action(container.getConsumerFunc(), current));
@@ -218,4 +226,10 @@ public class RtStatComposite extends AbstractMetricsExport
{
consumerFunc.accept(responseTime, initValue);
}
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
index afedd71641..1cf214293e 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/data/ServiceStatComposite.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -39,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class ServiceStatComposite extends AbstractMetricsExport {
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public ServiceStatComposite(ApplicationModel applicationModel) {
super(applicationModel);
}
@@ -49,7 +52,10 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
if (CollectionUtils.isEmpty(metricsKeyWrappers)) {
return;
}
- metricsKeyWrappers.forEach(appKey ->
serviceWrapperNumStats.put(appKey, new ConcurrentHashMap<>()));
+ metricsKeyWrappers.forEach(appKey -> {
+ serviceWrapperNumStats.put(appKey, new ConcurrentHashMap<>());
+ });
+ samplesChanged.set(true);
}
public void incrementServiceKey(MetricsKeyWrapper wrapper, String
serviceKey, int size) {
@@ -64,7 +70,13 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
- serviceWrapperNumStats.get(wrapper).computeIfAbsent(serviceKeyMetric,
k -> new AtomicLong(0L)).getAndAdd(size);
+ Map<ServiceKeyMetric, AtomicLong> map =
serviceWrapperNumStats.get(wrapper);
+ AtomicLong metrics = map.get(serviceKeyMetric);
+ if (metrics == null) {
+ metrics = map.computeIfAbsent(serviceKeyMetric, k -> new
AtomicLong(0L));
+ samplesChanged.set(true);
+ }
+ metrics.getAndAdd(size);
// MetricsSupport.fillZero(serviceWrapperNumStats);
}
@@ -80,7 +92,13 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
- serviceWrapperNumStats.get(wrapper).computeIfAbsent(serviceKeyMetric,
k -> new AtomicLong(0L)).set(num);
+ Map<ServiceKeyMetric, AtomicLong> stats =
serviceWrapperNumStats.get(wrapper);
+ AtomicLong metrics = stats.get(serviceKeyMetric);
+ if (metrics == null) {
+ metrics = stats.computeIfAbsent(serviceKeyMetric, k -> new
AtomicLong(0L));
+ samplesChanged.set(true);
+ }
+ metrics.set(num);
}
@Override
@@ -95,4 +113,9 @@ public class ServiceStatComposite extends
AbstractMetricsExport {
return list;
}
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsExport.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsExport.java
index d2b7c9eca5..3b1b9f3459 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsExport.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsExport.java
@@ -33,4 +33,11 @@ public interface MetricsExport {
*/
List<MetricSample> export(MetricsCategory category);
+ /**
+ * Check if samples have been changed.
+ * Note that this method will reset the changed flag to false using CAS.
+ *
+ * @return true if samples have been changed
+ */
+ boolean calSamplesChanged();
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsReporter.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsReporter.java
index 14bdc70217..50770e249f 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsReporter.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/report/MetricsReporter.java
@@ -28,10 +28,10 @@ public interface MetricsReporter {
*/
void init();
- void refreshData();
+ void resetIfSamplesChanged();
String getResponse();
-
+
default String getResponseWithName(String metricsName) {
return null;
}
diff --git
a/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
index d0dd1a7d7a..a86f97695e 100644
---
a/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-config-center/src/main/java/org/apache/dubbo/metrics/config/collector/ConfigCenterMetricsCollector.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.dubbo.metrics.model.MetricsCategory.CONFIGCENTER;
@@ -47,6 +48,7 @@ public class ConfigCenterMetricsCollector extends
CombMetricsCollector<ConfigCen
private Boolean collectEnabled = null;
private final ApplicationModel applicationModel;
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
private final Map<ConfigCenterMetric, AtomicLong> updatedMetrics = new
ConcurrentHashMap<>();
@@ -76,7 +78,12 @@ public class ConfigCenterMetricsCollector extends
CombMetricsCollector<ConfigCen
return;
}
ConfigCenterMetric metric = new
ConfigCenterMetric(applicationModel.getApplicationName(), key, group, protocol,
changeTypeName);
- updatedMetrics.computeIfAbsent(metric, k -> new
AtomicLong(0L)).addAndGet(size);
+ AtomicLong metrics = updatedMetrics.get(metric);
+ if (metrics == null) {
+ metrics = updatedMetrics.computeIfAbsent(metric, k -> new
AtomicLong(0L));
+ samplesChanged.set(true);
+ }
+ metrics.addAndGet(size);
}
@@ -91,5 +98,9 @@ public class ConfigCenterMetricsCollector extends
CombMetricsCollector<ConfigCen
return list;
}
-
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
index 5cc6aec5d9..f70e3bc0fd 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
@@ -44,6 +44,7 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
@@ -74,6 +75,7 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
private boolean enableRtPxx;
private boolean enableRt;
private boolean enableRequest;
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
private final ConcurrentMap<MethodMetric, TimeWindowAggregator> rtAgr =
new ConcurrentHashMap<>();
@@ -90,9 +92,9 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
AggregationConfig aggregation =
optional.get().getAggregation();
this.bucketNum =
Optional.ofNullable(aggregation.getBucketNum()).orElse(DEFAULT_BUCKET_NUM);
this.timeWindowSeconds =
Optional.ofNullable(aggregation.getTimeWindowSeconds())
- .orElse(DEFAULT_TIME_WINDOW_SECONDS);
+ .orElse(DEFAULT_TIME_WINDOW_SECONDS);
this.qpsTimeWindowMillSeconds =
Optional.ofNullable(aggregation.getQpsTimeWindowMillSeconds())
- .orElse(DEFAULT_QPS_TIME_WINDOW_MILL_SECONDS);
+ .orElse(DEFAULT_QPS_TIME_WINDOW_MILL_SECONDS);
this.enableQps =
Optional.ofNullable(aggregation.getEnableQps()).orElse(true);
this.enableRtPxx =
Optional.ofNullable(aggregation.getEnableRtPxx()).orElse(true);
this.enableRt =
Optional.ofNullable(aggregation.getEnableRt()).orElse(true);
@@ -127,8 +129,12 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
public void onEvent(RequestEvent event) {
if (enableQps) {
MethodMetric metric = calcWindowCounter(event,
MetricsKey.METRIC_REQUESTS);
- TimeWindowCounter qpsCounter =
ConcurrentHashMapUtils.computeIfAbsent(qps, metric,
- methodMetric -> new TimeWindowCounter(bucketNum,
TimeUnit.MILLISECONDS.toSeconds(qpsTimeWindowMillSeconds)));
+ TimeWindowCounter qpsCounter = qps.get(metric);
+ if (qpsCounter == null) {
+ qpsCounter = ConcurrentHashMapUtils.computeIfAbsent(qps,
metric,
+ methodMetric -> new TimeWindowCounter(bucketNum,
TimeUnit.MILLISECONDS.toSeconds(qpsTimeWindowMillSeconds)));
+ samplesChanged.set(true);
+ }
qpsCounter.increment();
}
}
@@ -163,14 +169,22 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
MethodMetric metric = new MethodMetric(applicationModel,
event.getAttachmentValue(MetricsConstants.INVOCATION), serviceLevel);
long responseTime = event.getTimePair().calc();
if (enableRt) {
- TimeWindowQuantile quantile =
ConcurrentHashMapUtils.computeIfAbsent(rt, metric,
- k -> new TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum,
timeWindowSeconds));
+ TimeWindowQuantile quantile = rt.get(metric);
+ if (quantile == null) {
+ quantile = ConcurrentHashMapUtils.computeIfAbsent(rt, metric,
+ k -> new TimeWindowQuantile(DEFAULT_COMPRESSION,
bucketNum, timeWindowSeconds));
+ samplesChanged.set(true);
+ }
quantile.add(responseTime);
}
if (enableRtPxx) {
- TimeWindowAggregator timeWindowAggregator =
ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric,
- methodMetric -> new TimeWindowAggregator(bucketNum,
timeWindowSeconds));
+ TimeWindowAggregator timeWindowAggregator = rtAgr.get(metric);
+ if (timeWindowAggregator == null) {
+ timeWindowAggregator =
ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric,
+ methodMetric -> new TimeWindowAggregator(bucketNum,
timeWindowSeconds));
+ samplesChanged.set(true);
+ }
timeWindowAggregator.add(responseTime);
}
}
@@ -183,8 +197,12 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
ConcurrentMap<MethodMetric, TimeWindowCounter> counter =
methodTypeCounter.computeIfAbsent(metricsKeyWrapper, k -> new
ConcurrentHashMap<>());
- TimeWindowCounter windowCounter =
ConcurrentHashMapUtils.computeIfAbsent(counter, metric,
- methodMetric -> new TimeWindowCounter(bucketNum,
timeWindowSeconds));
+ TimeWindowCounter windowCounter = counter.get(metric);
+ if (windowCounter == null) {
+ windowCounter = ConcurrentHashMapUtils.computeIfAbsent(counter,
metric,
+ methodMetric -> new TimeWindowCounter(bucketNum,
timeWindowSeconds));
+ samplesChanged.set(true);
+ }
windowCounter.increment();
return metric;
}
@@ -225,13 +243,13 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
ConcurrentHashMap<MethodMetric, TimeWindowCounter> windowCounter =
methodTypeCounter.get(metricsKeyWrapper);
if (windowCounter != null) {
windowCounter.forEach((k, v) -> list.add(new
GaugeMetricSample<>(metricsKey.getNameByType(k.getSide()),
- metricsKey.getDescription(), k.getTags(), REQUESTS, v,
TimeWindowCounter::get)));
+ metricsKey.getDescription(), k.getTags(), REQUESTS, v,
TimeWindowCounter::get)));
}
}
private void collectQPS(List<MetricSample> list) {
qps.forEach((k, v) -> list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_QPS.getNameByType(k.getSide()),
- MetricsKey.METRIC_QPS.getDescription(), k.getTags(), QPS, v, value
-> {
+ MetricsKey.METRIC_QPS.getDescription(), k.getTags(), QPS, v,
value -> {
double total = value.get();
long millSeconds = value.bucketLivedMillSeconds();
return total / millSeconds * 1000;
@@ -241,24 +259,24 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
private void collectRT(List<MetricSample> list) {
rt.forEach((k, v) -> {
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_P99.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_P99.getDescription(), k.getTags(), RT, v,
value -> value.quantile(0.99)));
+ MetricsKey.METRIC_RT_P99.getDescription(), k.getTags(),
RT, v, value -> value.quantile(0.99)));
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_P95.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_P95.getDescription(), k.getTags(), RT, v,
value -> value.quantile(0.95)));
+ MetricsKey.METRIC_RT_P95.getDescription(), k.getTags(),
RT, v, value -> value.quantile(0.95)));
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_P90.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_P90.getDescription(), k.getTags(), RT, v,
value -> value.quantile(0.90)));
+ MetricsKey.METRIC_RT_P90.getDescription(), k.getTags(),
RT, v, value -> value.quantile(0.90)));
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_P50.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_P50.getDescription(), k.getTags(), RT, v,
value -> value.quantile(0.50)));
+ MetricsKey.METRIC_RT_P50.getDescription(), k.getTags(),
RT, v, value -> value.quantile(0.50)));
});
rtAgr.forEach((k, v) -> {
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_MIN_AGG.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_MIN_AGG.getDescription(), k.getTags(),
RT, v, value -> v.get().getMin()));
+ MetricsKey.METRIC_RT_MIN_AGG.getDescription(),
k.getTags(), RT, v, value -> v.get().getMin()));
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_MAX_AGG.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_MAX_AGG.getDescription(), k.getTags(),
RT, v, value -> v.get().getMax()));
+ MetricsKey.METRIC_RT_MAX_AGG.getDescription(),
k.getTags(), RT, v, value -> v.get().getMax()));
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_AVG_AGG.getNameByType(k.getSide()),
- MetricsKey.METRIC_RT_AVG_AGG.getDescription(), k.getTags(),
RT, v, value -> v.get().getAvg()));
+ MetricsKey.METRIC_RT_AVG_AGG.getDescription(),
k.getTags(), RT, v, value -> v.get().getAvg()));
});
}
@@ -288,14 +306,17 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
public void initQpsMetric(MethodMetric metric) {
ConcurrentHashMapUtils.computeIfAbsent(qps, metric, methodMetric ->
new TimeWindowCounter(bucketNum, timeWindowSeconds));
+ samplesChanged.set(true);
}
public void initRtMetric(MethodMetric metric) {
ConcurrentHashMapUtils.computeIfAbsent(rt, metric, k -> new
TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum, timeWindowSeconds));
+ samplesChanged.set(true);
}
public void initRtAgrMetric(MethodMetric metric) {
ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric, k -> new
TimeWindowAggregator(bucketNum, timeWindowSeconds));
+ samplesChanged.set(true);
}
public void initWindowCounter(MetricsEvent event, MetricsKey targetKey) {
@@ -307,6 +328,13 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
ConcurrentMap<MethodMetric, TimeWindowCounter> counter =
methodTypeCounter.computeIfAbsent(metricsKeyWrapper, k -> new
ConcurrentHashMap<>());
ConcurrentHashMapUtils.computeIfAbsent(counter, metric, methodMetric
-> new TimeWindowCounter(bucketNum, timeWindowSeconds));
+ samplesChanged.set(true);
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
index 9d04633260..f1fd60d91a 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/DefaultMetricsCollector.java
@@ -72,6 +72,7 @@ public class DefaultMetricsCollector extends
CombMetricsCollector<RequestEvent>
private final AtomicBoolean initialized = new AtomicBoolean();
+ private final AtomicBoolean samplesChanged = new AtomicBoolean();
public DefaultMetricsCollector(ApplicationModel applicationModel) {
super(new BaseStatComposite(applicationModel) {
@@ -91,11 +92,13 @@ public class DefaultMetricsCollector extends
CombMetricsCollector<RequestEvent>
super.setEventMulticaster(new DefaultSubDispatcher(this));
samplers.add(applicationSampler);
samplers.add(threadPoolSampler);
+ samplesChanged.set(true);
this.applicationModel = applicationModel;
}
public void addSampler(MetricsSampler sampler) {
samplers.add(sampler);
+ samplesChanged.set(true);
}
public void setApplicationName(String applicationName) {
@@ -203,5 +206,22 @@ public class DefaultMetricsCollector extends
CombMetricsCollector<RequestEvent>
MetricsCountSampleConfigurer<String, MetricsEvent.Type,
ApplicationMetric> sampleConfigure) {
sampleConfigure.configureMetrics(configure -> new
ApplicationMetric(applicationModel));
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ return false;
+ }
};
+
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ boolean changed = samplesChanged.compareAndSet(true, false);
+ // Call calSamplesChanged() on stats and every sampler so each flag is
reset; keep the call on the left of `||` so it is never short-circuited
+ changed = stats.calSamplesChanged() || changed;
+ for (MetricsSampler sampler : samplers) {
+ changed = sampler.calSamplesChanged() || changed;
+ }
+ return changed;
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/HistogramMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/HistogramMetricsCollector.java
index 932c285d8e..a7169ba12a 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/HistogramMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/HistogramMetricsCollector.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.metrics.collector;
-import io.micrometer.core.instrument.Timer;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.context.ConfigManager;
@@ -33,6 +32,8 @@ import
org.apache.dubbo.metrics.register.HistogramMetricRegister;
import org.apache.dubbo.metrics.sample.HistogramMetricSample;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import io.micrometer.core.instrument.Timer;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -110,4 +111,10 @@ public class HistogramMetricsCollector extends
AbstractMetricsListener<RequestEv
public List<MetricSample> collect() {
return new ArrayList<>();
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // Histograms are registered directly with Micrometer
+ return false;
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsSampler.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsSampler.java
index 55111af991..52578d627d 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsSampler.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsSampler.java
@@ -18,9 +18,18 @@
package org.apache.dubbo.metrics.collector.sample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
+
import java.util.List;
public interface MetricsSampler {
List<MetricSample> sample();
+
+ /**
+ * Check if samples have been changed.
+ * Note that this method will reset the changed flag to false using CAS.
+ *
+ * @return true if samples have been changed
+ */
+ boolean calSamplesChanged();
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java
index ec4a74b98d..9ccb55f8db 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadPoolMetricsSampler.java
@@ -23,13 +23,12 @@ import
org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
-import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.ThreadPoolMetric;
+import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.ApplicationModel;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -38,10 +37,10 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
-
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_METRICS_COLLECTOR_EXCEPTION;
import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
@@ -56,6 +55,8 @@ public class ThreadPoolMetricsSampler implements
MetricsSampler {
private DataStore dataStore;
private final Map<String, ThreadPoolExecutor> sampleThreadPoolExecutor =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ThreadPoolMetric>
threadPoolMetricMap = new ConcurrentHashMap<>();
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public ThreadPoolMetricsSampler(DefaultMetricsCollector collector) {
this.collector = collector;
}
@@ -63,7 +64,10 @@ public class ThreadPoolMetricsSampler implements
MetricsSampler {
public void addExecutors(String name, ExecutorService executorService) {
Optional.ofNullable(executorService).filter(Objects::nonNull).filter(e
-> e instanceof ThreadPoolExecutor)
.map(e -> (ThreadPoolExecutor) e)
- .ifPresent(threadPoolExecutor ->
sampleThreadPoolExecutor.put(name, threadPoolExecutor));
+ .ifPresent(threadPoolExecutor -> {
+ sampleThreadPoolExecutor.put(name, threadPoolExecutor);
+ samplesChanged.set(true);
+ });
}
@Override
@@ -113,7 +117,7 @@ public class ThreadPoolMetricsSampler implements
MetricsSampler {
for (Map.Entry<String, Object> entry : executors.entrySet()) {
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor instanceof ThreadPoolExecutor) {
- this.addExecutors( SERVER_THREAD_POOL_NAME + "-" +
entry.getKey(), executor);
+ this.addExecutors(SERVER_THREAD_POOL_NAME + "-" +
entry.getKey(), executor);
}
}
executors =
dataStore.get(CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY);
@@ -125,9 +129,9 @@ public class ThreadPoolMetricsSampler implements
MetricsSampler {
}
ThreadRejectMetricsCountSampler threadRejectMetricsCountSampler =
new ThreadRejectMetricsCountSampler(collector);
-
this.sampleThreadPoolExecutor.entrySet().stream().filter(entry->entry.getKey().startsWith(SERVER_THREAD_POOL_NAME)).forEach(entry->{
- if(entry.getValue().getRejectedExecutionHandler() instanceof
AbortPolicyWithReport) {
- MetricThreadPoolExhaustedListener
metricThreadPoolExhaustedListener=new
MetricThreadPoolExhaustedListener(entry.getKey(),threadRejectMetricsCountSampler);
+ this.sampleThreadPoolExecutor.entrySet().stream().filter(entry ->
entry.getKey().startsWith(SERVER_THREAD_POOL_NAME)).forEach(entry -> {
+ if (entry.getValue().getRejectedExecutionHandler() instanceof
AbortPolicyWithReport) {
+ MetricThreadPoolExhaustedListener
metricThreadPoolExhaustedListener = new
MetricThreadPoolExhaustedListener(entry.getKey(),
threadRejectMetricsCountSampler);
((AbortPolicyWithReport)
entry.getValue().getRejectedExecutionHandler()).addThreadPoolExhaustedEventListener(metricThreadPoolExhaustedListener);
}
});
@@ -137,4 +141,9 @@ public class ThreadPoolMetricsSampler implements
MetricsSampler {
}
}
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadRejectMetricsCountSampler.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadRejectMetricsCountSampler.java
index 160f3c31fc..2c0cbcd518 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadRejectMetricsCountSampler.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/ThreadRejectMetricsCountSampler.java
@@ -16,19 +16,23 @@
*/
package org.apache.dubbo.metrics.collector.sample;
+
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.model.Metric;
import org.apache.dubbo.metrics.model.MetricsCategory;
-import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.ThreadPoolRejectMetric;
+import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToDoubleFunction;
+
import static org.apache.dubbo.metrics.model.MetricsCategory.THREAD_POOL;
public class ThreadRejectMetricsCountSampler extends
SimpleMetricsCountSampler<String, String, ThreadPoolRejectMetric> {
@@ -36,6 +40,8 @@ public class ThreadRejectMetricsCountSampler extends
SimpleMetricsCountSampler<S
private final DefaultMetricsCollector collector;
private final Set<String> metricNames = new ConcurrentHashSet<>();
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public ThreadRejectMetricsCountSampler(DefaultMetricsCollector collector) {
this.collector = collector;
this.collector.addSampler(this);
@@ -44,6 +50,7 @@ public class ThreadRejectMetricsCountSampler extends
SimpleMetricsCountSampler<S
public void addMetricName(String name){
this.metricNames.add(name);
this.initMetricsCounter(name,name);
+ samplesChanged.set(true);
}
@Override
@@ -82,4 +89,10 @@ public class ThreadRejectMetricsCountSampler extends
SimpleMetricsCountSampler<S
protected void countConfigure(MetricsCountSampleConfigurer<String, String,
ThreadPoolRejectMetric> sampleConfigure) {
sampleConfigure.configureMetrics(configure -> new
ThreadPoolRejectMetric(collector.getApplicationName(),configure.getSource()));
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/AbstractMetricsReporter.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/AbstractMetricsReporter.java
index 70033771c5..ac0700221f 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/AbstractMetricsReporter.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/AbstractMetricsReporter.java
@@ -148,38 +148,22 @@ public abstract class AbstractMetricsReporter implements
MetricsReporter {
NamedThreadFactory threadFactory = new
NamedThreadFactory("metrics-collector-sync-job", true);
collectorSyncJobExecutor = Executors.newScheduledThreadPool(1,
threadFactory);
- collectorSyncJobExecutor.scheduleWithFixedDelay(this::refreshData,
DEFAULT_SCHEDULE_INITIAL_DELAY, collectSyncPeriod, TimeUnit.SECONDS);
+
collectorSyncJobExecutor.scheduleWithFixedDelay(this::resetIfSamplesChanged,
DEFAULT_SCHEDULE_INITIAL_DELAY, collectSyncPeriod, TimeUnit.SECONDS);
}
}
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void refreshData() {
+ @SuppressWarnings({"unchecked"})
+ public void resetIfSamplesChanged() {
collectors.forEach(collector -> {
+ if (!collector.calSamplesChanged()) {
+ // Metrics have not changed since last time, no need to reload
+ return;
+ }
+ // Collect all the samples and register them with the micrometer registry
List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
try {
- switch (sample.getType()) {
- case GAUGE:
- GaugeMetricSample gaugeSample =
(GaugeMetricSample) sample;
- List<Tag> tags = getTags(gaugeSample);
-
- Gauge.builder(gaugeSample.getName(),
gaugeSample.getValue(), gaugeSample.getApply())
-
.description(gaugeSample.getDescription()).tags(tags).register(compositeRegistry);
- break;
- case COUNTER:
- CounterMetricSample counterMetricSample =
(CounterMetricSample) sample;
-
FunctionCounter.builder(counterMetricSample.getName(),
counterMetricSample.getValue(),
-
Number::doubleValue).description(counterMetricSample.getDescription())
- .tags(getTags(counterMetricSample))
- .register(compositeRegistry);
- case TIMER:
- case LONG_TASK_TIMER:
- case DISTRIBUTION_SUMMARY:
- // TODO
- break;
- default:
- break;
- }
+ registerSample(sample);
} catch (Exception e) {
logger.error(COMMON_METRICS_COLLECTOR_EXCEPTION, "", "",
"error occurred when synchronize metrics collector.", e);
}
@@ -187,6 +171,40 @@ public abstract class AbstractMetricsReporter implements
MetricsReporter {
});
}
+ @SuppressWarnings({"rawtypes"})
+ private void registerSample(MetricSample sample) {
+ switch (sample.getType()) {
+ case GAUGE:
+ registerGaugeSample((GaugeMetricSample) sample);
+ break;
+ case COUNTER:
+ registerCounterSample((CounterMetricSample) sample);
+ case TIMER:
+ case LONG_TASK_TIMER:
+ case DISTRIBUTION_SUMMARY:
+ // TODO
+ break;
+ default:
+ break;
+ }
+ }
+
+ @SuppressWarnings({"rawtypes"})
+ private void registerCounterSample(CounterMetricSample sample) {
+ FunctionCounter.builder(sample.getName(), sample.getValue(),
Number::doubleValue)
+ .description(sample.getDescription())
+ .tags(getTags(sample))
+ .register(compositeRegistry);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void registerGaugeSample(GaugeMetricSample sample) {
+ Gauge.builder(sample.getName(), sample.getValue(), sample.getApply())
+ .description(sample.getDescription())
+ .tags(getTags(sample))
+ .register(compositeRegistry);
+ }
+
private static List<Tag> getTags(MetricSample gaugeSample) {
List<Tag> tags = new ArrayList<>();
gaugeSample.getTags().forEach((k, v) -> {
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/nop/NopMetricsReporter.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/nop/NopMetricsReporter.java
index 0d3a078378..f326f5467c 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/nop/NopMetricsReporter.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/report/nop/NopMetricsReporter.java
@@ -35,7 +35,7 @@ public class NopMetricsReporter implements MetricsReporter {
}
@Override
- public void refreshData() {
+ public void resetIfSamplesChanged() {
}
diff --git
a/dubbo-metrics/dubbo-metrics-metadata/src/main/java/org/apache/dubbo/metrics/metadata/collector/MetadataMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-metadata/src/main/java/org/apache/dubbo/metrics/metadata/collector/MetadataMetricsCollector.java
index f3127f1ab4..82b5e689d3 100644
---
a/dubbo-metrics/dubbo-metrics-metadata/src/main/java/org/apache/dubbo/metrics/metadata/collector/MetadataMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-metadata/src/main/java/org/apache/dubbo/metrics/metadata/collector/MetadataMetricsCollector.java
@@ -99,4 +99,8 @@ public class MetadataMetricsCollector extends
CombMetricsCollector<MetadataEvent
return list;
}
+ @Override
+ public boolean calSamplesChanged() {
+ return stats.calSamplesChanged();
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/NopPrometheusMetricsReporter.java
b/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/NopPrometheusMetricsReporter.java
index 1d1a82d3a7..a5bf1a94d9 100644
---
a/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/NopPrometheusMetricsReporter.java
+++
b/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/NopPrometheusMetricsReporter.java
@@ -31,7 +31,7 @@ public class NopPrometheusMetricsReporter implements
MetricsReporter {
}
@Override
- public void refreshData() {
+ public void resetIfSamplesChanged() {
}
diff --git
a/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporter.java
b/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporter.java
index ac484ab2a8..111f56634a 100644
---
a/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporter.java
+++
b/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporter.java
@@ -17,10 +17,6 @@
package org.apache.dubbo.metrics.prometheus;
-import io.micrometer.prometheus.PrometheusConfig;
-import io.micrometer.prometheus.PrometheusMeterRegistry;
-import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
-import io.prometheus.client.exporter.PushGateway;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -29,6 +25,11 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.report.AbstractMetricsReporter;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import io.micrometer.prometheus.PrometheusConfig;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
+import io.prometheus.client.exporter.PushGateway;
+
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -90,7 +91,7 @@ public class PrometheusMetricsReporter extends
AbstractMetricsReporter {
protected void push(PushGateway pushGateway, String job) {
try {
- refreshData();
+ resetIfSamplesChanged();
pushGateway.pushAdd(prometheusRegistry.getPrometheusRegistry(),
job);
} catch (IOException e) {
logger.error(COMMON_METRICS_COLLECTOR_EXCEPTION, "", "", "Error
occurred when pushing metrics to prometheus: ", e);
diff --git
a/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterCmd.java
b/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterCmd.java
index baf50dbd6e..c5df78394c 100644
---
a/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterCmd.java
+++
b/dubbo-metrics/dubbo-metrics-prometheus/src/main/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterCmd.java
@@ -116,7 +116,7 @@ public class PrometheusMetricsReporterCmd implements
BaseCommand {
logger.debug("scrape begin");
}
- metricsReporter.refreshData();
+ metricsReporter.resetIfSamplesChanged();
if (logger.isDebugEnabled()) {
logger.debug(String.format("scrape end,Elapsed Time：%s",
System.currentTimeMillis() - begin));
diff --git
a/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterTest.java
b/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterTest.java
index 6b4122753e..8badfebab7 100644
---
a/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterTest.java
+++
b/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsReporterTest.java
@@ -17,14 +17,15 @@
package org.apache.dubbo.metrics.prometheus;
-import com.sun.net.httpserver.HttpServer;
-import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.nested.PrometheusConfig;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import com.sun.net.httpserver.HttpServer;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -143,7 +144,7 @@ class PrometheusMetricsReporterTest {
try {
HttpServer prometheusExporterHttpServer = HttpServer.create(new
InetSocketAddress(port), 0);
prometheusExporterHttpServer.createContext("/metrics",
httpExchange -> {
- reporter.refreshData();
+ reporter.resetIfSamplesChanged();
String response = reporter.getPrometheusRegistry().scrape();
httpExchange.sendResponseHeaders(200,
response.getBytes().length);
try (OutputStream os = httpExchange.getResponseBody()) {
diff --git
a/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java
b/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java
index 107c09ea97..4b30009cf1 100644
---
a/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java
+++
b/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.metrics.prometheus;
-import com.sun.net.httpserver.HttpServer;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.nested.PrometheusConfig;
@@ -25,6 +24,8 @@ import
org.apache.dubbo.metrics.collector.sample.ThreadRejectMetricsCountSampler
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import com.sun.net.httpserver.HttpServer;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -122,7 +123,7 @@ public class PrometheusMetricsThreadPoolTest {
try {
HttpServer prometheusExporterHttpServer = HttpServer.create(new
InetSocketAddress(port), 0);
prometheusExporterHttpServer.createContext("/metrics",
httpExchange -> {
- reporter.refreshData();
+ reporter.resetIfSamplesChanged();
String response = reporter.getPrometheusRegistry().scrape();
httpExchange.sendResponseHeaders(200,
response.getBytes().length);
try (OutputStream os = httpExchange.getResponseBody()) {
diff --git
a/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryMetricsCollector.java
index b510ed541b..6559a49ed2 100644
---
a/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryMetricsCollector.java
@@ -149,4 +149,11 @@ public class RegistryMetricsCollector extends
CombMetricsCollector<RegistryEvent
this.stats.setServiceKey(metricsKey, serviceKey, num, attachments);
}
+ @Override
+ public boolean calSamplesChanged() {
+ // Every stat's calSamplesChanged() must be invoked so that each samplesChanged flag gets its compareAndSet; do not swap the `||` operands, or short-circuiting would skip a call
+ boolean changed = stats.calSamplesChanged();
+ changed = internalStat.calSamplesChanged() || changed;
+ return changed;
+ }
}
diff --git
a/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
b/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
index a60c5c4b49..68bbd2fcba 100644
---
a/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
+++
b/dubbo-metrics/dubbo-metrics-registry/src/main/java/org/apache/dubbo/metrics/registry/collector/RegistryStatComposite.java
@@ -33,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.dubbo.metrics.MetricsConstants.SELF_INCREMENT_SIZE;
@@ -41,6 +42,8 @@ public class RegistryStatComposite extends
AbstractMetricsExport {
private final Map<MetricsKey, Map<ApplicationMetric, AtomicLong>> appStats
= new ConcurrentHashMap<>();
+ private final AtomicBoolean samplesChanged = new AtomicBoolean(true);
+
public RegistryStatComposite(ApplicationModel applicationModel) {
super(applicationModel);
init(RegistryMetricsConstants.REGISTER_LEVEL_KEYS);
@@ -50,7 +53,10 @@ public class RegistryStatComposite extends
AbstractMetricsExport {
if (CollectionUtils.isEmpty(appKeys)) {
return;
}
- appKeys.forEach(appKey -> appStats.put(appKey, new
ConcurrentHashMap<>()));
+ appKeys.forEach(appKey -> {
+ appStats.put(appKey, new ConcurrentHashMap<>());
+ });
+ samplesChanged.set(true);
}
@Override
@@ -71,11 +77,23 @@ public class RegistryStatComposite extends
AbstractMetricsExport {
}
ApplicationMetric applicationMetric = new
ApplicationMetric(getApplicationModel());
applicationMetric.setExtraInfo(Collections.singletonMap(RegistryConstants.REGISTRY_CLUSTER_KEY.toLowerCase(),
name));
- appStats.get(metricsKey).computeIfAbsent(applicationMetric, k -> new
AtomicLong(0L)).getAndAdd(SELF_INCREMENT_SIZE);
+ Map<ApplicationMetric, AtomicLong> stats = appStats.get(metricsKey);
+ AtomicLong metrics = stats.get(applicationMetric);
+ if (metrics == null) {
+ metrics = stats.computeIfAbsent(applicationMetric, k -> new
AtomicLong(0L));
+ samplesChanged.set(true);
+ }
+ metrics.getAndAdd(SELF_INCREMENT_SIZE);
MetricsSupport.fillZero(appStats);
}
public Map<MetricsKey, Map<ApplicationMetric, AtomicLong>> getAppStats() {
return appStats;
}
+
+ @Override
+ public boolean calSamplesChanged() {
+ // CAS to get and reset the flag in an atomic operation
+ return samplesChanged.compareAndSet(true, false);
+ }
}
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/DefaultMetricsReporterCmd.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/DefaultMetricsReporterCmd.java
index de8ec4b904..ceb4d5756b 100644
---
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/DefaultMetricsReporterCmd.java
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/DefaultMetricsReporterCmd.java
@@ -102,7 +102,7 @@ public class DefaultMetricsReporterCmd implements
BaseCommand {
String response = "DefaultMetricsReporter not init";
MetricsReporter metricsReporter =
applicationModel.getBeanFactory().getBean(DefaultMetricsReporter.class);
if (metricsReporter != null) {
- metricsReporter.refreshData();
+ metricsReporter.resetIfSamplesChanged();
response = metricsReporter.getResponseWithName(metricsName);
}
return response;