NicoK commented on a change in pull request #8888: [FLINK-12983][metrics] replace descriptive histogram's storage back-end
URL: https://github.com/apache/flink/pull/8888#discussion_r315374624
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
 ##########
 @@ -27,27 +27,63 @@
  */
 public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram {
 
-       private final DescriptiveStatistics descriptiveStatistics;
-
-       private long elementsSeen = 0L;
+       private final CircularDoubleArray descriptiveStatistics;
 
        public DescriptiveStatisticsHistogram(int windowSize) {
-               this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
+               this.descriptiveStatistics = new CircularDoubleArray(windowSize);
        }
 
        @Override
        public void update(long value) {
-               elementsSeen += 1L;
                this.descriptiveStatistics.addValue(value);
        }
 
        @Override
        public long getCount() {
-               return this.elementsSeen;
+               return this.descriptiveStatistics.getElementsSeen();
        }
 
        @Override
        public HistogramStatistics getStatistics() {
                return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
        }
+
+       /**
+        * Fixed-size array that wraps around at the end and has a dynamic start position.
+        */
+       static class CircularDoubleArray {
+               private final double[] backingArray;
+               private int nextPos = 0;
+               private boolean fullSize = false;
+               private long elementsSeen = 0;
+
+               CircularDoubleArray(int windowSize) {
+                       this.backingArray = new double[windowSize];
+               }
+
+               synchronized void addValue(double value) {
+                       backingArray[nextPos] = value;
+                       ++elementsSeen;
+                       ++nextPos;
+                       if (nextPos == backingArray.length) {
+                               nextPos = 0;
+                               fullSize = true;
+                       }
+               }
+
+               synchronized double[] toUnsortedArray() {
+                       final int size = getSize();
+                       double[] result = new double[size];
+                       System.arraycopy(backingArray, 0, result, 0, result.length);
 
 Review comment:
   `CircularDoubleArray` is a package-private class, so I'm wondering which
   component would require a sorted array. Its API makes explicit that we return
   an unsorted array (`toUnsortedArray()`), and its only user is
   `org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot`,
   which does not require sorting. Whether sorting is needed is therefore
   effectively decided by `CommonMetricsSnapshot`.
   
   Out of curiosity, I added this method and used it instead:
   ```
   public synchronized double[] toArray() {
        if (fullSize) {
                // Buffer has wrapped: the oldest element sits at nextPos.
                final double[] result = new double[backingArray.length];
                final int firstLength = result.length - nextPos;
                // Copy the older part [nextPos, end) first, then the newer part [0, nextPos).
                System.arraycopy(backingArray, nextPos, result, 0, firstLength);
                System.arraycopy(backingArray, 0, result, firstLength, nextPos);
                return result;
        } else {
                // Not wrapped yet: elements [0, nextPos) are already in insertion order.
                final double[] result = new double[nextPos];
                System.arraycopy(backingArray, 0, result, 0, nextPos);
                return result;
        }
   }
   ```
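
To make the difference between the two copies concrete, here is a minimal, self-contained sketch that puts `toUnsortedArray()` and the `toArray()` variant above side by side. The outer class name `CircularDoubleArraySketch` and the body of `getSize()` are assumptions for illustration (the diff excerpt does not show `getSize()`); the rest follows the code quoted in this comment.

```java
import java.util.Arrays;

public class CircularDoubleArraySketch {

    /** Fixed-size ring buffer of doubles, modelled on the class in the diff. */
    static class CircularDoubleArray {
        private final double[] backingArray;
        private int nextPos = 0;
        private boolean fullSize = false;

        CircularDoubleArray(int windowSize) {
            this.backingArray = new double[windowSize];
        }

        synchronized void addValue(double value) {
            backingArray[nextPos] = value;
            ++nextPos;
            if (nextPos == backingArray.length) {
                // Wrap around; from now on every slot holds a valid element.
                nextPos = 0;
                fullSize = true;
            }
        }

        // Assumed implementation: not shown in the diff excerpt.
        private int getSize() {
            return fullSize ? backingArray.length : nextPos;
        }

        /** Single copy in storage order; element order is unspecified. */
        synchronized double[] toUnsortedArray() {
            return Arrays.copyOf(backingArray, getSize());
        }

        /** Two-part copy that restores insertion order (oldest first). */
        synchronized double[] toArray() {
            if (fullSize) {
                final double[] result = new double[backingArray.length];
                final int firstLength = result.length - nextPos;
                System.arraycopy(backingArray, nextPos, result, 0, firstLength);
                System.arraycopy(backingArray, 0, result, firstLength, nextPos);
                return result;
            }
            return Arrays.copyOf(backingArray, nextPos);
        }
    }

    public static void main(String[] args) {
        CircularDoubleArray array = new CircularDoubleArray(4);
        for (int i = 1; i <= 6; i++) {
            array.addValue(i);
        }
        // Storage order after wrapping: the two newest values overwrote slots 0 and 1.
        System.out.println(Arrays.toString(array.toUnsortedArray()));
        // Insertion order: oldest surviving value first.
        System.out.println(Arrays.toString(array.toArray()));
    }
}
```

With a window of 4 and the values 1 to 6 added, the first call returns the elements in storage order (5, 6, 3, 4) and the second in insertion order (3, 4, 5, 6). Statistics that do not depend on element order get the same input either way.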
   
   First quick results are below. They show the benefit of not building a
   sorted array when we don't need it (and when the histogram is requested often):
   ```
   Flink 1.9
   Benchmark                                     Mode  Cnt       Score      Error   Units
   HistogramBenchmarks.descriptiveHistogram     thrpt   30      89.224 ±    1.974  ops/ms
   HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30   56034.903 ±  939.263  ops/ms

   Flink 1.10 + FLINK-12983
   Benchmark                                     Mode  Cnt       Score      Error   Units
   HistogramBenchmarks.descriptiveHistogram     thrpt   30     280.240 ±    3.747  ops/ms
   HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30  207176.894 ± 2417.831  ops/ms

   Flink 1.10 + FLINK-12983 + sortedArray
   Benchmark                                     Mode  Cnt       Score      Error   Units
   HistogramBenchmarks.descriptiveHistogram     thrpt   30     239.000 ±    1.506  ops/ms
   HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30  210352.180 ± 1135.394  ops/ms
   ```
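
For a rough sense of scale, the `descriptiveHistogram` scores above can be turned into ratios. The sketch below only divides the reported throughput numbers and ignores the error margins; the class and method names are made up for illustration.

```java
import java.util.Locale;

public class BenchmarkRatios {

    /** Throughput of one run relative to a baseline run (both in ops/ms). */
    static double ratio(double score, double baseline) {
        return score / baseline;
    }

    public static void main(String[] args) {
        // descriptiveHistogram scores copied from the results above (ops/ms).
        final double flink19 = 89.224;
        final double unsorted = 280.240;    // Flink 1.10 + FLINK-12983
        final double sortedArray = 239.000; // Flink 1.10 + FLINK-12983 + sortedArray

        System.out.printf(Locale.ROOT, "unsorted vs. 1.9:         %.2fx%n",
                ratio(unsorted, flink19));
        System.out.printf(Locale.ROOT, "sortedArray vs. 1.9:      %.2fx%n",
                ratio(sortedArray, flink19));
        System.out.printf(Locale.ROOT, "sortedArray vs. unsorted: %.1f%% lower%n",
                (1.0 - ratio(sortedArray, unsorted)) * 100.0);
    }
}
```

By this arithmetic, the unsorted variant is roughly 3.1 times the Flink 1.9 throughput for histogram queries, and the `sortedArray` variant is about 15% below the unsorted one. The `descriptiveHistogramAdd` scores of the two 1.10 variants are close to each other, differing by about 1.5%.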
