NicoK commented on a change in pull request #8888: [FLINK-12983][metrics] replace descriptive histogram's storage back-end
URL: https://github.com/apache/flink/pull/8888#discussion_r315374624
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
##########
@@ -27,27 +27,63 @@
*/
public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram {
- private final DescriptiveStatistics descriptiveStatistics;
-
- private long elementsSeen = 0L;
+ private final CircularDoubleArray descriptiveStatistics;
public DescriptiveStatisticsHistogram(int windowSize) {
- this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
+ this.descriptiveStatistics = new CircularDoubleArray(windowSize);
}
@Override
public void update(long value) {
- elementsSeen += 1L;
this.descriptiveStatistics.addValue(value);
}
@Override
public long getCount() {
- return this.elementsSeen;
+ return this.descriptiveStatistics.getElementsSeen();
}
@Override
public HistogramStatistics getStatistics() {
return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
}
+
+ /**
+ * Fixed-size array that wraps around at the end and has a dynamic start position.
+ */
+ static class CircularDoubleArray {
+ private final double[] backingArray;
+ private int nextPos = 0;
+ private boolean fullSize = false;
+ private long elementsSeen = 0;
+
+ CircularDoubleArray(int windowSize) {
+ this.backingArray = new double[windowSize];
+ }
+
+ synchronized void addValue(double value) {
+ backingArray[nextPos] = value;
+ ++elementsSeen;
+ ++nextPos;
+ if (nextPos == backingArray.length) {
+ nextPos = 0;
+ fullSize = true;
+ }
+ }
+
+ synchronized double[] toUnsortedArray() {
+ final int size = getSize();
+ double[] result = new double[size];
+ System.arraycopy(backingArray, 0, result, 0, result.length);
Review comment:
`CircularDoubleArray` is a package-private class, so I'm wondering which
component would require a sorted array at all. The circular array has a special
API (its name makes clear that we return an unsorted array) and is only used by
`org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot`,
which does not require sorting. Whether sorting is required is basically
defined by
`org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot`.
Out of curiosity, I added this method, which returns the elements in insertion
order (oldest first), and used it instead:
```
public synchronized double[] toArray() {
    if (fullSize) {
        final double[] result = new double[backingArray.length];
        final int firstLength = result.length - nextPos;
        System.arraycopy(backingArray, nextPos, result, 0, firstLength);
        System.arraycopy(backingArray, 0, result, firstLength, nextPos);
        return result;
    } else {
        final double[] result = new double[nextPos];
        System.arraycopy(backingArray, 0, result, 0, nextPos);
        return result;
    }
}
```
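First quick results are below. They show the benefit of not working with a
sorted array when we don't need it and request the histogram a lot:
`descriptiveHistogram` drops from about 280 to 239 ops/ms with the sorted
variant, while `descriptiveHistogramAdd` is essentially unchanged.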
```
Flink 1.9
Benchmark                                    Mode   Cnt       Score      Error   Units
HistogramBenchmarks.descriptiveHistogram     thrpt   30      89.224 ±    1.974  ops/ms
HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30   56034.903 ±  939.263  ops/ms

Flink 1.10 + FLINK-12983
Benchmark                                    Mode   Cnt       Score      Error   Units
HistogramBenchmarks.descriptiveHistogram     thrpt   30     280.240 ±    3.747  ops/ms
HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30  207176.894 ± 2417.831  ops/ms

Flink 1.10 + FLINK-12983 + sortedArray
Benchmark                                    Mode   Cnt       Score      Error   Units
HistogramBenchmarks.descriptiveHistogram     thrpt   30     239.000 ±    1.506  ops/ms
HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30  210352.180 ± 1135.394  ops/ms
```
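To make the difference between the two copies concrete, here is a minimal, self-contained sketch. It is not the PR's code: `CircularDoubleArraySketch` is a hypothetical wrapper name, `elementsSeen` is left out, and `getSize()` is not visible in the hunk above, so I'm assuming it returns the backing array length once the buffer has wrapped and `nextPos` before that.

```java
import java.util.Arrays;

public class CircularDoubleArraySketch {

    /** Simplified stand-in for the PR's CircularDoubleArray (illustration only). */
    static final class CircularDoubleArray {
        private final double[] backingArray;
        private int nextPos = 0;
        private boolean fullSize = false;

        CircularDoubleArray(int windowSize) {
            this.backingArray = new double[windowSize];
        }

        synchronized void addValue(double value) {
            backingArray[nextPos] = value;
            ++nextPos;
            if (nextPos == backingArray.length) {
                nextPos = 0;
                fullSize = true;
            }
        }

        /** One copy; elements come back in backing-array order. */
        synchronized double[] toUnsortedArray() {
            // Assumption: this mirrors what getSize() computes in the PR.
            final int size = fullSize ? backingArray.length : nextPos;
            return Arrays.copyOf(backingArray, size);
        }

        /** Two copies once the buffer has wrapped; elements come back oldest to newest. */
        synchronized double[] toArray() {
            if (!fullSize) {
                return Arrays.copyOf(backingArray, nextPos);
            }
            final double[] result = new double[backingArray.length];
            final int firstLength = result.length - nextPos;
            // Oldest elements live in [nextPos, length), newest in [0, nextPos).
            System.arraycopy(backingArray, nextPos, result, 0, firstLength);
            System.arraycopy(backingArray, 0, result, firstLength, nextPos);
            return result;
        }
    }

    public static void main(String[] args) {
        CircularDoubleArray window = new CircularDoubleArray(3);
        for (double v : new double[] {1, 2, 3, 4, 5}) {
            window.addValue(v);
        }
        System.out.println(Arrays.toString(window.toUnsortedArray())); // [4.0, 5.0, 3.0]
        System.out.println(Arrays.toString(window.toArray()));         // [3.0, 4.0, 5.0]
    }
}
```

With a window of 3 and five values added, both methods return the same three elements; only the order differs. For order-independent statistics (min, max, mean, or percentiles computed via selection) the single-copy variant should be sufficient.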