Myasuka commented on a change in pull request #15119:
URL: https://github.com/apache/flink/pull/15119#discussion_r600240648
##########
File path: flink-runtime/pom.xml
##########
@@ -56,6 +56,12 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+
Review comment:
To be honest, this feature to track state access latency was first
introduced in our own internal fork of Flink 1.5, in which
`org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram` did not
perform well. After you raised this question, I found that Flink had improved
its performance in FLINK-10241. To get real numbers, I compared the
performance of `com.codahale.metrics.Histogram` +
`SlidingTimeWindowReservoir` against `DescriptiveStatisticsHistogram` +
`CircularDoubleArray`:
~~~java
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;

import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Snapshot;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PerformanceTests {
    public static void main(String[] args) throws InterruptedException {
        // Benchmark one implementation at a time: swap the comment on the two lines below.
        final Histogram histogram = getHistogramWrapper();
        // final Histogram histogram = getFlinkHistogram();

        final ScheduledExecutorService ioExecutor =
                Executors.newScheduledThreadPool(
                        Hardware.getNumberCPUCores(),
                        new ExecutorThreadFactory("ioExecutor"));
        // Every 5 seconds, read statistics concurrently with the updates below.
        // Fetch the statistics once per report so all three values come from the same snapshot.
        ioExecutor.scheduleWithFixedDelay(() -> {
            HistogramStatistics stats = histogram.getStatistics();
            System.out.println("histogram, 0.99: " + stats.getQuantile(0.99)
                    + ", mean: " + stats.getMean()
                    + ", max: " + stats.getMax());
        }, 1, 5, TimeUnit.SECONDS);

        long value = 1L;
        long start = System.nanoTime();
        int mod = (1 << 21) - 1;
        // Update in a tight loop; every 2^21 updates, print the update rate and stop after ~40 s.
        while (true) {
            histogram.update(value);
            if ((value & mod) == 0) {
                double duration = (System.nanoTime() - start) / 1_000_000_000.0;
                System.out.println("QPS: " + value / duration);
                if (duration > 40) {
                    break;
                }
            }
            value += 1L;
        }
        ioExecutor.shutdownNow();
    }

    // Dropwizard histogram keeping every measurement from the last 5 seconds.
    private static Histogram getHistogramWrapper() {
        return new HistogramWrapper(new com.codahale.metrics.Histogram(
                new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS)));
    }

    // Flink histogram keeping the most recent 128 measurements.
    private static Histogram getFlinkHistogram() {
        return new DescriptiveStatisticsHistogram(128);
    }

    /** Adapts a Dropwizard histogram to Flink's Histogram interface. */
    static class HistogramWrapper implements Histogram {
        private final com.codahale.metrics.Histogram histogram;

        public HistogramWrapper(com.codahale.metrics.Histogram histogram) {
            this.histogram = histogram;
        }

        @Override
        public void update(long value) {
            histogram.update(value);
        }

        @Override
        public long getCount() {
            return histogram.getCount();
        }

        @Override
        public HistogramStatistics getStatistics() {
            return new SnapshotHistogramStatistics(this.histogram.getSnapshot());
        }
    }

    /** Exposes a Dropwizard Snapshot as Flink HistogramStatistics. */
    private static class SnapshotHistogramStatistics extends HistogramStatistics {
        private final Snapshot snapshot;

        SnapshotHistogramStatistics(Snapshot snapshot) {
            this.snapshot = snapshot;
        }

        @Override
        public double getQuantile(double quantile) {
            return snapshot.getValue(quantile);
        }

        @Override
        public long[] getValues() {
            return snapshot.getValues();
        }

        @Override
        public int size() {
            return snapshot.size();
        }

        @Override
        public double getMean() {
            return snapshot.getMean();
        }

        @Override
        public double getStdDev() {
            return snapshot.getStdDev();
        }

        @Override
        public long getMax() {
            return snapshot.getMax();
        }

        @Override
        public long getMin() {
            return snapshot.getMin();
        }
    }
}
~~~
These tests showed that Flink's current implementation performs better, so I
decided to change the implementation in my PR.
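A plausible reason for the difference is structural. A count-bounded circular buffer overwrites its oldest slot in O(1) without allocating. In contrast, Dropwizard's `SlidingTimeWindowReservoir` keeps every measurement taken within the time window (in a `ConcurrentSkipListMap`), so its memory use and snapshot cost grow with the update rate. The following is a minimal sketch of the circular-buffer idea, assuming a fixed capacity and a simple nearest-rank quantile. `RingBufferSketch` is a hypothetical class for illustration only, not Flink's actual `CircularDoubleArray`, and the nearest-rank estimator is a simplification chosen here, not the estimator Flink uses.

~~~java
import java.util.Arrays;

/**
 * Hypothetical, simplified fixed-capacity circular buffer of doubles, in the spirit of
 * (but NOT identical to) Flink's CircularDoubleArray. Updates are O(1) and allocation-free;
 * statistics are computed only when a snapshot is taken.
 */
public class RingBufferSketch {
    private final double[] values;
    private int nextIndex = 0;    // slot that the next add() overwrites
    private boolean full = false; // true once every slot has been written at least once

    public RingBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.values = new double[capacity];
    }

    /** O(1) hot path: overwrite the oldest slot (synchronized for simplicity). */
    public synchronized void add(double value) {
        values[nextIndex] = value;
        nextIndex++;
        if (nextIndex == values.length) {
            nextIndex = 0;
            full = true;
        }
    }

    /** Number of values currently retained (at most the capacity). */
    public synchronized int size() {
        return full ? values.length : nextIndex;
    }

    /** Copies the retained values (unordered) so statistics run off the hot path. */
    public synchronized double[] snapshot() {
        return Arrays.copyOf(values, size());
    }

    /** Nearest-rank quantile for q in [0, 1]; sorting happens only here. */
    public static double quantile(double[] snapshot, double q) {
        if (snapshot.length == 0) {
            return Double.NaN;
        }
        double[] sorted = snapshot.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(q * sorted.length); // 1-based rank
        return sorted[Math.max(rank, 1) - 1];
    }

    /** Arithmetic mean of a snapshot, NaN if empty. */
    public static double mean(double[] snapshot) {
        return snapshot.length == 0 ? Double.NaN : Arrays.stream(snapshot).average().getAsDouble();
    }

    public static void main(String[] args) {
        RingBufferSketch buffer = new RingBufferSketch(128);
        for (long v = 1; v <= 1_000; v++) {
            buffer.add(v); // only the most recent 128 values (873..1000) are retained
        }
        double[] snap = buffer.snapshot();
        // prints size=128, p99=999.0, mean=936.5
        System.out.println("size=" + snap.length
                + ", p99=" + quantile(snap, 0.99)
                + ", mean=" + mean(snap));
    }
}
~~~

Because `add()` writes a single array slot, the update path stays cheap regardless of throughput; the sorting cost is paid only in `quantile()`, i.e. on the periodic reporter thread, which mirrors how the benchmark separates updates from statistics reads.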