This is an automated email from the ASF dual-hosted git repository.

songxiaosheng pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new c57118658c Metrics add sliding time window statistics (#12364)
c57118658c is described below

commit c57118658c89da09412cbd695f60ea0c3bdc1ec1
Author: jojocodeX <571943...@qq.com>
AuthorDate: Thu May 25 13:49:09 2023 +0800

    Metrics add sliding time window statistics (#12364)
    
    * 添加Jackson序列化器
    支持Jackson序列化器扩展配置
    代码格式化调整
    
    * Jackson module 注册错误忽略异常
    
    * Jackson module 注册错误忽略异常
    
    * add LdapJackson2Module Jackson module
    
    * add spi to dubbo-all pom config
    
    * remove unused code
    
    * undo remove code
    
    * Run CI
    
    * merge conflict
    
    * ignore codec error
    
    * Change log output
    
    * Add log error code
    
    * Prepare 3.2.2 release
    
    * Add sliding window stat
    
    * fix checkstyle
    
    * rename and version change
    
    * MetricsKey rename
    
    * MetricsKey rename
    
    * change ut
    
    ---------
    
    Co-authored-by: Albumen Kevin <jhq0...@gmail.com>
    Co-authored-by: songxiaosheng <songxiaosh...@elastic.link>
---
 .../metrics/aggregate/SampleAggregatedEntry.java   |  68 ++++++++++
 .../metrics/aggregate/TimeWindowAggregator.java    | 151 +++++++++++++++++++++
 .../apache/dubbo/metrics/model/key/MetricsKey.java |   4 +-
 .../aggregate/TimeWindowAggregatorTest.java        |  84 ++++++++++++
 .../collector/AggregateMetricsCollector.java       |  17 +++
 .../collector/AggregateMetricsCollectorTest.java   |  42 ++++++
 6 files changed, 365 insertions(+), 1 deletion(-)

diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java
new file mode 100644
index 0000000000..d4e222b970
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SampleAggregatedEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+
+public class SampleAggregatedEntry { // Mutable value holder for one aggregated result: count, max, min, avg and total.
+
+    private Long count; // boxed Long with no initializer, so getCount() returns null until setCount() is called
+    private double max;
+    private double min;
+    private double avg;
+    private double total;
+
+    public Long getCount() {
+        return count;
+    }
+
+    public void setCount(Long count) {
+        this.count = count;
+    }
+
+    public double getMax() {
+        return max;
+    }
+
+    public void setMax(double max) {
+        this.max = max;
+    }
+
+    public double getMin() {
+        return min;
+    }
+
+    public void setMin(double min) {
+        this.min = min;
+    }
+
+    public double getAvg() {
+        return avg;
+    }
+
+    public void setAvg(double avg) {
+        this.avg = avg;
+    }
+
+    public double getTotal() {
+        return total;
+    }
+
+    public void setTotal(double total) {
+        this.total = total;
+    }
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java
new file mode 100644
index 0000000000..082f563ebd
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.DoubleAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+
+public class TimeWindowAggregator { // Collects double samples into a sliding window and reports count/total/avg/max/min over it.
+
+    private final SnapshotSlidingWindow slidingWindow;
+
+    public TimeWindowAggregator(int bucketNum, int timeWindowSeconds) {
+        this.slidingWindow = new SnapshotSlidingWindow(bucketNum, 
TimeUnit.SECONDS.toMillis(timeWindowSeconds)); // the window length is handed over in milliseconds
+    }
+
+    public SnapshotSlidingWindow getSlidingWindow() {
+        return slidingWindow;
+    }
+
+    public void add(double value) {
+        SnapshotObservation sample = 
this.slidingWindow.currentPane().getValue();
+        sample.add(value); // record the value in the observation of the current pane
+    }
+
+    public SampleAggregatedEntry get() { // builds a fresh entry on every call by folding all observations returned by the window
+        SampleAggregatedEntry aggregatedEntry = new SampleAggregatedEntry();
+
+        double total = 0L;
+        long count = 0;
+        double max = Double.MIN_VALUE; // sentinel; note Double.MIN_VALUE is the smallest positive double, not the most negative one
+        double min = Double.MAX_VALUE; // sentinel; same value SnapshotObservation uses for an empty min
+
+        List<SnapshotObservation> windows = this.slidingWindow.values(); // presumably only panes still inside the window - confirm in SlidingWindow
+
+        for (SnapshotObservation window : windows) {
+            total += window.getTotal();
+            count += window.getCount();
+
+            max = Math.max(max, window.getMax());
+            min = Math.min(min, window.getMin());
+        }
+
+        if (count > 0) { // guard against dividing by zero when nothing was recorded
+            double avg = total / count;
+            aggregatedEntry.setAvg(Math.round(avg * 100.0) / 100.0); // round the average to two decimal places
+        }else {
+            aggregatedEntry.setAvg(0);
+        }
+
+        aggregatedEntry.setMax(max == Double.MIN_VALUE ? 0 : max); // sentinel still in place -> report 0
+        aggregatedEntry.setMin(min == Double.MAX_VALUE ? 0 : min); // sentinel still in place -> report 0
+        aggregatedEntry.setTotal(total);
+        aggregatedEntry.setCount(count);
+
+        return aggregatedEntry;
+    }
+
+    public static class SnapshotSlidingWindow extends 
SlidingWindow<SnapshotObservation> { // sliding window whose panes each hold one SnapshotObservation
+
+        public SnapshotSlidingWindow(int sampleCount, long intervalInMs) {
+            super(sampleCount, intervalInMs);
+        }
+
+        @Override
+        public SnapshotObservation newEmptyValue(long timeMillis) {
+            return new SnapshotObservation(); // timeMillis is not used here
+        }
+
+        @Override
+        protected Pane<SnapshotObservation> resetPaneTo(final 
Pane<SnapshotObservation> pane, long startTime) {
+            pane.setStartInMs(startTime);
+            pane.getValue().reset(); // the existing observation object is cleared and reused, not replaced
+            return pane;
+        }
+    }
+
+    public static class SnapshotObservation { // running min/max/total/count for the samples of one pane
+
+        private final AtomicReference<Double> min = new 
AtomicReference<>(Double.MAX_VALUE);
+        private final AtomicReference<Double> max = new AtomicReference<>(0d); // starts at 0, so samples <= 0 never become the max
+        private final DoubleAccumulator total = new DoubleAccumulator((x, y) 
-> x + y, 0);
+        private final LongAdder count = new LongAdder();
+
+        public void add(double sample) { // the four fields are updated one after another, not as a single atomic step
+            total.accumulate(sample);
+            count.increment();
+            updateMin(sample);
+            updateMax(sample);
+        }
+
+        private void updateMin(double sample) {
+            Double curMin;
+            do {
+                curMin = min.get();
+            } while (sample < curMin && !min.compareAndSet(curMin, sample)); // retry only while sample is still smaller and the CAS failed; CAS compares by reference against the object just read
+        }
+
+        private void updateMax(double sample) {
+            Double curMax;
+            do {
+                curMax = max.get();
+                if (sample <= curMax) {
+                    return; // current max already covers this sample
+                }
+
+            } while (!max.compareAndSet(curMax, sample)); // retry if another thread changed max in between
+        }
+
+        public void reset() { // restores the initial values field by field; not atomic with respect to a concurrent add()
+            min.set(Double.MAX_VALUE);
+            max.set(0d);
+            count.reset();
+            total.reset();
+        }
+
+        public double getMin() {
+            return min.get(); // Double.MAX_VALUE while no sample has been recorded
+        }
+
+        public double getMax() {
+            return max.get();
+        }
+
+        public Double getTotal() {
+            return total.get();
+        }
+
+        public long getCount() {
+            return count.sum();
+        }
+    }
+
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
index 09cb988118..b4d7997a19 100644
--- 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/model/key/MetricsKey.java
@@ -57,7 +57,9 @@ public enum MetricsKey {
     METRIC_RT_P95("dubbo.%s.rt.milliseconds.p95", "Response Time P95"),
     METRIC_RT_P90("dubbo.%s.rt.milliseconds.p90", "Response Time P90"),
     METRIC_RT_P50("dubbo.%s.rt.milliseconds.p50", "Response Time P50"),
-
+    METRIC_RT_MIN_AGG("dubbo.%s.rt.min.milliseconds.aggregate", "Aggregated 
Min Response"),
+    METRIC_RT_MAX_AGG("dubbo.%s.rt.max.milliseconds.aggregate", "Aggregated 
Max Response"),
+    METRIC_RT_AVG_AGG("dubbo.%s.rt.avg.milliseconds.aggregate", "Aggregated 
Avg Response"),
 
     // register metrics key
     REGISTER_METRIC_REQUESTS("dubbo.registry.register.requests.total", "Total 
Register Requests"),
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java
 
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java
new file mode 100644
index 0000000000..fc2d42c614
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowAggregatorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.concurrent.TimeUnit;
+
+public class TimeWindowAggregatorTest { // Exercises TimeWindowAggregator across three consecutive 5-second windows.
+    @Test // NOTE(review): the import above is JUnit 4's org.junit.Test while Assertions is JUnit Jupiter's - confirm the runner actually executes this test
+    public void testTimeWindowAggregator() {
+        TimeWindowAggregator aggregator = new TimeWindowAggregator(5, 5);
+
+        // First time window, time range: 0s - 5s
+        aggregator.add(10);
+        aggregator.add(20);
+        aggregator.add(30);
+
+        SampleAggregatedEntry entry1 = aggregator.get();
+        Assertions.assertEquals(20, entry1.getAvg());
+        Assertions.assertEquals(60, entry1.getTotal());
+        Assertions.assertEquals(3, entry1.getCount());
+        Assertions.assertEquals(30, entry1.getMax());
+        Assertions.assertEquals(10, entry1.getMin());
+
+        // Second time window, time range: 5s - 10s
+        try {
+            TimeUnit.SECONDS.sleep(5);
+        } catch (InterruptedException e) {
+            e.printStackTrace(); // the interrupt is only printed; the test carries on without sleeping the full time
+        }
+
+        aggregator.add(15);
+        aggregator.add(25);
+        aggregator.add(35);
+
+        SampleAggregatedEntry entry2 = aggregator.get();
+        Assertions.assertEquals(25, entry2.getAvg());
+        Assertions.assertEquals(75, entry2.getTotal());
+        Assertions.assertEquals(3, entry2.getCount());
+        Assertions.assertEquals(35, entry2.getMax());
+        Assertions.assertEquals(15, entry2.getMin());
+
+        // Third time window, time range: 10s - 15s
+        try {
+            TimeUnit.SECONDS.sleep(5);
+        } catch (InterruptedException e) {
+            e.printStackTrace(); // same as above: printed and otherwise ignored
+        }
+        aggregator.add(12);
+        aggregator.add(22);
+        aggregator.add(32);
+
+        SampleAggregatedEntry entry3 = aggregator.get();
+        Assertions.assertEquals(22, entry3.getAvg());
+        Assertions.assertEquals(66, entry3.getTotal());
+        Assertions.assertEquals(3, entry3.getCount());
+        Assertions.assertEquals(32, entry3.getMax());
+        Assertions.assertEquals(12, entry3.getMin());
+    }
+}
+
+
+
+
+
+
+
diff --git 
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
 
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
index 765f33e71d..171395c157 100644
--- 
a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
+++ 
b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollector.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.config.MetricsConfig;
 import org.apache.dubbo.config.context.ConfigManager;
 import org.apache.dubbo.config.nested.AggregationConfig;
 import org.apache.dubbo.metrics.MetricsConstants;
+import org.apache.dubbo.metrics.aggregate.TimeWindowAggregator;
 import org.apache.dubbo.metrics.aggregate.TimeWindowCounter;
 import org.apache.dubbo.metrics.aggregate.TimeWindowQuantile;
 import org.apache.dubbo.metrics.event.MetricsEvent;
@@ -65,6 +66,8 @@ public class AggregateMetricsCollector implements 
MetricsCollector<RequestEvent>
     private static final Integer DEFAULT_BUCKET_NUM = 10;
     private static final Integer DEFAULT_TIME_WINDOW_SECONDS = 120;
     private Boolean collectEnabled = null;
+    private final ConcurrentMap<MethodMetric, TimeWindowAggregator> rtAgr = 
new ConcurrentHashMap<>();
+
 
     public AggregateMetricsCollector(ApplicationModel applicationModel) {
         this.applicationModel = applicationModel;
@@ -136,6 +139,9 @@ public class AggregateMetricsCollector implements 
MetricsCollector<RequestEvent>
         long responseTime = event.getTimePair().calc();
         TimeWindowQuantile quantile = 
ConcurrentHashMapUtils.computeIfAbsent(rt, metric, k -> new 
TimeWindowQuantile(DEFAULT_COMPRESSION, bucketNum, timeWindowSeconds));
         quantile.add(responseTime);
+
+        TimeWindowAggregator timeWindowAggregator = 
ConcurrentHashMapUtils.computeIfAbsent(rtAgr, metric, methodMetric -> new 
TimeWindowAggregator(bucketNum, timeWindowSeconds));
+        timeWindowAggregator.add(responseTime);
     }
 
 
@@ -204,6 +210,17 @@ public class AggregateMetricsCollector implements 
MetricsCollector<RequestEvent>
             list.add(new 
GaugeMetricSample<>(MetricsKey.METRIC_RT_P50.getNameByType(k.getSide()),
                 MetricsKey.METRIC_RT_P50.getDescription(), k.getTags(), RT, v, 
value -> value.quantile(0.50)));
         });
+
+        rtAgr.forEach((k,v)->{
+            list.add(new 
GaugeMetricSample<>(MetricsKey.METRIC_RT_MIN_AGG.getNameByType(k.getSide()),
+                MetricsKey.METRIC_RT_MIN_AGG.getDescription(), k.getTags(), 
RT, v, value -> v.get().getMin()));
+
+            list.add(new 
GaugeMetricSample<>(MetricsKey.METRIC_RT_MAX_AGG.getNameByType(k.getSide()),
+                MetricsKey.METRIC_RT_MAX_AGG.getDescription(), k.getTags(), 
RT, v, value -> v.get().getMax()));
+
+            list.add(new 
GaugeMetricSample<>(MetricsKey.METRIC_RT_AVG_AGG.getNameByType(k.getSide()),
+                MetricsKey.METRIC_RT_AVG_AGG.getDescription(), k.getTags(), 
RT, v, value -> v.get().getAvg()));
+        });
     }
 
     private void registerListener() {
diff --git 
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
 
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
index 08362c1bf6..383dac76f9 100644
--- 
a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
+++ 
b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/collector/AggregateMetricsCollectorTest.java
@@ -232,6 +232,48 @@ class AggregateMetricsCollectorTest {
         Assertions.assertEquals(10000, ((TimeWindowCounter) 
((GaugeMetricSample<?>) sample).getValue()).get());
     }
 
+
+    @Test
+    public void testRtAggregation() { // Posts three request events with RTs 10/20/30 and checks the aggregated max/min/avg gauges.
+        metricsDispatcher.addListener(collector);
+        ConfigManager configManager = 
applicationModel.getApplicationConfigManager();
+        MetricsConfig config = configManager.getMetrics().orElse(null); // NOTE(review): config is null when no MetricsConfig exists and is dereferenced below without a check
+        AggregationConfig aggregationConfig = new AggregationConfig();
+        aggregationConfig.setEnabled(true);
+        config.setAggregation(aggregationConfig);
+
+        List<Long> rtList = new ArrayList<>();
+        rtList.add(10L);
+        rtList.add(20L);
+        rtList.add(30L);
+
+        for (Long requestTime: rtList) {
+            RequestEvent requestEvent = 
RequestEvent.toRequestEvent(applicationModel, invocation);
+            TestRequestEvent testRequestEvent = new 
TestRequestEvent(requestEvent.getSource(), requestEvent.getTypeWrapper());
+            testRequestEvent.putAttachment(MetricsConstants.INVOCATION, 
invocation);
+            testRequestEvent.putAttachment(ATTACHMENT_KEY_SERVICE, 
MetricsSupport.getInterfaceName(invocation));
+            testRequestEvent.putAttachment(MetricsConstants.INVOCATION_SIDE, 
MetricsSupport.getSide(invocation));
+            testRequestEvent.setRt(requestTime); // set the response time for this event from the list
+            MetricsEventBus.post(testRequestEvent, () -> null);
+        }
+
+        List<MetricSample> samples = collector.collect();
+        for (MetricSample sample : samples) { // each assertion below runs only if a gauge with the matching name suffix is present
+            GaugeMetricSample gaugeMetricSample = (GaugeMetricSample<?>) 
sample; // every collected sample is cast; a non-gauge sample would throw ClassCastException here
+
+            
if(gaugeMetricSample.getName().endsWith("max.milliseconds.aggregate")) {
+                Assertions.assertEquals(30,  
gaugeMetricSample.applyAsDouble());
+            }
+            if 
(gaugeMetricSample.getName().endsWith("min.milliseconds.aggregate")) {
+                Assertions.assertEquals(10L,  
gaugeMetricSample.applyAsDouble());
+            }
+
+            if 
(gaugeMetricSample.getName().endsWith("avg.milliseconds.aggregate")) {
+                Assertions.assertEquals(20L, 
gaugeMetricSample.applyAsDouble());
+            }
+        }
+    }
+
     @Test
     void testP95AndP99() throws InterruptedException {
 

Reply via email to