This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 559ed4c [FLINK-25904][metrics] Lazily initialize Percentile
559ed4c is described below
commit 559ed4c8e5bf21b525f775d39de767cc87016ed5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Mar 18 15:06:19 2022 +0100
[FLINK-25904][metrics] Lazily initialize Percentile
Percentile serialization doesn't work properly (see MATH-1642), so instead
we serialize the data array and lazily initialize the Percentile as needed.
---
.../DescriptiveStatisticsHistogramStatistics.java | 29 ++++++++----
.../DescriptiveStatisticsHistogramTest.java | 53 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 9 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
index 12975b6..8eb5036 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramStatistics.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.metrics;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.commons.math3.exception.MathIllegalArgumentException;
@@ -90,15 +91,16 @@ public class DescriptiveStatisticsHistogramStatistics
extends HistogramStatistic
* will not return a value but instead populate this class so that further
values can be
* retrieved from it.
*/
- private static class CommonMetricsSnapshot implements UnivariateStatistic,
Serializable {
- private static final long serialVersionUID = 1L;
+ @VisibleForTesting
+ static class CommonMetricsSnapshot implements UnivariateStatistic,
Serializable {
+ private static final long serialVersionUID = 2L;
- private long count = 0;
+ private double[] data;
private double min = Double.NaN;
private double max = Double.NaN;
private double mean = Double.NaN;
private double stddev = Double.NaN;
- private Percentile percentilesImpl = new
Percentile().withNaNStrategy(NaNStrategy.FIXED);
+ private transient Percentile percentilesImpl;
@Override
public double evaluate(final double[] values) throws
MathIllegalArgumentException {
@@ -108,8 +110,7 @@ public class DescriptiveStatisticsHistogramStatistics
extends HistogramStatistic
@Override
public double evaluate(double[] values, int begin, int length)
throws MathIllegalArgumentException {
- this.count = length;
- percentilesImpl.setData(values, begin, length);
+ this.data = values;
SimpleStats secondMoment = new SimpleStats();
secondMoment.evaluate(values, begin, length);
@@ -125,17 +126,16 @@ public class DescriptiveStatisticsHistogramStatistics
extends HistogramStatistic
@Override
public CommonMetricsSnapshot copy() {
CommonMetricsSnapshot result = new CommonMetricsSnapshot();
- result.count = count;
+ result.data = Arrays.copyOf(data, data.length);
result.min = min;
result.max = max;
result.mean = mean;
result.stddev = stddev;
- result.percentilesImpl = percentilesImpl.copy();
return result;
}
long getCount() {
- return count;
+ return data.length;
}
double getMin() {
@@ -155,12 +155,23 @@ public class DescriptiveStatisticsHistogramStatistics
extends HistogramStatistic
}
double getPercentile(double p) {
+ maybeInitPercentile();
return percentilesImpl.evaluate(p);
}
double[] getValues() {
+ maybeInitPercentile();
return percentilesImpl.getData();
}
+
+ private void maybeInitPercentile() {
+ if (percentilesImpl == null) {
+ percentilesImpl = new
Percentile().withNaNStrategy(NaNStrategy.FIXED);
+ }
+ if (data != null) {
+ percentilesImpl.setData(data);
+ }
+ }
}
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
index a919d70..f0a27b3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogramTest.java
@@ -19,10 +19,15 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.metrics.AbstractHistogramTest;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLoggerExtension;
+import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.utility.ThrowingFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link DescriptiveStatisticsHistogram} and {@link
@@ -31,10 +36,58 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(TestLoggerExtension.class)
class DescriptiveStatisticsHistogramTest extends AbstractHistogramTest {
+ private static final double[] DATA = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+
/** Tests the histogram functionality of the DropwizardHistogramWrapper. */
@Test
void testDescriptiveHistogram() {
int size = 10;
testHistogram(size, new DescriptiveStatisticsHistogram(size));
}
+
+ /** Tests our workaround for
https://issues.apache.org/jira/browse/MATH-1642. */
+ @Test
+ void testSerialization() throws Exception {
+ testDuplication(
+ original -> {
+ final byte[] bytes =
InstantiationUtil.serializeObject(original);
+ return
(DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot)
+ InstantiationUtil.deserializeObject(bytes,
getClass().getClassLoader());
+ });
+ }
+
+ @Test
+ void testCopy() throws Exception {
+
testDuplication(DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot::copy);
+ }
+
+ private static void testDuplication(
+ ThrowingFunction<
+
DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot,
+
DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot>
+ duplicator)
+ throws Exception {
+
+ DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot
original =
+ new
DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot();
+ original.evaluate(DATA);
+
+ assertOperations(original);
+
+ final DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot
copy =
+ duplicator.apply(original);
+
+ assertOperations(copy);
+ }
+
+ private static void assertOperations(
+ DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot
statistics) {
+ assertThat(statistics.getPercentile(0.5)).isEqualTo(1);
+ assertThat(statistics.getCount()).isEqualTo(9);
+ assertThat(statistics.getMin()).isEqualTo(1);
+ assertThat(statistics.getMax()).isEqualTo(9);
+ assertThat(statistics.getMean()).isEqualTo(5);
+ assertThat(statistics.getStandardDeviation()).isCloseTo(2.7,
Offset.offset(0.5));
+ assertThat(statistics.getValues()).containsExactly(DATA);
+ }
}