This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 6d6e3a5e9e Refactor metrics TimeWindow  (#11638)
6d6e3a5e9e is described below

commit 6d6e3a5e9e1caa712308ea25ed6b8fef04d15c92
Author: Mengyang Tang <[email protected]>
AuthorDate: Thu Mar 2 06:42:47 2023 +0800

    Refactor metrics TimeWindow  (#11638)
    
    * Add TimeUtils
    
    * Abstract sliding window
    
    * Refactor TimeWindow Counter and Quantile
    
    * Add license header
    
    * Fix sonar problem
    
    * Fix sonar problem
    
    * Fix sonar check bug
    
    * Start the ticker on demand
    
    * Fallback when ticker exception
---
 .../org/apache/dubbo/common/utils/TimeUtils.java   |  75 ++++++
 .../apache/dubbo/common/utils/TimeUtilsTest.java   |  21 +-
 .../org/apache/dubbo/metrics/aggregate/Pane.java   | 126 +++++++++
 .../dubbo/metrics/aggregate/SlidingWindow.java     | 300 +++++++++++++++++++++
 .../dubbo/metrics/aggregate/TimeWindowCounter.java |  81 +++---
 .../metrics/aggregate/TimeWindowQuantile.java      |  69 ++---
 .../apache/dubbo/metrics/aggregate/PaneTest.java   |  72 +++++
 .../dubbo/metrics/aggregate/SlidingWindowTest.java | 137 ++++++++++
 .../metrics/aggregate/TimeWindowCounterTest.java   |   4 +-
 .../metrics/aggregate/TimeWindowQuantileTest.java  |   4 +-
 10 files changed, 795 insertions(+), 94 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/TimeUtils.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/TimeUtils.java
new file mode 100644
index 0000000000..79974d1864
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/TimeUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.utils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provide currentTimeMillis acquisition for high-frequency access scenarios.
+ * <p>
+ * A daemon "ticker" thread, started on first use, refreshes a cached timestamp about once per
+ * millisecond, so callers read a volatile field instead of calling
+ * {@link System#currentTimeMillis()} on every access. If starting the ticker throws an exception,
+ * every later call reads the system clock directly.
+ */
+public final class TimeUtils {
+
+    /**
+     * Timestamp most recently published by the ticker thread.
+     */
+    private static volatile long currentTimeMillis;
+
+    /**
+     * Whether a ticker thread is expected to be running; the ticker loops while this is true.
+     */
+    private static volatile boolean isTickerAlive = false;
+
+    /**
+     * Set once starting the ticker has thrown; from then on the system clock is read directly.
+     */
+    private static volatile boolean isFallback = false;
+
+    private TimeUtils() {
+    }
+
+    /**
+     * Get the current time in milliseconds, starting the ticker on demand.
+     *
+     * @return the timestamp cached by the ticker, or {@link System#currentTimeMillis()} when the
+     * ticker mechanism has failed.
+     */
+    public static long currentTimeMillis() {
+        // When an exception occurs in the Ticker mechanism, fall back.
+        if (isFallback) {
+            return System.currentTimeMillis();
+        }
+
+        if (!isTickerAlive) {
+            try {
+                startTicker();
+            } catch (Exception e) {
+                isFallback = true;
+                // Nothing is guaranteed to refresh the cached value now, so read the clock directly.
+                return System.currentTimeMillis();
+            }
+        }
+        return currentTimeMillis;
+    }
+
+    /**
+     * Start the ticker thread if it is not already running.
+     * <p>
+     * The alive flag is raised BEFORE the thread is started. The ticker's loop condition reads
+     * that flag, so raising it afterwards would let a freshly started ticker observe {@code false},
+     * exit at once, and leave the cached timestamp frozen while callers believe it is updated.
+     */
+    private static synchronized void startTicker() {
+        if (isTickerAlive) {
+            return;
+        }
+
+        currentTimeMillis = System.currentTimeMillis();
+        Thread ticker = new Thread(() -> {
+            while (isTickerAlive) {
+                currentTimeMillis = System.currentTimeMillis();
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                } catch (InterruptedException e) {
+                    // Interrupted (e.g. by the shutdown hook): stop ticking, keep the interrupt status.
+                    isTickerAlive = false;
+                    Thread.currentThread().interrupt();
+                } catch (Exception ignored) {
+                    // Best effort: any other failure of a single sleep must not stop the ticker.
+                }
+            }
+        });
+        ticker.setDaemon(true);
+        ticker.setName("time-millis-ticker-thread");
+
+        isTickerAlive = true;
+        try {
+            ticker.start();
+        } catch (Throwable t) {
+            // The thread never ran, so nobody will refresh the cached value: undo the flag.
+            isTickerAlive = false;
+            throw t;
+        }
+        Runtime.getRuntime().addShutdownHook(new Thread(ticker::interrupt));
+    }
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
 b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/TimeUtilsTest.java
similarity index 61%
copy from 
dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
copy to 
dubbo-common/src/test/java/org/apache/dubbo/common/utils/TimeUtilsTest.java
index 7e1d416652..c51e8786a7 100644
--- 
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/TimeUtilsTest.java
@@ -15,23 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.dubbo.metrics.aggregate;
+package org.apache.dubbo.common.utils;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-class TimeWindowQuantileTest {
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-    @Test
-    void test() throws Exception {
-        TimeWindowQuantile quantile = new TimeWindowQuantile(100, 12, 1);
-        for (int i = 1; i <= 100; i++) {
-            quantile.add(i);
-        }
+class TimeUtilsTest {
 
-        Assertions.assertEquals(quantile.quantile(0.01), 2);
-        Assertions.assertEquals(quantile.quantile(0.99), 100);
-        Thread.sleep(1000);
-        Assertions.assertEquals(quantile.quantile(0.99), Double.NaN);
+    @Test
+    void testCurrentTimeMillis() {
+        assertTrue(0 < TimeUtils.currentTimeMillis());
     }
-}
\ No newline at end of file
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/Pane.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/Pane.java
new file mode 100755
index 0000000000..af7908d026
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/Pane.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+/**
+ * The pane represents a window over a period of time.
+ * <p>
+ * A pane covers the half-open range {@code [startInMs, startInMs + intervalInMs)}. The start
+ * timestamp and the value can be replaced so that an instance can be reused for a later period.
+ *
+ * @param <T> The type of value the pane statistics.
+ */
+public class Pane<T> {
+
+    /**
+     * Time interval of the pane in milliseconds.
+     */
+    private final long intervalInMs;
+
+    /**
+     * Start timestamp of the pane in milliseconds.
+     * <p>
+     * This is the only mutable timestamp: the end is always derived from it, so a reader can
+     * never observe a start and an end that belong to two different periods.
+     */
+    private volatile long startInMs;
+
+    /**
+     * Pane statistics value.
+     * <p>
+     * Volatile because {@link #setValue(Object)} may replace it while other threads read it.
+     */
+    private volatile T value;
+
+    /**
+     * @param intervalInMs interval of the pane in milliseconds.
+     * @param startInMs    start timestamp of the pane in milliseconds.
+     * @param value        the pane value.
+     */
+    public Pane(long intervalInMs, long startInMs, T value) {
+        this.intervalInMs = intervalInMs;
+        this.startInMs = startInMs;
+        this.value = value;
+    }
+
+    /**
+     * Get the interval of the pane in milliseconds.
+     *
+     * @return the interval of the pane in milliseconds.
+     */
+    public long getIntervalInMs() {
+        return this.intervalInMs;
+    }
+
+    /**
+     * Get start timestamp of the pane in milliseconds.
+     *
+     * @return the start timestamp of the pane in milliseconds.
+     */
+    public long getStartInMs() {
+        return this.startInMs;
+    }
+
+    /**
+     * Get end timestamp (exclusive) of the pane in milliseconds.
+     * <p>
+     * endInMs = startInMs + intervalInMs
+     *
+     * @return the end timestamp of the pane in milliseconds.
+     */
+    public long getEndInMs() {
+        return this.startInMs + this.intervalInMs;
+    }
+
+    /**
+     * Get the pane statistics value.
+     *
+     * @return the pane statistics value.
+     */
+    public T getValue() {
+        return this.value;
+    }
+
+    /**
+     * Set the new start timestamp to the pane, for reset the instance.
+     * The end timestamp moves with it.
+     *
+     * @param newStartInMs the new start timestamp.
+     */
+    public void setStartInMs(long newStartInMs) {
+        this.startInMs = newStartInMs;
+    }
+
+    /**
+     * Set new value to the pane, for reset the instance.
+     *
+     * @param newData the new value.
+     */
+    public void setValue(T newData) {
+        this.value = newData;
+    }
+
+    /**
+     * Check whether given timestamp is in current pane.
+     *
+     * @param timeMillis timestamp in milliseconds.
+     * @return true if the given time is in current pane, otherwise false
+     */
+    public boolean isTimeInWindow(long timeMillis) {
+        // Read the start once so both bounds of '[)' come from the same period.
+        long start = this.startInMs;
+        return start <= timeMillis && timeMillis < start + this.intervalInMs;
+    }
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SlidingWindow.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SlidingWindow.java
new file mode 100755
index 0000000000..de1fd09583
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SlidingWindow.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.common.utils.TimeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * SlidingWindow adopts sliding window algorithm for statistics.
+ * <p>
+ * A window contains {@code paneCount} panes,
+ * {@code intervalInMs} = {@code paneCount} * {@code paneIntervalInMs}
+ * <p>
+ * Panes are kept in a fixed-size array indexed by time; a pane whose period has passed is reset
+ * and reused for the new period that maps to the same index.
+ *
+ * @param <T> Value type for window statistics.
+ */
+public abstract class SlidingWindow<T> {
+
+    /**
+     * The number of panes the sliding window contains.
+     */
+    protected int paneCount;
+
+    /**
+     * Total time interval of the sliding window in milliseconds.
+     */
+    protected long intervalInMs;
+
+    /**
+     * Time interval of a pane in milliseconds.
+     */
+    protected long paneIntervalInMs;
+
+    /**
+     * The panes reference, supports atomic operations.
+     */
+    protected final AtomicReferenceArray<Pane<T>> referenceArray;
+
+    /**
+     * The lock is used only when current pane is deprecated.
+     */
+    private final ReentrantLock updateLock = new ReentrantLock();
+
+    /**
+     * @param paneCount    the number of panes, must be positive.
+     * @param intervalInMs total interval of the window in milliseconds, must be positive and
+     *                     evenly divisible by {@code paneCount}.
+     */
+    protected SlidingWindow(int paneCount, long intervalInMs) {
+        Assert.assertTrue(paneCount > 0, "pane count is invalid: " + paneCount);
+        Assert.assertTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
+        Assert.assertTrue(intervalInMs % paneCount == 0, "total time interval needs to be evenly divided");
+
+        this.paneCount = paneCount;
+        this.intervalInMs = intervalInMs;
+        this.paneIntervalInMs = intervalInMs / paneCount;
+        this.referenceArray = new AtomicReferenceArray<>(paneCount);
+    }
+
+    /**
+     * Get the pane at the current timestamp.
+     *
+     * @return the pane at current timestamp.
+     */
+    public Pane<T> currentPane() {
+        return currentPane(TimeUtils.currentTimeMillis());
+    }
+
+    /**
+     * Get the pane at the specified timestamp in milliseconds.
+     * <p>
+     * If the slot is empty a new pane is installed; if the slot holds a pane of an earlier period
+     * that pane is reset and reused; if the slot already holds a pane of a LATER period, a detached
+     * pane (not stored in the window) is returned.
+     *
+     * @param timeMillis a timestamp in milliseconds.
+     * @return the pane at the specified timestamp if the time is valid; null if time is invalid.
+     */
+    public Pane<T> currentPane(long timeMillis) {
+        if (timeMillis < 0) {
+            return null;
+        }
+
+        int paneIdx = calculatePaneIdx(timeMillis);
+        long paneStartInMs = calculatePaneStart(timeMillis);
+
+        while (true) {
+            Pane<T> oldPane = referenceArray.get(paneIdx);
+
+            // Create a pane instance when the pane does not exist.
+            if (oldPane == null) {
+                Pane<T> pane = new Pane<>(paneIntervalInMs, paneStartInMs, newEmptyValue(timeMillis));
+                if (referenceArray.compareAndSet(paneIdx, null, pane)) {
+                    return pane;
+                }
+                // Contention failed, the thread will yield its time slice to wait for pane available.
+                Thread.yield();
+                continue;
+            }
+
+            long oldStartInMs = oldPane.getStartInMs();
+
+            // The pane in the slot is exactly the one for the requested period.
+            if (paneStartInMs == oldStartInMs) {
+                return oldPane;
+            }
+
+            // The specified timestamp has passed.
+            if (paneStartInMs < oldStartInMs) {
+                return new Pane<>(paneIntervalInMs, paneStartInMs, newEmptyValue(timeMillis));
+            }
+
+            // The pane has deprecated. To avoid the overhead of creating a new instance,
+            // reset the original pane directly.
+            if (!updateLock.tryLock()) {
+                // Contention failed, the thread will yield its time slice to wait for pane available.
+                Thread.yield();
+                continue;
+            }
+            try {
+                // Re-check under the lock: another thread may have reset this pane between our
+                // read above and acquiring the lock. Resetting again would wipe what it recorded.
+                if (paneStartInMs > oldPane.getStartInMs()) {
+                    return resetPaneTo(oldPane, paneStartInMs);
+                }
+            } finally {
+                updateLock.unlock();
+            }
+            // Someone else already moved the pane forward; loop to re-evaluate it.
+        }
+    }
+
+    /**
+     * Get statistic value from pane at the specified timestamp.
+     *
+     * @param timeMillis the specified timestamp in milliseconds.
+     * @return the statistic value if pane at the specified timestamp is up-to-date; otherwise null.
+     */
+    public T getPaneValue(long timeMillis) {
+        if (timeMillis < 0) {
+            return null;
+        }
+
+        int paneIdx = calculatePaneIdx(timeMillis);
+
+        Pane<T> pane = referenceArray.get(paneIdx);
+
+        if (pane == null || !pane.isTimeInWindow(timeMillis)) {
+            return null;
+        }
+
+        return pane.getValue();
+    }
+
+    /**
+     * Create a new statistic value for pane.
+     *
+     * @param timeMillis the specified timestamp in milliseconds.
+     * @return new empty statistic value.
+     */
+    public abstract T newEmptyValue(long timeMillis);
+
+    /**
+     * Reset given pane to the specified start time and reset the value.
+     * Called by {@link #currentPane(long)} while holding the update lock.
+     *
+     * @param pane      the given pane.
+     * @param startInMs the start timestamp of the pane in milliseconds.
+     * @return reset pane.
+     */
+    protected abstract Pane<T> resetPaneTo(final Pane<T> pane, long startInMs);
+
+    /**
+     * Calculate the pane index corresponding to the specified timestamp.
+     *
+     * @param timeMillis the specified timestamp.
+     * @return the pane index corresponding to the specified timestamp.
+     */
+    private int calculatePaneIdx(long timeMillis) {
+        return (int) ((timeMillis / paneIntervalInMs) % paneCount);
+    }
+
+    /**
+     * Calculate the pane start corresponding to the specified timestamp.
+     *
+     * @param timeMillis the specified timestamp.
+     * @return the pane start corresponding to the specified timestamp.
+     */
+    protected long calculatePaneStart(long timeMillis) {
+        return timeMillis - timeMillis % paneIntervalInMs;
+    }
+
+    /**
+     * Checks if the specified pane is deprecated at the current timestamp.
+     *
+     * @param pane the specified pane.
+     * @return true if the pane is deprecated; otherwise false.
+     */
+    public boolean isPaneDeprecated(final Pane<T> pane) {
+        return isPaneDeprecated(TimeUtils.currentTimeMillis(), pane);
+    }
+
+    /**
+     * Checks if the specified pane is deprecated at the specified timestamp, i.e. whether more
+     * than the total window interval has elapsed since the pane started.
+     *
+     * @param timeMillis the specified time.
+     * @param pane       the specified pane.
+     * @return true if the pane is deprecated; otherwise false.
+     */
+    public boolean isPaneDeprecated(long timeMillis, final Pane<T> pane) {
+        // the pane is '[)'
+        return (timeMillis - pane.getStartInMs()) > intervalInMs;
+    }
+
+    /**
+     * Get valid pane list for entire sliding window at the current time.
+     * The list will only contain "valid" panes.
+     *
+     * @return valid pane list for entire sliding window.
+     */
+    public List<Pane<T>> list() {
+        return list(TimeUtils.currentTimeMillis());
+    }
+
+    /**
+     * Get valid pane list for entire sliding window at the specified time.
+     * The list will only contain "valid" panes.
+     *
+     * @param timeMillis the specified time.
+     * @return valid pane list for entire sliding window; empty if the time is negative.
+     */
+    public List<Pane<T>> list(long timeMillis) {
+        if (timeMillis < 0) {
+            return new ArrayList<>();
+        }
+
+        List<Pane<T>> result = new ArrayList<>(paneCount);
+
+        for (int idx = 0; idx < paneCount; idx++) {
+            Pane<T> pane = referenceArray.get(idx);
+            if (pane == null || isPaneDeprecated(timeMillis, pane)) {
+                continue;
+            }
+            result.add(pane);
+        }
+
+        return result;
+    }
+
+    /**
+     * Get aggregated value list for entire sliding window at the current time.
+     * The list will only contain value from "valid" panes.
+     *
+     * @return aggregated value list for entire sliding window.
+     */
+    public List<T> values() {
+        return values(TimeUtils.currentTimeMillis());
+    }
+
+    /**
+     * Get aggregated value list for entire sliding window at the specified time.
+     * The list will only contain value from "valid" panes.
+     *
+     * @param timeMillis the specified time.
+     * @return aggregated value list for entire sliding window; empty if the time is negative.
+     */
+    public List<T> values(long timeMillis) {
+        if (timeMillis < 0) {
+            return new ArrayList<>();
+        }
+
+        List<T> result = new ArrayList<>(paneCount);
+
+        for (int idx = 0; idx < paneCount; idx++) {
+            Pane<T> pane = referenceArray.get(idx);
+            if (pane == null || isPaneDeprecated(timeMillis, pane)) {
+                continue;
+            }
+            result.add(pane.getValue());
+        }
+        return result;
+    }
+
+    /**
+     * Get total interval of the sliding window in milliseconds.
+     *
+     * @return the total interval in milliseconds.
+     */
+    public long getIntervalInMs() {
+        return intervalInMs;
+    }
+
+    /**
+     * Get pane interval of the sliding window in milliseconds.
+     *
+     * @return the interval of a pane in milliseconds.
+     */
+    public long getPaneIntervalInMs() {
+        return paneIntervalInMs;
+    }
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
index 3dc3666602..ee15210066 100644
--- 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
@@ -17,74 +17,69 @@
 
 package org.apache.dubbo.metrics.aggregate;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * Wrapper around Counter like Long and Integer.
- * <p>
- * Maintains a ring buffer of Counter to provide count over a sliding windows 
of time.
  */
 public class TimeWindowCounter {
-    private final Long[] ringBuffer;
-    private final Long[] bucketStartTimeMillis;
-    private int currentBucket;
-    private long lastRotateTimestampMillis;
-    private final long durationBetweenRotatesMillis;
 
-    public TimeWindowCounter(int bucketNum, int timeWindowSeconds) {
-        this.ringBuffer = new Long[bucketNum];
-        this.bucketStartTimeMillis = new Long[bucketNum];
-        for (int i = 0; i < bucketNum; i++) {
-            this.ringBuffer[i] = 0L;
-            this.bucketStartTimeMillis[i] = System.currentTimeMillis();
-        }
+    private final LongAdderSlidingWindow slidingWindow;
 
-        this.currentBucket = 0;
-        this.lastRotateTimestampMillis = System.currentTimeMillis();
-        this.durationBetweenRotatesMillis = 
TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum;
+    public TimeWindowCounter(int bucketNum, int timeWindowSeconds) {
+        this.slidingWindow = new LongAdderSlidingWindow(bucketNum, 
TimeUnit.SECONDS.toMillis(timeWindowSeconds));
     }
 
-    public synchronized double get() {
-        return rotate();
+    public double get() {
+        double result = 0.0;
+        List<LongAdder> windows = this.slidingWindow.values();
+        for (LongAdder window : windows) {
+            result += window.sum();
+        }
+        return result;
     }
 
     public long bucketLivedSeconds() {
-        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
bucketStartTimeMillis[currentBucket]);
+        return 
TimeUnit.MILLISECONDS.toSeconds(this.slidingWindow.values().size() * 
this.slidingWindow.getPaneIntervalInMs());
     }
 
-    public synchronized void increment() {
+    public void increment() {
         this.increment(1L);
     }
 
-    public synchronized void increment(Long step) {
-        rotate();
-        for (int i = 0; i < ringBuffer.length; i++) {
-            ringBuffer[i] = ringBuffer[i] + step;
-        }
+    public void increment(Long step) {
+        this.slidingWindow.currentPane().getValue().add(step);
     }
 
-    public synchronized void decrement() {
+    public void decrement() {
         this.decrement(1L);
     }
 
-    public synchronized void decrement(Long step) {
-        rotate();
-        for (int i = 0; i < ringBuffer.length; i++) {
-            ringBuffer[i] = ringBuffer[i] - step;
-        }
+    public void decrement(Long step) {
+        this.slidingWindow.currentPane().getValue().add(-step);
     }
 
-    private Long rotate() {
-        long timeSinceLastRotateMillis = System.currentTimeMillis() - 
lastRotateTimestampMillis;
-        while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
-            ringBuffer[currentBucket] = 0L;
-            bucketStartTimeMillis[currentBucket] = lastRotateTimestampMillis + 
durationBetweenRotatesMillis;
-            if (++currentBucket >= ringBuffer.length) {
-                currentBucket = 0;
-            }
-            timeSinceLastRotateMillis -= durationBetweenRotatesMillis;
-            lastRotateTimestampMillis += durationBetweenRotatesMillis;
+    /**
+     * Sliding window of type LongAdder.
+     */
+    private static class LongAdderSlidingWindow extends 
SlidingWindow<LongAdder> {
+
+        public LongAdderSlidingWindow(int sampleCount, long intervalInMs) {
+            super(sampleCount, intervalInMs);
+        }
+
+        @Override
+        public LongAdder newEmptyValue(long timeMillis) {
+            return new LongAdder();
+        }
+
+        @Override
+        protected Pane<LongAdder> resetPaneTo(final Pane<LongAdder> pane, long 
startTime) {
+            pane.setStartInMs(startTime);
+            pane.getValue().reset();
+            return pane;
         }
-        return ringBuffer[currentBucket];
     }
 }
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
index 86249c88a2..823024344d 100644
--- 
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
@@ -19,57 +19,60 @@ package org.apache.dubbo.metrics.aggregate;
 
 import com.tdunning.math.stats.TDigest;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Wrapper around TDigest.
- * <p>
- * Maintains a ring buffer of TDigest to provide quantiles over a sliding 
windows of time.
  */
 public class TimeWindowQuantile {
+
     private final double compression;
-    private final TDigest[] ringBuffer;
-    private int currentBucket;
-    private long lastRotateTimestampMillis;
-    private final long durationBetweenRotatesMillis;
+
+    private final DigestSlidingWindow slidingWindow;
 
     public TimeWindowQuantile(double compression, int bucketNum, int 
timeWindowSeconds) {
         this.compression = compression;
-        this.ringBuffer = new TDigest[bucketNum];
-        for (int i = 0; i < bucketNum; i++) {
-            this.ringBuffer[i] = TDigest.createDigest(compression);
-        }
-
-        this.currentBucket = 0;
-        this.lastRotateTimestampMillis = System.currentTimeMillis();
-        this.durationBetweenRotatesMillis = 
TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum;
+        this.slidingWindow = new DigestSlidingWindow(compression, bucketNum, 
TimeUnit.SECONDS.toMillis(timeWindowSeconds));
     }
 
-    public synchronized double quantile(double q) {
-        TDigest currentBucket = rotate();
-
+    public double quantile(double q) {
+        TDigest mergeDigest = TDigest.createDigest(compression);
+        List<TDigest> validWindows = this.slidingWindow.values();
+        for (TDigest window : validWindows) {
+            mergeDigest.add(window);
+        }
         // This may return Double.NaN, and it's correct behavior.
         // see: https://github.com/prometheus/client_golang/issues/85
-        return currentBucket.quantile(q);
+        return mergeDigest.quantile(q);
     }
 
-    public synchronized void add(double value) {
-        rotate();
-        for (TDigest bucket : ringBuffer) {
-            bucket.add(value);
-        }
+    public void add(double value) {
+        this.slidingWindow.currentPane().getValue().add(value);
     }
 
-    private TDigest rotate() {
-        long timeSinceLastRotateMillis = System.currentTimeMillis() - 
lastRotateTimestampMillis;
-        while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
-            ringBuffer[currentBucket] = TDigest.createDigest(compression);
-            if (++currentBucket >= ringBuffer.length) {
-                currentBucket = 0;
-            }
-            timeSinceLastRotateMillis -= durationBetweenRotatesMillis;
-            lastRotateTimestampMillis += durationBetweenRotatesMillis;
+    /**
+     * Sliding window of type TDigest.
+     */
+    private static class DigestSlidingWindow extends SlidingWindow<TDigest> {
+
+        private final double compression;
+
+        public DigestSlidingWindow(double compression, int sampleCount, long 
intervalInMs) {
+            super(sampleCount, intervalInMs);
+            this.compression = compression;
+        }
+
+        @Override
+        public TDigest newEmptyValue(long timeMillis) {
+            return TDigest.createDigest(compression);
+        }
+
+        @Override
+        protected Pane<TDigest> resetPaneTo(final Pane<TDigest> pane, long 
startTime) {
+            pane.setStartInMs(startTime);
+            pane.setValue(TDigest.createDigest(compression));
+            return pane;
         }
-        return ringBuffer[currentBucket];
     }
 }
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/PaneTest.java
 
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/PaneTest.java
new file mode 100644
index 0000000000..bbce54ec84
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/PaneTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link Pane}.
+ */
+class PaneTest {
+
+    @Test
+    void testIntervalInMs() {
+        Pane<?> pane = mock(Pane.class);
+        when(pane.getIntervalInMs()).thenReturn(100L);
+        assertEquals(100L, pane.getIntervalInMs());
+    }
+
+    @Test
+    void testStartInMs() {
+        Pane<?> pane = mock(Pane.class);
+        long startTime = System.currentTimeMillis();
+        when(pane.getStartInMs()).thenReturn(startTime);
+        assertEquals(startTime, pane.getStartInMs());
+    }
+
+    @Test
+    void testEndInMs() {
+        long startTime = System.currentTimeMillis();
+        Pane<?> pane = new Pane<>(10, startTime, new Object());
+        assertEquals(startTime + 10, pane.getEndInMs());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testValue() {
+        Pane<LongAdder> pane = mock(Pane.class);
+        LongAdder count = new LongAdder();
+        when(pane.getValue()).thenReturn(count);
+        assertEquals(count, pane.getValue());
+        when(pane.getValue()).thenReturn(null);
+        assertNotEquals(count, pane.getValue());
+    }
+
+    @Test
+    void testIsTimeInWindow() {
+        // Probe with offsets from one captured timestamp. Re-reading the clock for each assertion
+        // makes the test fail whenever more than the 10 ms pane interval elapses in between.
+        long startTime = System.currentTimeMillis();
+        Pane<?> pane = new Pane<>(10, startTime, new Object());
+        // The pane is '[)': start is inclusive, end is exclusive.
+        assertTrue(pane.isTimeInWindow(startTime));
+        assertTrue(pane.isTimeInWindow(startTime + 9));
+        assertFalse(pane.isTimeInWindow(startTime + 10));
+        assertFalse(pane.isTimeInWindow(startTime - 1));
+    }
+}
diff --git 
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/SlidingWindowTest.java
 
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/SlidingWindowTest.java
new file mode 100644
index 0000000000..58b74b604f
--- /dev/null
+++ 
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/SlidingWindowTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SlidingWindowTest {
+
+    static final int paneCount = 10;
+
+    static final long intervalInMs = 2000;
+
+    TestSlidingWindow window;
+
+    @BeforeEach
+    void setup() {
+        window = new TestSlidingWindow(paneCount, intervalInMs);
+    }
+
+    @Test
+    void testCurrentPane() {
+        assertNull(window.currentPane(/* invalid time*/-1L));
+        long timeInMs = System.currentTimeMillis();
+        Pane<LongAdder> currentPane = window.currentPane(timeInMs);
+        assertNotNull(currentPane);
+        // reuse test
+        assertEquals(currentPane,
+            window.currentPane(1 + timeInMs + window.getPaneIntervalInMs() * paneCount));
+    }
+
+    @Test
+    void testGetPaneData() {
+        assertNull(window.getPaneValue(/* invalid time*/-1L));
+        window.currentPane();
+        assertNotNull(window.getPaneValue(System.currentTimeMillis()));
+        assertNull(window.getPaneValue(System.currentTimeMillis() + window.getPaneIntervalInMs()));
+    }
+
+    @Test
+    void testNewEmptyValue() {
+        assertEquals(0L, window.newEmptyValue(System.currentTimeMillis()).sum());
+    }
+
+    @Test
+    void testResetPaneTo() {
+        Pane<LongAdder> currentPane = window.currentPane();
+        currentPane.getValue().add(2);
+        currentPane.getValue().add(1);
+        assertEquals(3, currentPane.getValue().sum());
+        window.resetPaneTo(currentPane, System.currentTimeMillis());
+        assertEquals(0, currentPane.getValue().sum());
+        currentPane.getValue().add(1);
+        assertEquals(1, currentPane.getValue().sum());
+    }
+
+    @Test
+    void testCalculatePaneStart() {
+        long time = System.currentTimeMillis();
+        assertTrue(window.calculatePaneStart(time) <= time);
+        assertTrue(time < window.calculatePaneStart(time) + window.getPaneIntervalInMs());
+    }
+
+    @Test
+    void testIsPaneDeprecated() {
+        Pane<LongAdder> currentPane = window.currentPane();
+        currentPane.setStartInMs(1000000L);
+        assertTrue(window.isPaneDeprecated(currentPane));
+    }
+
+    @Test
+    void testList() {
+        window.currentPane();
+        assertTrue(0 < window.list().size());
+    }
+
+    @Test
+    void testValues() {
+        window.currentPane().getValue().add(10);
+        long sum = 0;
+        for (LongAdder value : window.values()) {
+            sum += value.sum();
+        }
+        assertEquals(10, sum);
+    }
+
+    @Test
+    void testGetIntervalInMs() {
+        assertEquals(intervalInMs, window.getIntervalInMs());
+    }
+
+    @Test
+    void testGetPaneIntervalInMs() {
+        assertEquals(intervalInMs / paneCount, window.getPaneIntervalInMs());
+    }
+
+    private static class TestSlidingWindow extends SlidingWindow<LongAdder> {
+
+        public TestSlidingWindow(int paneCount, long intervalInMs) {
+            super(paneCount, intervalInMs);
+        }
+
+        @Override
+        public LongAdder newEmptyValue(long timeMillis) {
+            return new LongAdder();
+        }
+
+        @Override
+        protected Pane<LongAdder> resetPaneTo(Pane<LongAdder> pane, long startInMs) {
+            pane.setStartInMs(startInMs);
+            pane.getValue().reset();
+            return pane;
+        }
+    }
+}
diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
index 4ff6758020..6cac0d60d8 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
@@ -24,7 +24,7 @@ class TimeWindowCounterTest {
 
     @Test
     void test() throws Exception {
-        TimeWindowCounter counter = new TimeWindowCounter(12, 1);
+        TimeWindowCounter counter = new TimeWindowCounter(10, 1);
         counter.increment();
         Assertions.assertEquals(counter.get(), 1);
         counter.decrement();
@@ -34,4 +34,4 @@ class TimeWindowCounterTest {
         Assertions.assertEquals(counter.get(), 0);
         Assertions.assertTrue(counter.bucketLivedSeconds() < 1);
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
index 7e1d416652..36ee690b01 100644
--- a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
+++ b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
@@ -24,7 +24,7 @@ class TimeWindowQuantileTest {
 
     @Test
     void test() throws Exception {
-        TimeWindowQuantile quantile = new TimeWindowQuantile(100, 12, 1);
+        TimeWindowQuantile quantile = new TimeWindowQuantile(100, 10, 1);
         for (int i = 1; i <= 100; i++) {
             quantile.add(i);
         }
@@ -34,4 +34,4 @@ class TimeWindowQuantileTest {
         Thread.sleep(1000);
         Assertions.assertEquals(quantile.quantile(0.99), Double.NaN);
     }
-}
\ No newline at end of file
+}


Reply via email to