This is an automated email from the ASF dual-hosted git repository.

1996fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cce8e8f74f [FLINK-39790][state/metrics] Fix State latency/size 
tracking metrics not exported without sampling
8cce8e8f74f is described below

commit 8cce8e8f74f84de90416c97059fbe8d94d2a8ba9
Author: Efrat Levitan <[email protected]>
AuthorDate: Tue May 26 12:00:17 2026 +0300

    [FLINK-39790][state/metrics] Fix State latency/size tracking metrics not 
exported without sampling
    
    Previousely when set to
    state.*-track.keyed-state-enabled: true
    state.*-track.sample-interval: 1
    
    metrics are not exported because for sampleInterval of 1, 
StateMetricBase#loopUpdateCounter will always return 0.
---
 .../runtime/state/metrics/StateMetricBase.java     |  9 ++++
 .../MetricsTrackingAggregatingStateTest.java       |  8 ++--
 .../metrics/MetricsTrackingListStateTest.java      |  8 ++--
 .../state/metrics/MetricsTrackingMapStateTest.java | 16 +++----
 .../metrics/MetricsTrackingReducingStateTest.java  |  8 ++--
 .../metrics/MetricsTrackingStateTestBase.java      | 15 ++++---
 .../metrics/MetricsTrackingValueStateTest.java     | 52 ++++++++++++++++++++--
 7 files changed, 87 insertions(+), 29 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
index ca600e8018a..48423a96f3c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.metrics;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
@@ -61,6 +62,9 @@ class StateMetricBase implements AutoCloseable {
     }
 
     protected int loopUpdateCounter(int counter) {
+        if (sampleInterval == 1) {
+            return 1;
+        }
         return (counter + 1 < sampleInterval) ? counter + 1 : 0;
     }
 
@@ -84,4 +88,9 @@ class StateMetricBase implements AutoCloseable {
     public void close() throws Exception {
         histogramMetrics.clear();
     }
+
+    @VisibleForTesting
+    Map<String, Histogram> getHistogramMetrics() {
+        return histogramMetrics;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
index 8f576f6c276..bde33292f15 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
@@ -96,8 +96,8 @@ class MetricsTrackingAggregatingStateTest extends 
MetricsTrackingStateTestBase<I
 
             setCurrentKey(keyedBackend);
             ThreadLocalRandom random = ThreadLocalRandom.current();
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 latencyTrackingState.add(random.nextLong());
                 
assertThat(latencyTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
 
@@ -136,8 +136,8 @@ class MetricsTrackingAggregatingStateTest extends 
MetricsTrackingStateTestBase<I
 
             setCurrentKey(keyedBackend);
             ThreadLocalRandom random = ThreadLocalRandom.current();
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 sizeTrackingState.add(random.nextLong());
                 
assertThat(sizeTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
index c1d9bf415e3..719df14d9ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
@@ -70,8 +70,8 @@ class MetricsTrackingListStateTest extends 
MetricsTrackingStateTestBase<Integer>
             
assertThat(latencyTrackingStateMetric.getMergeNamespaceCount()).isZero();
 
             setCurrentKey(keyedBackend);
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 
latencyTrackingState.add(ThreadLocalRandom.current().nextLong());
                 
assertThat(latencyTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
 
@@ -118,8 +118,8 @@ class MetricsTrackingListStateTest extends 
MetricsTrackingStateTestBase<Integer>
             
assertThat(sizeTrackingStateMetric.getMergeNamespaceCount()).isZero();
 
             setCurrentKey(keyedBackend);
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 sizeTrackingState.add(ThreadLocalRandom.current().nextLong());
                 
assertThat(sizeTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
index 51ff9efdbba..3e0e0d2f213 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
@@ -81,8 +81,8 @@ class MetricsTrackingMapStateTest extends 
MetricsTrackingStateTestBase<Integer>
 
             setCurrentKey(keyedBackend);
             ThreadLocalRandom random = ThreadLocalRandom.current();
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 latencyTrackingState.put(random.nextLong(), 
random.nextDouble());
                 
assertThat(latencyTrackingStateMetric.getPutCount()).isEqualTo(expectedResult);
 
@@ -153,8 +153,8 @@ class MetricsTrackingMapStateTest extends 
MetricsTrackingStateTestBase<Integer>
 
             setCurrentKey(keyedBackend);
             ThreadLocalRandom random = ThreadLocalRandom.current();
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 sizeTrackingState.put(random.nextLong(), random.nextDouble());
                 
assertThat(sizeTrackingStateMetric.getPutCount()).isEqualTo(expectedResult);
 
@@ -270,13 +270,13 @@ class MetricsTrackingMapStateTest extends 
MetricsTrackingStateTestBase<Integer>
             boolean removeIterator)
             throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
+        for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
             latencyTrackingState.put((long) index, random.nextDouble());
         }
         int count = 1;
         Iterator<E> iterator = iteratorSupplier.get();
         while (iterator.hasNext()) {
-            int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
+            int expectedResult = count == DEFAULT_SAMPLE_INTERVAL ? 0 : count;
             assertThat(latencyTrackingStateMetric.getIteratorHasNextCount())
                     .isEqualTo(expectedResult);
 
@@ -303,13 +303,13 @@ class MetricsTrackingMapStateTest extends 
MetricsTrackingStateTestBase<Integer>
             boolean removeIterator)
             throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
+        for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
             sizeTrackingState.put((long) index, random.nextDouble());
         }
         int count = 1;
         Iterator<E> iterator = iteratorSupplier.get();
         while (iterator.hasNext()) {
-            int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
+            int expectedResult = count == DEFAULT_SAMPLE_INTERVAL ? 0 : count;
 
             iterator.next();
             
assertThat(sizeTrackingStateMetric.getIteratorNextCount()).isEqualTo(expectedResult);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
index 383ef0d7195..2e4cdf645fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
@@ -69,8 +69,8 @@ class MetricsTrackingReducingStateTest extends 
MetricsTrackingStateTestBase<Inte
 
             setCurrentKey(keyedBackend);
             ThreadLocalRandom random = ThreadLocalRandom.current();
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 latencyTrackingState.add(random.nextLong());
                 
assertThat(latencyTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
 
@@ -108,8 +108,8 @@ class MetricsTrackingReducingStateTest extends 
MetricsTrackingStateTestBase<Inte
 
             setCurrentKey(keyedBackend);
             ThreadLocalRandom random = ThreadLocalRandom.current();
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 sizeTrackingState.add(random.nextLong());
                 
assertThat(sizeTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
index fd686b16d64..d29f2661743 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
@@ -50,10 +50,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test base for latency tracking state. */
 abstract class MetricsTrackingStateTestBase<K> {
-    protected static final int SAMPLE_INTERVAL = 10;
+    protected static final int DEFAULT_SAMPLE_INTERVAL = 10;
 
     protected AbstractKeyedStateBackend<K> 
createKeyedBackend(TypeSerializer<K> keySerializer)
             throws Exception {
+        return createKeyedBackend(keySerializer, DEFAULT_SAMPLE_INTERVAL);
+    }
+
+    protected AbstractKeyedStateBackend<K> createKeyedBackend(
+            TypeSerializer<K> keySerializer, int sampleInterval) throws 
Exception {
 
         Environment env = new DummyEnvironment();
         KeyGroupRange keyGroupRange = new KeyGroupRange(0, 127);
@@ -61,8 +66,8 @@ abstract class MetricsTrackingStateTestBase<K> {
         Configuration configuration = new Configuration();
         configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, 
true);
         configuration.set(StateSizeTrackOptions.SIZE_TRACK_ENABLED, true);
-        
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 
SAMPLE_INTERVAL);
-        configuration.set(StateSizeTrackOptions.SIZE_TRACK_SAMPLE_INTERVAL, 
SAMPLE_INTERVAL);
+        
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 
sampleInterval);
+        configuration.set(StateSizeTrackOptions.SIZE_TRACK_SAMPLE_INTERVAL, 
sampleInterval);
         // use a very large value to not let metrics data overridden.
         int historySize = 1000_000;
         configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE, 
historySize);
@@ -127,8 +132,8 @@ abstract class MetricsTrackingStateTestBase<K> {
             assertThat(latencyTrackingStateMetric.getClearCount()).isZero();
 
             setCurrentKey(keyedBackend);
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 latencyTrackingState.clear();
                 
assertThat(latencyTrackingStateMetric.getClearCount()).isEqualTo(expectedResult);
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
index 3883b11816e..eaf89c4a31d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
@@ -65,8 +65,8 @@ class MetricsTrackingValueStateTest extends 
MetricsTrackingStateTestBase<Integer
             assertThat(latencyTrackingStateMetric.getGetCount()).isZero();
 
             setCurrentKey(keyedBackend);
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 
latencyTrackingState.update(ThreadLocalRandom.current().nextLong());
                 
assertThat(latencyTrackingStateMetric.getUpdateCount()).isEqualTo(expectedResult);
 
@@ -97,8 +97,8 @@ class MetricsTrackingValueStateTest extends 
MetricsTrackingStateTestBase<Integer
             assertThat(sizeTrackingStateMetric.getGetCount()).isZero();
 
             setCurrentKey(keyedBackend);
-            for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
-                int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+            for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+                int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 : 
index;
                 
sizeTrackingState.update(ThreadLocalRandom.current().nextLong());
                 
assertThat(sizeTrackingStateMetric.getUpdateCount()).isEqualTo(expectedResult);
 
@@ -112,4 +112,48 @@ class MetricsTrackingValueStateTest extends 
MetricsTrackingStateTestBase<Integer
             }
         }
     }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    void testTrackingWithoutSampling() throws Exception {
+        final int noSamplingSampleInterval = 1;
+        AbstractKeyedStateBackend<Integer> keyedBackend =
+                createKeyedBackend(getKeySerializer(), 
noSamplingSampleInterval);
+        try {
+            MetricsTrackingValueState<Integer, VoidNamespace, Long> 
latencyTrackingState =
+                    (MetricsTrackingValueState)
+                            createMetricsTrackingState(keyedBackend, 
getStateDescriptor());
+            latencyTrackingState.setCurrentNamespace(VoidNamespace.INSTANCE);
+            MetricsTrackingValueState.ValueStateMetrics 
latencyTrackingStateMetric =
+                    latencyTrackingState.getLatencyTrackingStateMetric();
+
+            assertThat(latencyTrackingStateMetric.getUpdateCount()).isZero();
+            assertThat(latencyTrackingStateMetric.getGetCount()).isZero();
+
+            setCurrentKey(keyedBackend);
+            for (int index = 1; index <= 3; index++) {
+                int expectedResult = index;
+                
latencyTrackingState.update(ThreadLocalRandom.current().nextLong());
+                assertThat(
+                                latencyTrackingStateMetric
+                                        .getHistogramMetrics()
+                                        .get("valueStateUpdateLatency")
+                                        .getCount())
+                        .isEqualTo(expectedResult);
+
+                latencyTrackingState.value();
+                assertThat(
+                                latencyTrackingStateMetric
+                                        .getHistogramMetrics()
+                                        .get("valueStateGetLatency")
+                                        .getCount())
+                        .isEqualTo(expectedResult);
+            }
+        } finally {
+            if (keyedBackend != null) {
+                keyedBackend.close();
+                keyedBackend.dispose();
+            }
+        }
+    }
 }

Reply via email to