This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 6d6e3a5e9e Refactor metrics TimeWindow (#11638)
6d6e3a5e9e is described below
commit 6d6e3a5e9e1caa712308ea25ed6b8fef04d15c92
Author: Mengyang Tang <[email protected]>
AuthorDate: Thu Mar 2 06:42:47 2023 +0800
Refactor metrics TimeWindow (#11638)
* Add TimeUtils
* Abstract sliding window
* Refactor TimeWindow Counter and Quantile
* Add license header
* Fix sonar problem
* Fix sonar problem
* Fix sonar check bug
* Start the ticker on demand
* Fallback when ticker exception
---
.../org/apache/dubbo/common/utils/TimeUtils.java | 75 ++++++
.../apache/dubbo/common/utils/TimeUtilsTest.java | 21 +-
.../org/apache/dubbo/metrics/aggregate/Pane.java | 126 +++++++++
.../dubbo/metrics/aggregate/SlidingWindow.java | 300 +++++++++++++++++++++
.../dubbo/metrics/aggregate/TimeWindowCounter.java | 81 +++---
.../metrics/aggregate/TimeWindowQuantile.java | 69 ++---
.../apache/dubbo/metrics/aggregate/PaneTest.java | 72 +++++
.../dubbo/metrics/aggregate/SlidingWindowTest.java | 137 ++++++++++
.../metrics/aggregate/TimeWindowCounterTest.java | 4 +-
.../metrics/aggregate/TimeWindowQuantileTest.java | 4 +-
10 files changed, 795 insertions(+), 94 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/TimeUtils.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/TimeUtils.java
new file mode 100644
index 0000000000..79974d1864
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/TimeUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.utils;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provide currentTimeMillis acquisition for high-frequency access scenarios.
+ */
+public final class TimeUtils {
+
+    /**
+     * Most recent wall-clock reading stored by the ticker thread.
+     */
+    private static volatile long currentTimeMillis;
+
+    /**
+     * True while the ticker thread is expected to keep refreshing {@link #currentTimeMillis}.
+     */
+    private static volatile boolean isTickerAlive = false;
+
+    /**
+     * Set once starting the ticker has failed; from then on the system clock is read directly.
+     */
+    private static volatile boolean isFallback = false;
+
+    private TimeUtils() {
+    }
+
+    /**
+     * Get the current time in milliseconds from the cached value maintained by the ticker thread.
+     * The ticker is started on the first call; if starting it throws, this method permanently
+     * falls back to {@link System#currentTimeMillis()}.
+     *
+     * @return the current time in milliseconds.
+     */
+    public static long currentTimeMillis() {
+        // When an exception occurs in the Ticker mechanism, fall back.
+        if (isFallback) {
+            return System.currentTimeMillis();
+        }
+
+        if (!isTickerAlive) {
+            try {
+                startTicker();
+            } catch (Exception e) {
+                isFallback = true;
+                // Nothing refreshes the cached value now, so answer from the system clock.
+                return System.currentTimeMillis();
+            }
+        }
+        return currentTimeMillis;
+    }
+
+    /**
+     * Start the daemon ticker thread that refreshes the cached time roughly every millisecond.
+     * Does nothing when the ticker is already marked alive.
+     */
+    private static synchronized void startTicker() {
+        if (isTickerAlive) {
+            return;
+        }
+        currentTimeMillis = System.currentTimeMillis();
+        Thread ticker = new Thread(() -> {
+            while (isTickerAlive) {
+                currentTimeMillis = System.currentTimeMillis();
+                try {
+                    TimeUnit.MILLISECONDS.sleep(1);
+                } catch (InterruptedException e) {
+                    // An interrupt (e.g. from the shutdown hook) ends the loop.
+                    isTickerAlive = false;
+                    Thread.currentThread().interrupt();
+                } catch (Exception ignored) {
+                    // Any other failure is not fatal: keep ticking.
+                }
+            }
+        });
+        ticker.setDaemon(true);
+        ticker.setName("time-millis-ticker-thread");
+        // Register the hook before starting: if registration throws, no thread has been started.
+        Runtime.getRuntime().addShutdownHook(new Thread(ticker::interrupt));
+        // Raise the flag BEFORE start(); otherwise the new thread may read false in its loop
+        // condition and exit immediately, leaving the cached time frozen.
+        isTickerAlive = true;
+        try {
+            ticker.start();
+        } catch (Throwable t) {
+            // The thread never ran, so do not claim that the cached time is being refreshed.
+            isTickerAlive = false;
+            throw t;
+        }
+    }
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/TimeUtilsTest.java
similarity index 61%
copy from
dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
copy to
dubbo-common/src/test/java/org/apache/dubbo/common/utils/TimeUtilsTest.java
index 7e1d416652..c51e8786a7 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/TimeUtilsTest.java
@@ -15,23 +15,16 @@
* limitations under the License.
*/
-package org.apache.dubbo.metrics.aggregate;
+package org.apache.dubbo.common.utils;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-class TimeWindowQuantileTest {
+import static org.junit.jupiter.api.Assertions.assertTrue;
- @Test
- void test() throws Exception {
- TimeWindowQuantile quantile = new TimeWindowQuantile(100, 12, 1);
- for (int i = 1; i <= 100; i++) {
- quantile.add(i);
- }
+class TimeUtilsTest {
- Assertions.assertEquals(quantile.quantile(0.01), 2);
- Assertions.assertEquals(quantile.quantile(0.99), 100);
- Thread.sleep(1000);
- Assertions.assertEquals(quantile.quantile(0.99), Double.NaN);
+ @Test
+ void testCurrentTimeMillis() {
+ assertTrue(0 < TimeUtils.currentTimeMillis());
}
-}
\ No newline at end of file
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/Pane.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/Pane.java
new file mode 100755
index 0000000000..af7908d026
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/Pane.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+/**
+ * The pane represents a window over a period of time.
+ *
+ * @param <T> the type of the statistic value held by the pane.
+ */
+public class Pane<T> {
+
+    /**
+     * Time interval of the pane in milliseconds.
+     */
+    private final long intervalInMs;
+
+    /**
+     * Start timestamp of the pane in milliseconds.
+     */
+    private volatile long startInMs;
+
+    /**
+     * End timestamp of the pane in milliseconds.
+     * <p>
+     * endInMs = startInMs + intervalInMs
+     */
+    private volatile long endInMs;
+
+    /**
+     * Pane statistics value.
+     * <p>
+     * Volatile like the timestamps, because the reference can be replaced through
+     * {@link #setValue(Object)} while other threads read it through {@link #getValue()}.
+     */
+    private volatile T value;
+
+    /**
+     * @param intervalInMs interval of the pane in milliseconds.
+     * @param startInMs    start timestamp of the pane in milliseconds.
+     * @param value        the pane value.
+     */
+    public Pane(long intervalInMs, long startInMs, T value) {
+        this.intervalInMs = intervalInMs;
+        this.startInMs = startInMs;
+        this.endInMs = startInMs + intervalInMs;
+        this.value = value;
+    }
+
+    /**
+     * Get the interval of the pane in milliseconds.
+     *
+     * @return the interval of the pane in milliseconds.
+     */
+    public long getIntervalInMs() {
+        return this.intervalInMs;
+    }
+
+    /**
+     * Get the start timestamp of the pane in milliseconds.
+     *
+     * @return the start timestamp of the pane in milliseconds.
+     */
+    public long getStartInMs() {
+        return this.startInMs;
+    }
+
+    /**
+     * Get the end timestamp of the pane in milliseconds.
+     *
+     * @return the end timestamp of the pane in milliseconds.
+     */
+    public long getEndInMs() {
+        return this.endInMs;
+    }
+
+    /**
+     * Get the pane statistics value.
+     *
+     * @return the pane statistics value.
+     */
+    public T getValue() {
+        return this.value;
+    }
+
+    /**
+     * Set a new start timestamp on the pane, used when the instance is reset.
+     * The end timestamp is recomputed as {@code newStartInMs + intervalInMs}.
+     *
+     * @param newStartInMs the new start timestamp.
+     */
+    public void setStartInMs(long newStartInMs) {
+        this.startInMs = newStartInMs;
+        this.endInMs = newStartInMs + this.intervalInMs;
+    }
+
+    /**
+     * Set a new value on the pane, used when the instance is reset.
+     *
+     * @param newData the new value.
+     */
+    public void setValue(T newData) {
+        this.value = newData;
+    }
+
+    /**
+     * Check whether the given timestamp is in the current pane.
+     *
+     * @param timeMillis timestamp in milliseconds.
+     * @return true if the given time is in the current pane, otherwise false
+     */
+    public boolean isTimeInWindow(long timeMillis) {
+        // Half-open range: start inclusive, end exclusive.
+        return startInMs <= timeMillis && timeMillis < endInMs;
+    }
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SlidingWindow.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SlidingWindow.java
new file mode 100755
index 0000000000..de1fd09583
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/SlidingWindow.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.common.utils.TimeUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * SlidingWindow adopts sliding window algorithm for statistics.
+ * <p>
+ * A window contains {@code paneCount} panes,
+ * {@code intervalInMs} = {@code paneCount} * {@code paneIntervalInMs}
+ *
+ * @param <T> Value type for window statistics.
+ */
+public abstract class SlidingWindow<T> {
+
+    /**
+     * The number of panes the sliding window contains.
+     */
+    protected int paneCount;
+
+    /**
+     * Total time interval of the sliding window in milliseconds.
+     */
+    protected long intervalInMs;
+
+    /**
+     * Time interval of a pane in milliseconds.
+     */
+    protected long paneIntervalInMs;
+
+    /**
+     * The panes reference, supports atomic operations.
+     */
+    protected final AtomicReferenceArray<Pane<T>> referenceArray;
+
+    /**
+     * The lock is used only when current pane is deprecated.
+     */
+    private final ReentrantLock updateLock = new ReentrantLock();
+
+    /**
+     * @param paneCount    the number of panes; must be positive.
+     * @param intervalInMs total interval of the window in milliseconds; must be positive and
+     *                     evenly divisible by {@code paneCount}.
+     */
+    protected SlidingWindow(int paneCount, long intervalInMs) {
+        Assert.assertTrue(paneCount > 0, "pane count is invalid: " + paneCount);
+        Assert.assertTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
+        Assert.assertTrue(intervalInMs % paneCount == 0, "total time interval needs to be evenly divided");
+
+        this.paneCount = paneCount;
+        this.intervalInMs = intervalInMs;
+        this.paneIntervalInMs = intervalInMs / paneCount;
+        this.referenceArray = new AtomicReferenceArray<>(paneCount);
+    }
+
+    /**
+     * Get the pane at the current timestamp.
+     *
+     * @return the pane at current timestamp.
+     */
+    public Pane<T> currentPane() {
+        return currentPane(TimeUtils.currentTimeMillis());
+    }
+
+    /**
+     * Get the pane at the specified timestamp in milliseconds.
+     *
+     * @param timeMillis a timestamp in milliseconds.
+     * @return the pane at the specified timestamp if the time is valid; null if time is invalid.
+     */
+    public Pane<T> currentPane(long timeMillis) {
+        if (timeMillis < 0) {
+            return null;
+        }
+
+        int paneIdx = calculatePaneIdx(timeMillis);
+        long paneStartInMs = calculatePaneStart(timeMillis);
+
+        while (true) {
+            Pane<T> oldPane = referenceArray.get(paneIdx);
+
+            // Create a pane instance when the pane does not exist.
+            if (oldPane == null) {
+                Pane<T> pane = new Pane<>(paneIntervalInMs, paneStartInMs, newEmptyValue(timeMillis));
+                if (referenceArray.compareAndSet(paneIdx, null, pane)) {
+                    return pane;
+                } else {
+                    // Contention failed, the thread will yield its time slice to wait for pane available.
+                    Thread.yield();
+                }
+            }
+            // The slot already holds the pane for this time span: reuse it.
+            else if (paneStartInMs == oldPane.getStartInMs()) {
+                return oldPane;
+            }
+            // The pane has deprecated. To avoid the overhead of creating a new instance, reset the original pane directly.
+            else if (paneStartInMs > oldPane.getStartInMs()) {
+                if (updateLock.tryLock()) {
+                    try {
+                        // Re-check under the lock: another thread may have reset this pane between
+                        // the comparison above and the lock acquisition. Resetting it a second time
+                        // would wipe whatever has been recorded since.
+                        if (paneStartInMs > oldPane.getStartInMs()) {
+                            return resetPaneTo(oldPane, paneStartInMs);
+                        }
+                    } finally {
+                        updateLock.unlock();
+                    }
+                    // Someone else already reset the pane; loop again to pick it up.
+                } else {
+                    // Contention failed, the thread will yield its time slice to wait for pane available.
+                    Thread.yield();
+                }
+            }
+            // The specified timestamp has passed: hand back a detached pane that is not stored in the window.
+            else if (paneStartInMs < oldPane.getStartInMs()) {
+                return new Pane<>(paneIntervalInMs, paneStartInMs, newEmptyValue(timeMillis));
+            }
+        }
+    }
+
+    /**
+     * Get statistic value from pane at the specified timestamp.
+     *
+     * @param timeMillis the specified timestamp in milliseconds.
+     * @return the statistic value if pane at the specified timestamp is up-to-date; otherwise null.
+     */
+    public T getPaneValue(long timeMillis) {
+        if (timeMillis < 0) {
+            return null;
+        }
+
+        int paneIdx = calculatePaneIdx(timeMillis);
+
+        Pane<T> pane = referenceArray.get(paneIdx);
+
+        if (pane == null || !pane.isTimeInWindow(timeMillis)) {
+            return null;
+        }
+
+        return pane.getValue();
+    }
+
+    /**
+     * Create a new statistic value for pane.
+     *
+     * @param timeMillis the specified timestamp in milliseconds.
+     * @return new empty statistic value.
+     */
+    public abstract T newEmptyValue(long timeMillis);
+
+    /**
+     * Reset given pane to the specified start time and reset the value.
+     *
+     * @param pane      the given pane.
+     * @param startInMs the start timestamp of the pane in milliseconds.
+     * @return reset pane.
+     */
+    protected abstract Pane<T> resetPaneTo(final Pane<T> pane, long startInMs);
+
+    /**
+     * Calculate the pane index corresponding to the specified timestamp.
+     *
+     * @param timeMillis the specified timestamp.
+     * @return the pane index corresponding to the specified timestamp.
+     */
+    private int calculatePaneIdx(long timeMillis) {
+        return (int) ((timeMillis / paneIntervalInMs) % paneCount);
+    }
+
+    /**
+     * Calculate the pane start corresponding to the specified timestamp.
+     *
+     * @param timeMillis the specified timestamp.
+     * @return the pane start corresponding to the specified timestamp.
+     */
+    protected long calculatePaneStart(long timeMillis) {
+        return timeMillis - timeMillis % paneIntervalInMs;
+    }
+
+    /**
+     * Checks if the specified pane is deprecated at the current timestamp.
+     *
+     * @param pane the specified pane.
+     * @return true if the pane is deprecated; otherwise false.
+     */
+    public boolean isPaneDeprecated(final Pane<T> pane) {
+        return isPaneDeprecated(TimeUtils.currentTimeMillis(), pane);
+    }
+
+    /**
+     * Checks if the specified pane is deprecated at the specified timestamp.
+     *
+     * @param timeMillis the specified time.
+     * @param pane       the specified pane.
+     * @return true if the pane is deprecated; otherwise false.
+     */
+    public boolean isPaneDeprecated(long timeMillis, final Pane<T> pane) {
+        // The pane is '[)': its slot belongs to the next cycle from start + intervalInMs onwards,
+        // so the pane is already deprecated at exactly that instant (hence >=, not >).
+        return (timeMillis - pane.getStartInMs()) >= intervalInMs;
+    }
+
+    /**
+     * Get valid pane list for entire sliding window at the current time.
+     * The list will only contain "valid" panes.
+     *
+     * @return valid pane list for entire sliding window.
+     */
+    public List<Pane<T>> list() {
+        return list(TimeUtils.currentTimeMillis());
+    }
+
+    /**
+     * Get valid pane list for entire sliding window at the specified time.
+     * The list will only contain "valid" panes.
+     *
+     * @param timeMillis the specified time.
+     * @return valid pane list for entire sliding window.
+     */
+    public List<Pane<T>> list(long timeMillis) {
+        if (timeMillis < 0) {
+            return new ArrayList<>();
+        }
+
+        List<Pane<T>> result = new ArrayList<>(paneCount);
+
+        for (int idx = 0; idx < paneCount; idx++) {
+            Pane<T> pane = referenceArray.get(idx);
+            if (pane == null || isPaneDeprecated(timeMillis, pane)) {
+                continue;
+            }
+            result.add(pane);
+        }
+
+        return result;
+    }
+
+    /**
+     * Get aggregated value list for entire sliding window at the current time.
+     * The list will only contain value from "valid" panes.
+     *
+     * @return aggregated value list for entire sliding window.
+     */
+    public List<T> values() {
+        return values(TimeUtils.currentTimeMillis());
+    }
+
+    /**
+     * Get aggregated value list for entire sliding window at the specified time.
+     * The list will only contain value from "valid" panes.
+     *
+     * @param timeMillis the specified time.
+     * @return aggregated value list for entire sliding window.
+     */
+    public List<T> values(long timeMillis) {
+        if (timeMillis < 0) {
+            return new ArrayList<>();
+        }
+
+        List<T> result = new ArrayList<>(paneCount);
+
+        for (int idx = 0; idx < paneCount; idx++) {
+            Pane<T> pane = referenceArray.get(idx);
+            if (pane == null || isPaneDeprecated(timeMillis, pane)) {
+                continue;
+            }
+            result.add(pane.getValue());
+        }
+        return result;
+    }
+
+    /**
+     * Get total interval of the sliding window in milliseconds.
+     *
+     * @return the total interval in milliseconds.
+     */
+    public long getIntervalInMs() {
+        return intervalInMs;
+    }
+
+    /**
+     * Get pane interval of the sliding window in milliseconds.
+     *
+     * @return the interval of a pane in milliseconds.
+     */
+    public long getPaneIntervalInMs() {
+        return paneIntervalInMs;
+    }
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
index 3dc3666602..ee15210066 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounter.java
@@ -17,74 +17,69 @@
package org.apache.dubbo.metrics.aggregate;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
/**
* Wrapper around Counter like Long and Integer.
- * <p>
- * Maintains a ring buffer of Counter to provide count over a sliding windows
of time.
*/
public class TimeWindowCounter {
- private final Long[] ringBuffer;
- private final Long[] bucketStartTimeMillis;
- private int currentBucket;
- private long lastRotateTimestampMillis;
- private final long durationBetweenRotatesMillis;
- public TimeWindowCounter(int bucketNum, int timeWindowSeconds) {
- this.ringBuffer = new Long[bucketNum];
- this.bucketStartTimeMillis = new Long[bucketNum];
- for (int i = 0; i < bucketNum; i++) {
- this.ringBuffer[i] = 0L;
- this.bucketStartTimeMillis[i] = System.currentTimeMillis();
- }
+ private final LongAdderSlidingWindow slidingWindow;
- this.currentBucket = 0;
- this.lastRotateTimestampMillis = System.currentTimeMillis();
- this.durationBetweenRotatesMillis =
TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum;
+ public TimeWindowCounter(int bucketNum, int timeWindowSeconds) {
+ this.slidingWindow = new LongAdderSlidingWindow(bucketNum,
TimeUnit.SECONDS.toMillis(timeWindowSeconds));
}
- public synchronized double get() {
- return rotate();
+ public double get() {
+ double result = 0.0;
+ List<LongAdder> windows = this.slidingWindow.values();
+ for (LongAdder window : windows) {
+ result += window.sum();
+ }
+ return result;
}
public long bucketLivedSeconds() {
- return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
bucketStartTimeMillis[currentBucket]);
+ return
TimeUnit.MILLISECONDS.toSeconds(this.slidingWindow.values().size() *
this.slidingWindow.getPaneIntervalInMs());
}
- public synchronized void increment() {
+ public void increment() {
this.increment(1L);
}
- public synchronized void increment(Long step) {
- rotate();
- for (int i = 0; i < ringBuffer.length; i++) {
- ringBuffer[i] = ringBuffer[i] + step;
- }
+ public void increment(Long step) {
+ this.slidingWindow.currentPane().getValue().add(step);
}
- public synchronized void decrement() {
+ public void decrement() {
this.decrement(1L);
}
- public synchronized void decrement(Long step) {
- rotate();
- for (int i = 0; i < ringBuffer.length; i++) {
- ringBuffer[i] = ringBuffer[i] - step;
- }
+ public void decrement(Long step) {
+ this.slidingWindow.currentPane().getValue().add(-step);
}
- private Long rotate() {
- long timeSinceLastRotateMillis = System.currentTimeMillis() -
lastRotateTimestampMillis;
- while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
- ringBuffer[currentBucket] = 0L;
- bucketStartTimeMillis[currentBucket] = lastRotateTimestampMillis +
durationBetweenRotatesMillis;
- if (++currentBucket >= ringBuffer.length) {
- currentBucket = 0;
- }
- timeSinceLastRotateMillis -= durationBetweenRotatesMillis;
- lastRotateTimestampMillis += durationBetweenRotatesMillis;
+ /**
+ * Sliding window of type LongAdder.
+ */
+ private static class LongAdderSlidingWindow extends
SlidingWindow<LongAdder> {
+
+ public LongAdderSlidingWindow(int sampleCount, long intervalInMs) {
+ super(sampleCount, intervalInMs);
+ }
+
+ @Override
+ public LongAdder newEmptyValue(long timeMillis) {
+ return new LongAdder();
+ }
+
+        /**
+         * Reuse the given pane for a new time span: clear its counter and move it to {@code startTime}.
+         *
+         * @param pane      the pane to reset.
+         * @param startTime the new start timestamp of the pane in milliseconds.
+         * @return the same pane instance after the reset.
+         */
+        @Override
+        protected Pane<LongAdder> resetPaneTo(final Pane<LongAdder> pane, long startTime) {
+            // Clear the counter BEFORE publishing the new start time: once the new start is
+            // visible, SlidingWindow.currentPane() hands this pane to other threads, and an
+            // increment made at that point must not be erased by a reset that runs afterwards.
+            pane.getValue().reset();
+            pane.setStartInMs(startTime);
+            return pane;
+        }
- return ringBuffer[currentBucket];
}
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
index 86249c88a2..823024344d 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/main/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantile.java
@@ -19,57 +19,60 @@ package org.apache.dubbo.metrics.aggregate;
import com.tdunning.math.stats.TDigest;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Wrapper around TDigest.
- * <p>
- * Maintains a ring buffer of TDigest to provide quantiles over a sliding
windows of time.
*/
public class TimeWindowQuantile {
+
private final double compression;
- private final TDigest[] ringBuffer;
- private int currentBucket;
- private long lastRotateTimestampMillis;
- private final long durationBetweenRotatesMillis;
+
+ private final DigestSlidingWindow slidingWindow;
public TimeWindowQuantile(double compression, int bucketNum, int
timeWindowSeconds) {
this.compression = compression;
- this.ringBuffer = new TDigest[bucketNum];
- for (int i = 0; i < bucketNum; i++) {
- this.ringBuffer[i] = TDigest.createDigest(compression);
- }
-
- this.currentBucket = 0;
- this.lastRotateTimestampMillis = System.currentTimeMillis();
- this.durationBetweenRotatesMillis =
TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum;
+ this.slidingWindow = new DigestSlidingWindow(compression, bucketNum,
TimeUnit.SECONDS.toMillis(timeWindowSeconds));
}
- public synchronized double quantile(double q) {
- TDigest currentBucket = rotate();
-
+ public double quantile(double q) {
+ TDigest mergeDigest = TDigest.createDigest(compression);
+ List<TDigest> validWindows = this.slidingWindow.values();
+ for (TDigest window : validWindows) {
+ mergeDigest.add(window);
+ }
// This may return Double.NaN, and it's correct behavior.
// see: https://github.com/prometheus/client_golang/issues/85
- return currentBucket.quantile(q);
+ return mergeDigest.quantile(q);
}
- public synchronized void add(double value) {
- rotate();
- for (TDigest bucket : ringBuffer) {
- bucket.add(value);
- }
+ public void add(double value) {
+ this.slidingWindow.currentPane().getValue().add(value);
}
- private TDigest rotate() {
- long timeSinceLastRotateMillis = System.currentTimeMillis() -
lastRotateTimestampMillis;
- while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) {
- ringBuffer[currentBucket] = TDigest.createDigest(compression);
- if (++currentBucket >= ringBuffer.length) {
- currentBucket = 0;
- }
- timeSinceLastRotateMillis -= durationBetweenRotatesMillis;
- lastRotateTimestampMillis += durationBetweenRotatesMillis;
+ /**
+ * Sliding window of type TDigest.
+ */
+ private static class DigestSlidingWindow extends SlidingWindow<TDigest> {
+
+ private final double compression;
+
+ public DigestSlidingWindow(double compression, int sampleCount, long
intervalInMs) {
+ super(sampleCount, intervalInMs);
+ this.compression = compression;
+ }
+
+ @Override
+ public TDigest newEmptyValue(long timeMillis) {
+ return TDigest.createDigest(compression);
+ }
+
+        /**
+         * Reuse the given pane for a new time span: give it a fresh digest and move it to {@code startTime}.
+         *
+         * @param pane      the pane to reset.
+         * @param startTime the new start timestamp of the pane in milliseconds.
+         * @return the same pane instance after the reset.
+         */
+        @Override
+        protected Pane<TDigest> resetPaneTo(final Pane<TDigest> pane, long startTime) {
+            // Install the fresh digest BEFORE publishing the new start time, so that a thread
+            // which observes the new start (and is therefore handed this pane by
+            // SlidingWindow.currentPane()) also observes the new digest instead of the old one.
+            pane.setValue(TDigest.createDigest(compression));
+            pane.setStartInMs(startTime);
+            return pane;
+        }
- return ringBuffer[currentBucket];
}
}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/PaneTest.java
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/PaneTest.java
new file mode 100644
index 0000000000..bbce54ec84
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/PaneTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class PaneTest {
+
+    @Test
+    void testIntervalInMs() {
+        Pane<?> pane = mock(Pane.class);
+        when(pane.getIntervalInMs()).thenReturn(100L);
+        assertEquals(100L, pane.getIntervalInMs());
+    }
+
+    @Test
+    void testStartInMs() {
+        Pane<?> pane = mock(Pane.class);
+        long startTime = System.currentTimeMillis();
+        when(pane.getStartInMs()).thenReturn(startTime);
+        assertEquals(startTime, pane.getStartInMs());
+    }
+
+    @Test
+    void testEndInMs() {
+        long startTime = System.currentTimeMillis();
+        Pane<?> pane = new Pane<>(10, startTime, new Object());
+        assertEquals(startTime + 10, pane.getEndInMs());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testValue() {
+        Pane<LongAdder> pane = mock(Pane.class);
+        LongAdder count = new LongAdder();
+        when(pane.getValue()).thenReturn(count);
+        assertEquals(count, pane.getValue());
+        when(pane.getValue()).thenReturn(null);
+        assertNotEquals(count, pane.getValue());
+    }
+
+    @Test
+    void testIsTimeInWindow() {
+        // Use one captured timestamp for every check: reading the clock again for each
+        // assertion fails whenever more than the 10 ms pane interval elapses in between.
+        long startTime = System.currentTimeMillis();
+        Pane<?> pane = new Pane<>(10, startTime, new Object());
+        // The pane covers [startTime, startTime + 10).
+        assertTrue(pane.isTimeInWindow(startTime));
+        assertTrue(pane.isTimeInWindow(startTime + 9));
+        assertFalse(pane.isTimeInWindow(startTime + 10));
+        assertFalse(pane.isTimeInWindow(startTime - 1));
+    }
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/SlidingWindowTest.java
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/SlidingWindowTest.java
new file mode 100644
index 0000000000..58b74b604f
--- /dev/null
+++
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/SlidingWindowTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.metrics.aggregate;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SlidingWindowTest {
+
+    static final int paneCount = 10;
+
+    static final long intervalInMs = 2000;
+
+    TestSlidingWindow window;
+
+    @BeforeEach
+    void setup() {
+        window = new TestSlidingWindow(paneCount, intervalInMs);
+    }
+
+    @Test
+    void testCurrentPane() {
+        assertNull(window.currentPane(/* invalid time*/-1L));
+        long timeInMs = System.currentTimeMillis();
+        Pane<LongAdder> currentPane = window.currentPane(timeInMs);
+        assertNotNull(currentPane);
+        // reuse test: exactly one full window later the timestamp maps to the same slot, so the
+        // same pane instance must come back. (Adding an extra millisecond would land in the
+        // next slot whenever timeInMs is the last millisecond of its pane.)
+        assertEquals(currentPane, window.currentPane(timeInMs + window.getIntervalInMs()));
+    }
+
+    @Test
+    void testGetPaneData() {
+        assertNull(window.getPaneValue(/* invalid time*/-1L));
+        // Create and query the pane with one explicit timestamp so both calls agree on the pane.
+        long timeInMs = System.currentTimeMillis();
+        window.currentPane(timeInMs);
+        assertNotNull(window.getPaneValue(timeInMs));
+        assertNull(window.getPaneValue(timeInMs + window.getPaneIntervalInMs()));
+    }
+
+    @Test
+    void testNewEmptyValue() {
+        assertEquals(0L, window.newEmptyValue(System.currentTimeMillis()).sum());
+    }
+
+    @Test
+    void testResetPaneTo() {
+        Pane<LongAdder> currentPane = window.currentPane();
+        currentPane.getValue().add(2);
+        currentPane.getValue().add(1);
+        assertEquals(3, currentPane.getValue().sum());
+        window.resetPaneTo(currentPane, System.currentTimeMillis());
+        assertEquals(0, currentPane.getValue().sum());
+        currentPane.getValue().add(1);
+        assertEquals(1, currentPane.getValue().sum());
+    }
+
+    @Test
+    void testCalculatePaneStart() {
+        long time = System.currentTimeMillis();
+        assertTrue(window.calculatePaneStart(time) <= time);
+        assertTrue(time < window.calculatePaneStart(time) + window.getPaneIntervalInMs());
+    }
+
+    @Test
+    void testIsPaneDeprecated() {
+        Pane<LongAdder> currentPane = window.currentPane();
+        currentPane.setStartInMs(1000000L);
+        assertTrue(window.isPaneDeprecated(currentPane));
+    }
+
+    @Test
+    void testList() {
+        window.currentPane();
+        assertTrue(0 < window.list().size());
+    }
+
+    @Test
+    void testValues() {
+        window.currentPane().getValue().add(10);
+        long sum = 0;
+        for (LongAdder value : window.values()) {
+            sum += value.sum();
+        }
+        assertEquals(10, sum);
+    }
+
+    @Test
+    void testGetIntervalInMs() {
+        assertEquals(intervalInMs, window.getIntervalInMs());
+    }
+
+    @Test
+    void testGetPaneIntervalInMs() {
+        assertEquals(intervalInMs / paneCount, window.getPaneIntervalInMs());
+    }
+
+    /**
+     * Minimal LongAdder-based window used to exercise the abstract SlidingWindow.
+     */
+    private static class TestSlidingWindow extends SlidingWindow<LongAdder> {
+
+        public TestSlidingWindow(int paneCount, long intervalInMs) {
+            super(paneCount, intervalInMs);
+        }
+
+        @Override
+        public LongAdder newEmptyValue(long timeMillis) {
+            return new LongAdder();
+        }
+
+        @Override
+        protected Pane<LongAdder> resetPaneTo(Pane<LongAdder> pane, long startInMs) {
+            pane.setStartInMs(startInMs);
+            pane.getValue().reset();
+            return pane;
+        }
+    }
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
index 4ff6758020..6cac0d60d8 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowCounterTest.java
@@ -24,7 +24,7 @@ class TimeWindowCounterTest {
@Test
void test() throws Exception {
- TimeWindowCounter counter = new TimeWindowCounter(12, 1);
+ TimeWindowCounter counter = new TimeWindowCounter(10, 1);
counter.increment();
Assertions.assertEquals(counter.get(), 1);
counter.decrement();
@@ -34,4 +34,4 @@ class TimeWindowCounterTest {
Assertions.assertEquals(counter.get(), 0);
Assertions.assertTrue(counter.bucketLivedSeconds() < 1);
}
-}
\ No newline at end of file
+}
diff --git
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
index 7e1d416652..36ee690b01 100644
---
a/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
+++
b/dubbo-metrics/dubbo-metrics-api/src/test/java/org/apache/dubbo/metrics/aggregate/TimeWindowQuantileTest.java
@@ -24,7 +24,7 @@ class TimeWindowQuantileTest {
@Test
void test() throws Exception {
- TimeWindowQuantile quantile = new TimeWindowQuantile(100, 12, 1);
+ TimeWindowQuantile quantile = new TimeWindowQuantile(100, 10, 1);
for (int i = 1; i <= 100; i++) {
quantile.add(i);
}
@@ -34,4 +34,4 @@ class TimeWindowQuantileTest {
Thread.sleep(1000);
Assertions.assertEquals(quantile.quantile(0.99), Double.NaN);
}
-}
\ No newline at end of file
+}