This is an automated email from the ASF dual-hosted git repository. songxiaosheng pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push: new c57118658c Metrics add sliding time window statistics (#12364) c57118658c is described below commit c57118658c89da09412cbd695f60ea0c3bdc1ec1 Author: jojocodeX <571943...@qq.com> AuthorDate: Thu May 25 13:49:09 2023 +0800 Metrics add sliding time window statistics (#12364) * 添加Jackson序列化器 支持Jackson序列化器扩展配置 代码格式化调整 * Jackson module 注册错误忽略异常 * Jackson module 注册错误忽略异常 * add LdapJackson2Module Jackson module * add spi to dubbo-all pom config * remove unused code * undo remove code * Run CI * merge conflict * ignore codec error * Change log output * Add log error code * Prapare 3.2.2 release * Add sliding window stat * fix checkstyle * rename and version change * MetricsKey rename * MetricsKey rename * change ut --------- Co-authored-by: Albumen Kevin <jhq0...@gmail.com> Co-authored-by: songxiaosheng <songxiaosh...@elastic.link> --- .../metrics/aggregate/SampleAggregatedEntry.java | 68 ++++++++++ .../metrics/aggregate/TimeWindowAggregator.java | 151 +++++++++++++++++++++ .../apache/dubbo/metrics/model/key/MetricsKey.java | 4 +- .../aggregate/TimeWindowAggregatorTest.java | 84 ++++++++++++ .../collector/AggregateMetricsCollector.java | 17 +++ .../collector/AggregateMetricsCollectorTest.java | 42 ++++++ 6 files changed, 365 insertions(+), 1 deletion(-) diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java new file mode 100644 index 0000000000..d4e222b970 --- /dev/null +++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metrics.aggregate; + + +public class SampleAggregatedEntry { + + private Long count; + private double max; + private double min; + private double avg; + private double total; + + public Long getCount() { + return count; + } + + public void setCount(Long count) { + this.count = count; + } + + public double getMax() { + return max; + } + + public void setMax(double max) { + this.max = max; + } + + public double getMin() { + return min; + } + + public void setMin(double min) { + this.min = min; + } + + public double getAvg() { + return avg; + } + + public void setAvg(double avg) { + this.avg = avg; + } + + public double getTotal() { + return total; + } + + public void setTotal(double total) { + this.total = total; + } +} diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java new file mode 100644 index 0000000000..082f563ebd --- /dev/null +++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metrics.aggregate; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.DoubleAccumulator; +import java.util.concurrent.atomic.LongAdder; + +public class TimeWindowAggregator { + + private final SnapshotSlidingWindow slidingWindow; + + public TimeWindowAggregator(int bucketNum, int timeWindowSeconds) { + this.slidingWindow = new SnapshotSlidingWindow(bucketNum, TimeUnit.SECONDS.toMillis(timeWindowSeconds)); + } + + public SnapshotSlidingWindow getSlidingWindow() { + return slidingWindow; + } + + public void add(double value) { + SnapshotObservation sample = this.slidingWindow.currentPane().getValue(); + sample.add(value); + } + + public SampleAggregatedEntry get() { + SampleAggregatedEntry aggregatedEntry = new SampleAggregatedEntry(); + + double total = 0L; + long count = 0; + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + + List<SnapshotObservation> windows = this.slidingWindow.values(); + + for (SnapshotObservation window : windows) { + total += window.getTotal(); + count += window.getCount(); + + max = Math.max(max, window.getMax()); + min = Math.min(min, window.getMin()); + } + + if (count > 0) { + double avg = total / count; + aggregatedEntry.setAvg(Math.round(avg * 100.0) / 100.0); + }else { + aggregatedEntry.setAvg(0); + } + + aggregatedEntry.setMax(max == Double.MIN_VALUE ? 0 : max); + aggregatedEntry.setMin(min == Double.MAX_VALUE ? 0 : min); + aggregatedEntry.setTotal(total); + aggregatedEntry.setCount(count); + + return aggregatedEntry; + } + + public static class SnapshotSlidingWindow extends SlidingWindow<SnapshotObservation> { + + public SnapshotSlidingWindow(int sampleCount, long intervalInMs) { + super(sampleCount, intervalInMs); + } + + @Override + public SnapshotObservation newEmptyValue(long timeMillis) { + return new SnapshotObservation(); + } + + @Override + protected Pane<SnapshotObservation> resetPaneTo(final Pane<SnapshotObservation> pane, long startTime) { + pane.setStartInMs(startTime); + pane.getValue().reset(); + return pane; + } + } + + public static class SnapshotObservation { + + private final AtomicReference<Double> min = new AtomicReference<>(Double.MAX_VALUE); + private final AtomicReference<Double> max = new AtomicReference<>(0d); + private final DoubleAccumulator total = new DoubleAccumulator((x, y) -> x + y, 0); + private final LongAdder count = new LongAdder(); + + public void add(double sample) { + total.accumulate(sample); + count.increment(); + updateMin(sample); + updateMax(sample); + } + + private void updateMin(double sample) { + Double curMin; + do { + curMin = min.get(); + } while (sample < curMin && !min.compareAndSet(curMin, sample)); + } + + private void updateMax(double sample) { + Double curMax; + do { + curMax = max.get(); + if (sample <= curMax) { + return; + } + + } while (!max.compareAndSet(curMax, sample)); + } + + public void reset() { + min.set(Double.MAX_VALUE); + max.set(0d); + count.reset(); + total.reset(); + } + + public double getMin() { + return min.get(); + } + + public double getMax() { + return max.get(); + } + + public Double getTotal() { + return total.get(); + } + + public long getCount() { + return count.sum(); + } + } + +} diff --git a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java index 09cb988118..b4d7997a19 100644 --- a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java +++ b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java @@ -57,7 +57,9 @@ public enum MetricsKey { METRIC_RT_P95("dubbo.%s.rt.milliseconds.p95", "Response Time P95"), METRIC_RT_P90("dubbo.%s.rt.milliseconds.p90", "Response Time P90"), METRIC_RT_P50("dubbo.%s.rt.milliseconds.p50", "Response Time P50"), - + METRIC_RT_MIN_AGG("dubbo.%s.rt.min.milliseconds.aggregate", "Aggregated Min Response"), + METRIC_RT_MAX_AGG("dubbo.%s.rt.max.milliseconds.aggregate", "Aggregated Max Response"), + METRIC_RT_AVG_AGG("dubbo.%s.rt.avg.milliseconds.aggregate", "Aggregated Avg Response"), // register metrics key REGISTER_METRIC_REQUESTS("dubbo.registry.register.requests.total", "Total Register Requests"), diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java new file mode 100644 index 0000000000..fc2d42c614 --- /dev/null +++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.metrics.aggregate; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.concurrent.TimeUnit; + +public class TimeWindowAggregatorTest { + @Test + public void testTimeWindowAggregator() { + TimeWindowAggregator aggregator = new TimeWindowAggregator(5, 5); + + // 第一个时间窗口,时间范围:0秒 - 5秒 + aggregator.add(10); + aggregator.add(20); + aggregator.add(30); + + SampleAggregatedEntry entry1 = aggregator.get(); + Assertions.assertEquals(20, entry1.getAvg()); + Assertions.assertEquals(60, entry1.getTotal()); + Assertions.assertEquals(3, entry1.getCount()); + Assertions.assertEquals(30, entry1.getMax()); + Assertions.assertEquals(10, entry1.getMin()); + + // 第二个时间窗口,时间范围:5秒 - 10秒 + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + aggregator.add(15); + aggregator.add(25); + aggregator.add(35); + + SampleAggregatedEntry entry2 = aggregator.get(); + Assertions.assertEquals(25, entry2.getAvg()); + Assertions.assertEquals(75, entry2.getTotal()); + Assertions.assertEquals(3, entry2.getCount()); + Assertions.assertEquals(35, entry2.getMax()); + Assertions.assertEquals(15, entry2.getMin()); + + // 第三个时间窗口,时间范围:10秒 - 15秒 + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + aggregator.add(12); + aggregator.add(22); + aggregator.add(32); + + SampleAggregatedEntry entry3 = aggregator.get(); + Assertions.assertEquals(22, entry3.getAvg()); + Assertions.assertEquals(66, entry3.getTotal()); + Assertions.assertEquals(3, entry3.getCount()); + Assertions.assertEquals(32, entry3.getMax()); + Assertions.assertEquals(12, entry3.getMin()); + } +} + + + + + + + diff --git a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java index 765f33e71d..171395c157 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java +++ b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java @@ -22,6 +22,7 @@ import org.apache.dubbo.config.MetricsConfig; import org.apache.dubbo.config.context.ConfigManager; import org.apache.dubbo.config.nested.AggregationConfig; import org.apache.dubbo.metrics.MetricsConstants; +import org.apache.dubbo.metrics.aggregate.TimeWindowAggregator; import org.apache.dubbo.metrics.aggregate.TimeWindowCounter; import org.apache.dubbo.metrics.aggregate.TimeWindowQuantile; import org.apache.dubbo.metrics.event.MetricsEvent; @@ -65,6 +66,8 @@ public class AggregateMetricsCollector implements MetricsCollector<RequestEvent> private static final Integer DEFAULT_BUCKET_NUM = 10; private static final Integer DEFAULT_TIME_WINDOW_SECONDS = 120; private Boolean collectEnabled = null; + private final ConcurrentMap<MethodMetric, TimeWindowAggregator> rtAgr = new ConcurrentHashMap<>(); + public AggregateMetricsCollector(ApplicationModel applicationModel) { this.applicationModel = applicationModel; @@ -136,6 +139,9 @@ public class AggregateMetricsCollector implements MetricsCollector<RequestEvent> long responseTime = event.getTimePair().calc(); TimeWindowQuantile quantile = ConcurrentHashMapUtils.computeIfAbsent(rt, metric, k -> new TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum, timeWindowSeconds)); quantile.add(responseTime); + + TimeWindowAggregator timeWindowAggregator = ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric, methodMetric -> new TimeWindowAggregator(bucketNum, timeWindowSeconds)); + timeWindowAggregator.add(responseTime); } @@ -204,6 +210,17 @@ public class AggregateMetricsCollector implements MetricsCollector<RequestEvent> list.add(new GaugeMetricSample<>(MetricsKey.METRIC_RT_P50.getNameByType(k.getSide()), MetricsKey.METRIC_RT_P50.getDescription(), k.getTags(), RT, v, value -> value.quantile(0.50))); }); + + rtAgr.forEach((k,v)->{ + list.add(new GaugeMetricSample<>(MetricsKey.METRIC_RT_MIN_AGG.getNameByType(k.getSide()), + MetricsKey.METRIC_RT_MIN_AGG.getDescription(), k.getTags(), RT, v, value -> v.get().getMin())); + + list.add(new GaugeMetricSample<>(MetricsKey.METRIC_RT_MAX_AGG.getNameByType(k.getSide()), + MetricsKey.METRIC_RT_MAX_AGG.getDescription(), k.getTags(), RT, v, value -> v.get().getMax())); + + list.add(new GaugeMetricSample<>(MetricsKey.METRIC_RT_AVG_AGG.getNameByType(k.getSide()), + MetricsKey.METRIC_RT_AVG_AGG.getDescription(), k.getTags(), RT, v, value -> v.get().getAvg())); + }); } private void registerListener() { diff --git a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java index 08362c1bf6..383dac76f9 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java +++ b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java @@ -232,6 +232,48 @@ class AggregateMetricsCollectorTest { Assertions.assertEquals(10000, ((TimeWindowCounter) ((GaugeMetricSample<?>) sample).getValue()).get()); } + + @Test + public void testRtAggregation() { + metricsDispatcher.addListener(collector); + ConfigManager configManager = applicationModel.getApplicationConfigManager(); + MetricsConfig config = configManager.getMetrics().orElse(null); + AggregationConfig aggregationConfig = new AggregationConfig(); + aggregationConfig.setEnabled(true); + config.setAggregation(aggregationConfig); + + List<Long> rtList = new ArrayList<>(); + rtList.add(10L); + rtList.add(20L); + rtList.add(30L); + + for (Long requestTime: rtList) { + RequestEvent requestEvent = RequestEvent.toRequestEvent(applicationModel, invocation); + TestRequestEvent testRequestEvent = new TestRequestEvent(requestEvent.getSource(), requestEvent.getTypeWrapper()); + testRequestEvent.putAttachment(MetricsConstants.INVOCATION, invocation); + testRequestEvent.putAttachment(ATTACHMENT_KEY_SERVICE, MetricsSupport.getInterfaceName(invocation)); + testRequestEvent.putAttachment(MetricsConstants.INVOCATION_SIDE, MetricsSupport.getSide(invocation)); + testRequestEvent.setRt(requestTime); + MetricsEventBus.post(testRequestEvent, () -> null); + } + + List<MetricSample> samples = collector.collect(); + for (MetricSample sample : samples) { + GaugeMetricSample gaugeMetricSample = (GaugeMetricSample<?>) sample; + + if(gaugeMetricSample.getName().endsWith("max.milliseconds.aggregate")) { + Assertions.assertEquals(30, gaugeMetricSample.applyAsDouble()); + } + if (gaugeMetricSample.getName().endsWith("min.milliseconds.aggregate")) { + Assertions.assertEquals(10L, gaugeMetricSample.applyAsDouble()); + } + + if (gaugeMetricSample.getName().endsWith("avg.milliseconds.aggregate")) { + Assertions.assertEquals(20L, gaugeMetricSample.applyAsDouble()); + } + } + } + @Test void testP95AndP99() throws InterruptedException {