This is an automated email from the ASF dual-hosted git repository.
songxiaosheng pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new c57118658c Metrics add sliding time window statistics (#12364)
c57118658c is described below
commit c57118658c89da09412cbd695f60ea0c3bdc1ec1
Author: jojocodeX <[email protected]>
AuthorDate: Thu May 25 13:49:09 2023 +0800
Metrics add sliding time window statistics (#12364)
* 添加Jackson序列化器
支持Jackson序列化器扩展配置
代码格式化调整
* Jackson module 注册错误忽略异常
* Jackson module 注册错误忽略异常
* add LdapJackson2Module Jackson module
* add spi to dubbo-all pom config
* remove unused code
* undo remove code
* Run CI
* merge conflict
* ignore codec error
* Change log output
* Add log error code
* Prapare 3.2.2 release
* Add sliding window stat
* fix checkstyle
* rename and version change
* MetricsKey rename
* MetricsKey rename
* change ut
---------
Co-authored-by: Albumen Kevin <[email protected]>
Co-authored-by: songxiaosheng <[email protected]>
---
.../metrics/aggregate/SampleAggregatedEntry.java | 68 ++++++++++
.../metrics/aggregate/TimeWindowAggregator.java | 151 +++++++++++++++++++++
.../apache/dubbo/metrics/model/key/MetricsKey.java | 4 +-
.../aggregate/TimeWindowAggregatorTest.java | 84 ++++++++++++
.../collector/AggregateMetricsCollector.java | 17 +++
.../collector/AggregateMetricsCollectorTest.java | 42 ++++++
6 files changed, 365 insertions(+), 1 deletion(-)
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java
new file mode 100644
index 0000000000..d4e222b970
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+
+public class SampleAggregatedEntry {
+
+ private Long count;
+ private double max;
+ private double min;
+ private double avg;
+ private double total;
+
+ public Long getCount() {
+ return count;
+ }
+
+ public void setCount(Long count) {
+ this.count = count;
+ }
+
+ public double getMax() {
+ return max;
+ }
+
+ public void setMax(double max) {
+ this.max = max;
+ }
+
+ public double getMin() {
+ return min;
+ }
+
+ public void setMin(double min) {
+ this.min = min;
+ }
+
+ public double getAvg() {
+ return avg;
+ }
+
+ public void setAvg(double avg) {
+ this.avg = avg;
+ }
+
+ public double getTotal() {
+ return total;
+ }
+
+ public void setTotal(double total) {
+ this.total = total;
+ }
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java
new file mode 100644
index 0000000000..082f563ebd
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.DoubleAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+
+public class TimeWindowAggregator {
+
+ private final SnapshotSlidingWindow slidingWindow;
+
+ public TimeWindowAggregator(int bucketNum, int timeWindowSeconds) {
+ this.slidingWindow = new SnapshotSlidingWindow(bucketNum,
TimeUnit.SECONDS.toMillis(timeWindowSeconds));
+ }
+
+ public SnapshotSlidingWindow getSlidingWindow() {
+ return slidingWindow;
+ }
+
+ public void add(double value) {
+ SnapshotObservation sample =
this.slidingWindow.currentPane().getValue();
+ sample.add(value);
+ }
+
+ public SampleAggregatedEntry get() {
+ SampleAggregatedEntry aggregatedEntry = new SampleAggregatedEntry();
+
+ double total = 0L;
+ long count = 0;
+ double max = Double.MIN_VALUE;
+ double min = Double.MAX_VALUE;
+
+ List<SnapshotObservation> windows = this.slidingWindow.values();
+
+ for (SnapshotObservation window : windows) {
+ total += window.getTotal();
+ count += window.getCount();
+
+ max = Math.max(max, window.getMax());
+ min = Math.min(min, window.getMin());
+ }
+
+ if (count > 0) {
+ double avg = total / count;
+ aggregatedEntry.setAvg(Math.round(avg * 100.0) / 100.0);
+ }else {
+ aggregatedEntry.setAvg(0);
+ }
+
+ aggregatedEntry.setMax(max == Double.MIN_VALUE ? 0 : max);
+ aggregatedEntry.setMin(min == Double.MAX_VALUE ? 0 : min);
+ aggregatedEntry.setTotal(total);
+ aggregatedEntry.setCount(count);
+
+ return aggregatedEntry;
+ }
+
+ public static class SnapshotSlidingWindow extends
SlidingWindow<SnapshotObservation> {
+
+ public SnapshotSlidingWindow(int sampleCount, long intervalInMs) {
+ super(sampleCount, intervalInMs);
+ }
+
+ @Override
+ public SnapshotObservation newEmptyValue(long timeMillis) {
+ return new SnapshotObservation();
+ }
+
+ @Override
+ protected Pane<SnapshotObservation> resetPaneTo(final
Pane<SnapshotObservation> pane, long startTime) {
+ pane.setStartInMs(startTime);
+ pane.getValue().reset();
+ return pane;
+ }
+ }
+
+ public static class SnapshotObservation {
+
+ private final AtomicReference<Double> min = new
AtomicReference<>(Double.MAX_VALUE);
+ private final AtomicReference<Double> max = new AtomicReference<>(0d);
+ private final DoubleAccumulator total = new DoubleAccumulator((x, y)
-> x + y, 0);
+ private final LongAdder count = new LongAdder();
+
+ public void add(double sample) {
+ total.accumulate(sample);
+ count.increment();
+ updateMin(sample);
+ updateMax(sample);
+ }
+
+ private void updateMin(double sample) {
+ Double curMin;
+ do {
+ curMin = min.get();
+ } while (sample < curMin && !min.compareAndSet(curMin, sample));
+ }
+
+ private void updateMax(double sample) {
+ Double curMax;
+ do {
+ curMax = max.get();
+ if (sample <= curMax) {
+ return;
+ }
+
+ } while (!max.compareAndSet(curMax, sample));
+ }
+
+ public void reset() {
+ min.set(Double.MAX_VALUE);
+ max.set(0d);
+ count.reset();
+ total.reset();
+ }
+
+ public double getMin() {
+ return min.get();
+ }
+
+ public double getMax() {
+ return max.get();
+ }
+
+ public Double getTotal() {
+ return total.get();
+ }
+
+ public long getCount() {
+ return count.sum();
+ }
+ }
+
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
index 09cb988118..b4d7997a19 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
@@ -57,7 +57,9 @@ public enum MetricsKey {
METRIC_RT_P95("dubbo.%s.rt.milliseconds.p95", "Response Time P95"),
METRIC_RT_P90("dubbo.%s.rt.milliseconds.p90", "Response Time P90"),
METRIC_RT_P50("dubbo.%s.rt.milliseconds.p50", "Response Time P50"),
-
+ METRIC_RT_MIN_AGG("dubbo.%s.rt.min.milliseconds.aggregate", "Aggregated
Min Response"),
+ METRIC_RT_MAX_AGG("dubbo.%s.rt.max.milliseconds.aggregate", "Aggregated
Max Response"),
+ METRIC_RT_AVG_AGG("dubbo.%s.rt.avg.milliseconds.aggregate", "Aggregated
Avg Response"),
// register metrics key
REGISTER_METRIC_REQUESTS("dubbo.registry.register.requests.total", "Total
Register Requests"),
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java
new file mode 100644
index 0000000000..fc2d42c614
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.concurrent.TimeUnit;
+
+public class TimeWindowAggregatorTest {
+ @Test
+ public void testTimeWindowAggregator() {
+ TimeWindowAggregator aggregator = new TimeWindowAggregator(5, 5);
+
+ // 第一个时间窗口,时间范围:0秒 - 5秒
+ aggregator.add(10);
+ aggregator.add(20);
+ aggregator.add(30);
+
+ SampleAggregatedEntry entry1 = aggregator.get();
+ Assertions.assertEquals(20, entry1.getAvg());
+ Assertions.assertEquals(60, entry1.getTotal());
+ Assertions.assertEquals(3, entry1.getCount());
+ Assertions.assertEquals(30, entry1.getMax());
+ Assertions.assertEquals(10, entry1.getMin());
+
+ // 第二个时间窗口,时间范围:5秒 - 10秒
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ aggregator.add(15);
+ aggregator.add(25);
+ aggregator.add(35);
+
+ SampleAggregatedEntry entry2 = aggregator.get();
+ Assertions.assertEquals(25, entry2.getAvg());
+ Assertions.assertEquals(75, entry2.getTotal());
+ Assertions.assertEquals(3, entry2.getCount());
+ Assertions.assertEquals(35, entry2.getMax());
+ Assertions.assertEquals(15, entry2.getMin());
+
+ // 第三个时间窗口,时间范围:10秒 - 15秒
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ aggregator.add(12);
+ aggregator.add(22);
+ aggregator.add(32);
+
+ SampleAggregatedEntry entry3 = aggregator.get();
+ Assertions.assertEquals(22, entry3.getAvg());
+ Assertions.assertEquals(66, entry3.getTotal());
+ Assertions.assertEquals(3, entry3.getCount());
+ Assertions.assertEquals(32, entry3.getMax());
+ Assertions.assertEquals(12, entry3.getMin());
+ }
+}
+
+
+
+
+
+
+
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
index 765f33e71d..171395c157 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.config.nested.AggregationConfig;
import org.apache.dubbo.metrics.MetricsConstants;
+import org.apache.dubbo.metrics.aggregate.TimeWindowAggregator;
import org.apache.dubbo.metrics.aggregate.TimeWindowCounter;
import org.apache.dubbo.metrics.aggregate.TimeWindowQuantile;
import org.apache.dubbo.metrics.event.MetricsEvent;
@@ -65,6 +66,8 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
private static final Integer DEFAULT_BUCKET_NUM = 10;
private static final Integer DEFAULT_TIME_WINDOW_SECONDS = 120;
private Boolean collectEnabled = null;
+ private final ConcurrentMap<MethodMetric, TimeWindowAggregator> rtAgr =
new ConcurrentHashMap<>();
+
public AggregateMetricsCollector(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
@@ -136,6 +139,9 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
long responseTime = event.getTimePair().calc();
TimeWindowQuantile quantile =
ConcurrentHashMapUtils.computeIfAbsent(rt, metric, k -> new
TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum, timeWindowSeconds));
quantile.add(responseTime);
+
+ TimeWindowAggregator timeWindowAggregator =
ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric, methodMetric -> new
TimeWindowAggregator(bucketNum, timeWindowSeconds));
+ timeWindowAggregator.add(responseTime);
}
@@ -204,6 +210,17 @@ public class AggregateMetricsCollector implements
MetricsCollector<RequestEvent>
list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_P50.getNameByType(k.getSide()),
MetricsKey.METRIC_RT_P50.getDescription(), k.getTags(), RT, v,
value -> value.quantile(0.50)));
});
+
+ rtAgr.forEach((k,v)->{
+ list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_MIN_AGG.getNameByType(k.getSide()),
+ MetricsKey.METRIC_RT_MIN_AGG.getDescription(), k.getTags(),
RT, v, value -> v.get().getMin()));
+
+ list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_MAX_AGG.getNameByType(k.getSide()),
+ MetricsKey.METRIC_RT_MAX_AGG.getDescription(), k.getTags(),
RT, v, value -> v.get().getMax()));
+
+ list.add(new
GaugeMetricSample<>(MetricsKey.METRIC_RT_AVG_AGG.getNameByType(k.getSide()),
+ MetricsKey.METRIC_RT_AVG_AGG.getDescription(), k.getTags(),
RT, v, value -> v.get().getAvg()));
+ });
}
private void registerListener() {
diff --git
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
index 08362c1bf6..383dac76f9 100644
---
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
+++
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
@@ -232,6 +232,48 @@ class AggregateMetricsCollectorTest {
Assertions.assertEquals(10000, ((TimeWindowCounter)
((GaugeMetricSample<?>) sample).getValue()).get());
}
+
+ @Test
+ public void testRtAggregation() {
+ metricsDispatcher.addListener(collector);
+ ConfigManager configManager =
applicationModel.getApplicationConfigManager();
+ MetricsConfig config = configManager.getMetrics().orElse(null);
+ AggregationConfig aggregationConfig = new AggregationConfig();
+ aggregationConfig.setEnabled(true);
+ config.setAggregation(aggregationConfig);
+
+ List<Long> rtList = new ArrayList<>();
+ rtList.add(10L);
+ rtList.add(20L);
+ rtList.add(30L);
+
+ for (Long requestTime: rtList) {
+ RequestEvent requestEvent =
RequestEvent.toRequestEvent(applicationModel, invocation);
+ TestRequestEvent testRequestEvent = new
TestRequestEvent(requestEvent.getSource(), requestEvent.getTypeWrapper());
+ testRequestEvent.putAttachment(MetricsConstants.INVOCATION,
invocation);
+ testRequestEvent.putAttachment(ATTACHMENT_KEY_SERVICE,
MetricsSupport.getInterfaceName(invocation));
+ testRequestEvent.putAttachment(MetricsConstants.INVOCATION_SIDE,
MetricsSupport.getSide(invocation));
+ testRequestEvent.setRt(requestTime);
+ MetricsEventBus.post(testRequestEvent, () -> null);
+ }
+
+ List<MetricSample> samples = collector.collect();
+ for (MetricSample sample : samples) {
+ GaugeMetricSample gaugeMetricSample = (GaugeMetricSample<?>)
sample;
+
+
if(gaugeMetricSample.getName().endsWith("max.milliseconds.aggregate")) {
+ Assertions.assertEquals(30,
gaugeMetricSample.applyAsDouble());
+ }
+ if
(gaugeMetricSample.getName().endsWith("min.milliseconds.aggregate")) {
+ Assertions.assertEquals(10L,
gaugeMetricSample.applyAsDouble());
+ }
+
+ if
(gaugeMetricSample.getName().endsWith("avg.milliseconds.aggregate")) {
+ Assertions.assertEquals(20L,
gaugeMetricSample.applyAsDouble());
+ }
+ }
+ }
+
@Test
void testP95AndP99() throws InterruptedException {