This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ed86869ce [flink] Fix NPE for Flink HistogramStatistics that wraps 
Fluss histogram (#2195)
ed86869ce is described below

commit ed86869ce7cfca3e049e9655bc6a16b3e4277faa
Author: Rion Williams <[email protected]>
AuthorDate: Thu Dec 25 00:12:34 2025 -0600

    [flink] Fix NPE for Flink HistogramStatistics that wraps Fluss histogram 
(#2195)
---
 .../apache/fluss/flink/metrics/FlinkHistogram.java |   5 +-
 .../fluss/flink/metrics/FlinkHistogramTest.java    | 216 +++++++++++++++++++++
 2 files changed, 217 insertions(+), 4 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
index a4e524f5b..27093dac0 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/metrics/FlinkHistogram.java
@@ -41,10 +41,7 @@ public class FlinkHistogram implements Histogram {
 
     @Override
     public HistogramStatistics getStatistics() {
-
-        wrapped.getStatistics();
-
-        return null;
+        return new FlinkHistogramStatistics(wrapped.getStatistics());
     }
 
     private static class FlinkHistogramStatistics extends HistogramStatistics {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkHistogramTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkHistogramTest.java
new file mode 100644
index 000000000..1221ad40e
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkHistogramTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.metrics;
+
+import org.apache.fluss.metrics.Histogram;
+
+import org.apache.flink.metrics.HistogramStatistics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlinkHistogram}. */
+class FlinkHistogramTest {
+
+    private FlinkHistogram flinkHistogram;
+    private TestFlussHistogram testFlussHistogram;
+
+    @BeforeEach
+    void setUp() {
+        testFlussHistogram = new TestFlussHistogram();
+        flinkHistogram = new FlinkHistogram(testFlussHistogram);
+    }
+
+    @Test
+    void testUpdate() {
+        flinkHistogram.update(100L);
+        assertThat(testFlussHistogram.getUpdateCount()).isEqualTo(1);
+        assertThat(testFlussHistogram.getLastUpdateValue()).isEqualTo(100L);
+    }
+
+    @Test
+    void testGetCount() {
+        testFlussHistogram.setCount(5L);
+        assertThat(flinkHistogram.getCount()).isEqualTo(5L);
+    }
+
+    @Test
+    void testGetStatisticsReturnsNonNull() {
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics).isNotNull();
+    }
+
+    @Test
+    void testGetStatisticsGetMin() {
+        testFlussHistogram.setMin(10L);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.getMin()).isEqualTo(10L);
+    }
+
+    @Test
+    void testGetStatisticsGetMax() {
+        testFlussHistogram.setMax(100L);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.getMax()).isEqualTo(100L);
+    }
+
+    @Test
+    void testGetStatisticsGetMean() {
+        testFlussHistogram.setMean(50.5);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.getMean()).isEqualTo(50.5);
+    }
+
+    @Test
+    void testGetStatisticsGetStdDev() {
+        testFlussHistogram.setStdDev(15.2);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.getStdDev()).isEqualTo(15.2);
+    }
+
+    @Test
+    void testGetStatisticsGetQuantile() {
+        testFlussHistogram.setQuantile(0.5, 25.0);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.getQuantile(0.5)).isEqualTo(25.0);
+    }
+
+    @Test
+    void testGetStatisticsGetValues() {
+        long[] expectedValues = {1L, 2L, 3L};
+        testFlussHistogram.setValues(expectedValues);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.getValues()).isEqualTo(expectedValues);
+    }
+
+    @Test
+    void testGetStatisticsSize() {
+        testFlussHistogram.setSize(42);
+        HistogramStatistics statistics = flinkHistogram.getStatistics();
+        assertThat(statistics.size()).isEqualTo(42);
+    }
+
+    /** Test implementation of Fluss Histogram for unit testing. */
+    private static class TestFlussHistogram implements Histogram {
+        private long count = 0;
+        private long lastUpdateValue = 0;
+        private int updateCount = 0;
+        private long min = 0;
+        private long max = 0;
+        private double mean = 0.0;
+        private double stdDev = 0.0;
+        private long[] values = new long[0];
+        private int size = 0;
+        private final Map<Double, Double> quantiles = new HashMap<>();
+
+        @Override
+        public void update(long value) {
+            lastUpdateValue = value;
+            updateCount++;
+        }
+
+        @Override
+        public long getCount() {
+            return count;
+        }
+
+        public void setCount(long count) {
+            this.count = count;
+        }
+
+        @Override
+        public org.apache.fluss.metrics.HistogramStatistics getStatistics() {
+            return new org.apache.fluss.metrics.HistogramStatistics() {
+                @Override
+                public double getQuantile(double quantile) {
+                    return quantiles.getOrDefault(quantile, quantile);
+                }
+
+                @Override
+                public long[] getValues() {
+                    return values;
+                }
+
+                @Override
+                public int size() {
+                    return size;
+                }
+
+                @Override
+                public double getMean() {
+                    return mean;
+                }
+
+                @Override
+                public double getStdDev() {
+                    return stdDev;
+                }
+
+                @Override
+                public long getMax() {
+                    return max;
+                }
+
+                @Override
+                public long getMin() {
+                    return min;
+                }
+            };
+        }
+
+        public long getLastUpdateValue() {
+            return lastUpdateValue;
+        }
+
+        public int getUpdateCount() {
+            return updateCount;
+        }
+
+        public void setMin(long min) {
+            this.min = min;
+        }
+
+        public void setMax(long max) {
+            this.max = max;
+        }
+
+        public void setMean(double mean) {
+            this.mean = mean;
+        }
+
+        public void setStdDev(double stdDev) {
+            this.stdDev = stdDev;
+        }
+
+        public void setValues(long[] values) {
+            this.values = values;
+        }
+
+        public void setSize(int size) {
+            this.size = size;
+        }
+
+        public void setQuantile(double quantile, double value) {
+            this.quantiles.put(quantile, value);
+        }
+    }
+}

Reply via email to