This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ed86869ce [flink] Fix NPE for Flink HistogramStatistics that wraps
Fluss histogram (#2195)
ed86869ce is described below
commit ed86869ce7cfca3e049e9655bc6a16b3e4277faa
Author: Rion Williams <[email protected]>
AuthorDate: Thu Dec 25 00:12:34 2025 -0600
[flink] Fix NPE for Flink HistogramStatistics that wraps Fluss histogram
(#2195)
---
.../apache/fluss/flink/metrics/FlinkHistogram.java | 5 +-
.../fluss/flink/metrics/FlinkHistogramTest.java | 216 +++++++++++++++++++++
2 files changed, 217 insertions(+), 4 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
index a4e524f5b..27093dac0 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
@@ -41,10 +41,7 @@ public class FlinkHistogram implements Histogram {
@Override
public HistogramStatistics getStatistics() {
-
- wrapped.getStatistics();
-
- return null;
+ return new FlinkHistogramStatistics(wrapped.getStatistics()); // adapt Fluss stats to Flink's type instead of returning null
}
private static class FlinkHistogramStatistics extends HistogramStatistics {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkHistogramTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkHistogramTest.java
new file mode 100644
index 000000000..1221ad40e
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkHistogramTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.metrics;
+
+import org.apache.fluss.metrics.Histogram;
+
+import org.apache.flink.metrics.HistogramStatistics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that {@link FlinkHistogram} delegates to the wrapped Fluss {@link Histogram}. */
+class FlinkHistogramTest {
+
+ private FlinkHistogram flinkHistogram;
+ private TestFlussHistogram testFlussHistogram;
+
+ @BeforeEach
+ void setUp() {
+ testFlussHistogram = new TestFlussHistogram();
+ flinkHistogram = new FlinkHistogram(testFlussHistogram);
+ }
+
+ @Test
+ void testUpdate() {
+ flinkHistogram.update(100L);
+ assertThat(testFlussHistogram.getUpdateCount()).isEqualTo(1);
+ assertThat(testFlussHistogram.getLastUpdateValue()).isEqualTo(100L);
+ }
+
+ @Test
+ void testGetCount() {
+ testFlussHistogram.setCount(5L);
+ assertThat(flinkHistogram.getCount()).isEqualTo(5L);
+ }
+
+ @Test
+ void testGetStatisticsReturnsNonNull() {
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics).isNotNull(); // regression guard: getStatistics() previously returned null
+ }
+
+ @Test
+ void testGetStatisticsGetMin() {
+ testFlussHistogram.setMin(10L);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.getMin()).isEqualTo(10L);
+ }
+
+ @Test
+ void testGetStatisticsGetMax() {
+ testFlussHistogram.setMax(100L);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.getMax()).isEqualTo(100L);
+ }
+
+ @Test
+ void testGetStatisticsGetMean() {
+ testFlussHistogram.setMean(50.5);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.getMean()).isEqualTo(50.5);
+ }
+
+ @Test
+ void testGetStatisticsGetStdDev() {
+ testFlussHistogram.setStdDev(15.2);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.getStdDev()).isEqualTo(15.2);
+ }
+
+ @Test
+ void testGetStatisticsGetQuantile() {
+ testFlussHistogram.setQuantile(0.5, 25.0);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.getQuantile(0.5)).isEqualTo(25.0);
+ }
+
+ @Test
+ void testGetStatisticsGetValues() {
+ long[] expectedValues = {1L, 2L, 3L};
+ testFlussHistogram.setValues(expectedValues);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.getValues()).isEqualTo(expectedValues);
+ }
+
+ @Test
+ void testGetStatisticsSize() {
+ testFlussHistogram.setSize(42);
+ HistogramStatistics statistics = flinkHistogram.getStatistics();
+ assertThat(statistics.size()).isEqualTo(42);
+ }
+
+ /** Fluss {@link Histogram} stub: records updates and returns preset statistics. */
+ private static class TestFlussHistogram implements Histogram {
+ private long count = 0; // only changed via setCount(); update() does not touch it
+ private long lastUpdateValue = 0; // value passed to the most recent update()
+ private int updateCount = 0; // number of update() calls
+ private long min = 0;
+ private long max = 0;
+ private double mean = 0.0;
+ private double stdDev = 0.0;
+ private long[] values = new long[0];
+ private int size = 0;
+ private final Map<Double, Double> quantiles = new HashMap<>();
+
+ @Override
+ public void update(long value) {
+ lastUpdateValue = value;
+ updateCount++;
+ }
+
+ @Override
+ public long getCount() {
+ return count;
+ }
+
+ public void setCount(long count) {
+ this.count = count;
+ }
+
+ @Override
+ public org.apache.fluss.metrics.HistogramStatistics getStatistics() {
+ return new org.apache.fluss.metrics.HistogramStatistics() { // reads the stub's fields on each call
+ @Override
+ public double getQuantile(double quantile) {
+ return quantiles.getOrDefault(quantile, quantile); // unset quantile: echo the argument back
+ }
+
+ @Override
+ public long[] getValues() {
+ return values;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public double getMean() {
+ return mean;
+ }
+
+ @Override
+ public double getStdDev() {
+ return stdDev;
+ }
+
+ @Override
+ public long getMax() {
+ return max;
+ }
+
+ @Override
+ public long getMin() {
+ return min;
+ }
+ };
+ }
+
+ public long getLastUpdateValue() {
+ return lastUpdateValue;
+ }
+
+ public int getUpdateCount() {
+ return updateCount;
+ }
+
+ public void setMin(long min) {
+ this.min = min;
+ }
+
+ public void setMax(long max) {
+ this.max = max;
+ }
+
+ public void setMean(double mean) {
+ this.mean = mean;
+ }
+
+ public void setStdDev(double stdDev) {
+ this.stdDev = stdDev;
+ }
+
+ public void setValues(long[] values) {
+ this.values = values;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public void setQuantile(double quantile, double value) {
+ this.quantiles.put(quantile, value);
+ }
+ }
+}