Myasuka commented on a change in pull request #15119:
URL: https://github.com/apache/flink/pull/15119#discussion_r600240648



##########
File path: flink-runtime/pom.xml
##########
@@ -56,6 +56,12 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${dropwizard.version}</version>
+               </dependency>
+

Review comment:
       To be honest, this feature for tracking state access latency was first
introduced in our own internal fork of Flink-1.5, in which
`org.apache.flink.metrics.DescriptiveStatisticsHistogram` did not perform
well. After you raised your doubts, I found that Flink had improved its
performance in FLINK-10241. To get a real result, I compared the performance
of `com.codahale.metrics.Histogram + SlidingTimeWindowReservoir` against
`DescriptiveStatisticsHistogram + CircularDoubleArray`:
   ~~~java
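   // Flink package locations below are assumed from the 1.13 development line
   // and may differ in other versions.
   import com.codahale.metrics.SlidingTimeWindowReservoir;
   import com.codahale.metrics.Snapshot;
   import org.apache.flink.metrics.Histogram;
   import org.apache.flink.metrics.HistogramStatistics;
   import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
   import org.apache.flink.runtime.util.ExecutorThreadFactory;
   import org.apache.flink.runtime.util.Hardware;

   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   public class PerformanceTests {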
       public static void main(String[] args) throws InterruptedException {
           final Histogram histogram = getHistogramWrapper();
   //        final Histogram histogram = getFlinkHistogram();
   
           final ScheduledExecutorService ioExecutor =
                   Executors.newScheduledThreadPool(
                           Hardware.getNumberCPUCores(),
                           new ExecutorThreadFactory("ioExecutor"));
           ioExecutor.scheduleWithFixedDelay(() -> {
               System.out.println("histogram, 0.99:"
                       + histogram.getStatistics().getQuantile(0.99)
                       + ", mean: " + histogram.getStatistics().getMean()
                       + ", max:" + histogram.getStatistics().getMax());
           }, 1, 5, TimeUnit.SECONDS);
   
           long value = 1L;
           long start = System.nanoTime();
           int mod = (1 << 21) - 1;
           while (true) {
               histogram.update(value);
               if ((value & mod) == 0) {
                   double duration = (System.nanoTime() - start) / 1000000000.0;
                   System.out.println("QPS: " + value / duration);
                   if (duration > 40) {
                       break;
                   }
               }
               value += 1L;
           }
       }
   
       private static Histogram getHistogramWrapper() {
           return new HistogramWrapper(new com.codahale.metrics.Histogram(
                   new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS)));
       }
   
       private static Histogram getFlinkHistogram() {
           return new DescriptiveStatisticsHistogram(128);
       }
   
       static class HistogramWrapper implements Histogram {
           private final com.codahale.metrics.Histogram histogram;
   
           public HistogramWrapper(com.codahale.metrics.Histogram histogram) {
               this.histogram = histogram;
           }
   
           @Override
           public void update(long value) {
               histogram.update(value);
           }
   
           @Override
           public long getCount() {
               return histogram.getCount();
           }
   
           @Override
           public HistogramStatistics getStatistics() {
               return new SnapshotHistogramStatistics(this.histogram.getSnapshot());
           }
       }
   
       private static class SnapshotHistogramStatistics extends HistogramStatistics {
   
           private final Snapshot snapshot;
   
           SnapshotHistogramStatistics(com.codahale.metrics.Snapshot snapshot) {
               this.snapshot = snapshot;
           }
   
           @Override
           public double getQuantile(double quantile) {
               return snapshot.getValue(quantile);
           }
   
           @Override
           public long[] getValues() {
               return snapshot.getValues();
           }
   
           @Override
           public int size() {
               return snapshot.size();
           }
   
           @Override
           public double getMean() {
               return snapshot.getMean();
           }
   
           @Override
           public double getStdDev() {
               return snapshot.getStdDev();
           }
   
           @Override
           public long getMax() {
               return snapshot.getMax();
           }
   
           @Override
           public long getMin() {
               return snapshot.getMin();
           }
       }
   }
   ~~~
   
    From this test, I found that Flink's current implementation performs
better, so I decided to change the implementation in my PR to use it.
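
    For readers unfamiliar with the fixed-size window side of this comparison,
here is a minimal sketch of the general idea: a circular array that keeps only
the last N values, so each update is a single array write with no allocation.
The class `CircularWindowSketch` and its methods are hypothetical names made
up for illustration; this is not Flink's actual `CircularDoubleArray` or
`DescriptiveStatisticsHistogram` code, and the nearest-rank quantile used here
is an assumption, not necessarily what Flink computes.

```java
import java.util.Arrays;

/** Hypothetical fixed-size window; illustrative only, not Flink's implementation. */
public class CircularWindowSketch {
    private final double[] buffer;
    private int nextPos = 0;
    private boolean full = false;

    public CircularWindowSketch(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.buffer = new double[windowSize];
    }

    /** Constant-time update: overwrite the oldest slot, no allocation. */
    public synchronized void update(long value) {
        buffer[nextPos] = value;
        nextPos++;
        if (nextPos == buffer.length) {
            nextPos = 0;
            full = true;
        }
    }

    /** Number of values currently retained (at most the window size). */
    public synchronized int size() {
        return full ? buffer.length : nextPos;
    }

    /** Nearest-rank quantile over the retained values; the sort cost is paid by the reader. */
    public synchronized double quantile(double q) {
        int n = size();
        if (n == 0) {
            return Double.NaN;
        }
        double[] sorted = Arrays.copyOf(buffer, n);
        Arrays.sort(sorted);
        int idx = (int) Math.ceil(q * n) - 1;
        return sorted[Math.max(0, Math.min(n - 1, idx))];
    }

    public static void main(String[] args) {
        CircularWindowSketch window = new CircularWindowSketch(4);
        for (long v = 1; v <= 10; v++) {
            window.update(v);
        }
        // Only the last four values (7, 8, 9, 10) are retained.
        System.out.println("size=" + window.size()
                + ", median=" + window.quantile(0.5)
                + ", max=" + window.quantile(1.0));
    }
}
```

    In this sketch the write path does constant work and all sorting happens
when statistics are read, which matters when the histogram is updated on every
state access but only read every few seconds by a reporter.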





