This is an automated email from the ASF dual-hosted git repository.
1996fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8cce8e8f74f [FLINK-39790][state/metrics] Fix State latency/size
tracking metrics not exported without sampling
8cce8e8f74f is described below
commit 8cce8e8f74f84de90416c97059fbe8d94d2a8ba9
Author: Efrat Levitan <[email protected]>
AuthorDate: Tue May 26 12:00:17 2026 +0300
[FLINK-39790][state/metrics] Fix State latency/size tracking metrics not
exported without sampling
Previously, with the following configuration
state.*-track.keyed-state-enabled: true
state.*-track.sample-interval: 1
the metrics were never exported, because for a sampleInterval of 1,
StateMetricBase#loopUpdateCounter always returns 0.
---
.../runtime/state/metrics/StateMetricBase.java | 9 ++++
.../MetricsTrackingAggregatingStateTest.java | 8 ++--
.../metrics/MetricsTrackingListStateTest.java | 8 ++--
.../state/metrics/MetricsTrackingMapStateTest.java | 16 +++----
.../metrics/MetricsTrackingReducingStateTest.java | 8 ++--
.../metrics/MetricsTrackingStateTestBase.java | 15 ++++---
.../metrics/MetricsTrackingValueStateTest.java | 52 ++++++++++++++++++++--
7 files changed, 87 insertions(+), 29 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
index ca600e8018a..48423a96f3c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/StateMetricBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.metrics;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
@@ -61,6 +62,9 @@ class StateMetricBase implements AutoCloseable {
}
protected int loopUpdateCounter(int counter) {
+ if (sampleInterval == 1) {
+ return 1;
+ }
return (counter + 1 < sampleInterval) ? counter + 1 : 0;
}
@@ -84,4 +88,9 @@ class StateMetricBase implements AutoCloseable {
public void close() throws Exception {
histogramMetrics.clear();
}
+
+ @VisibleForTesting
+ Map<String, Histogram> getHistogramMetrics() {
+ return histogramMetrics;
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
index 8f576f6c276..bde33292f15 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingAggregatingStateTest.java
@@ -96,8 +96,8 @@ class MetricsTrackingAggregatingStateTest extends
MetricsTrackingStateTestBase<I
setCurrentKey(keyedBackend);
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
latencyTrackingState.add(random.nextLong());
assertThat(latencyTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
@@ -136,8 +136,8 @@ class MetricsTrackingAggregatingStateTest extends
MetricsTrackingStateTestBase<I
setCurrentKey(keyedBackend);
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
sizeTrackingState.add(random.nextLong());
assertThat(sizeTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
index c1d9bf415e3..719df14d9ac 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingListStateTest.java
@@ -70,8 +70,8 @@ class MetricsTrackingListStateTest extends
MetricsTrackingStateTestBase<Integer>
assertThat(latencyTrackingStateMetric.getMergeNamespaceCount()).isZero();
setCurrentKey(keyedBackend);
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
latencyTrackingState.add(ThreadLocalRandom.current().nextLong());
assertThat(latencyTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
@@ -118,8 +118,8 @@ class MetricsTrackingListStateTest extends
MetricsTrackingStateTestBase<Integer>
assertThat(sizeTrackingStateMetric.getMergeNamespaceCount()).isZero();
setCurrentKey(keyedBackend);
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
sizeTrackingState.add(ThreadLocalRandom.current().nextLong());
assertThat(sizeTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
index 51ff9efdbba..3e0e0d2f213 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingMapStateTest.java
@@ -81,8 +81,8 @@ class MetricsTrackingMapStateTest extends
MetricsTrackingStateTestBase<Integer>
setCurrentKey(keyedBackend);
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
latencyTrackingState.put(random.nextLong(),
random.nextDouble());
assertThat(latencyTrackingStateMetric.getPutCount()).isEqualTo(expectedResult);
@@ -153,8 +153,8 @@ class MetricsTrackingMapStateTest extends
MetricsTrackingStateTestBase<Integer>
setCurrentKey(keyedBackend);
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
sizeTrackingState.put(random.nextLong(), random.nextDouble());
assertThat(sizeTrackingStateMetric.getPutCount()).isEqualTo(expectedResult);
@@ -270,13 +270,13 @@ class MetricsTrackingMapStateTest extends
MetricsTrackingStateTestBase<Integer>
boolean removeIterator)
throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
latencyTrackingState.put((long) index, random.nextDouble());
}
int count = 1;
Iterator<E> iterator = iteratorSupplier.get();
while (iterator.hasNext()) {
- int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
+ int expectedResult = count == DEFAULT_SAMPLE_INTERVAL ? 0 : count;
assertThat(latencyTrackingStateMetric.getIteratorHasNextCount())
.isEqualTo(expectedResult);
@@ -303,13 +303,13 @@ class MetricsTrackingMapStateTest extends
MetricsTrackingStateTestBase<Integer>
boolean removeIterator)
throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
sizeTrackingState.put((long) index, random.nextDouble());
}
int count = 1;
Iterator<E> iterator = iteratorSupplier.get();
while (iterator.hasNext()) {
- int expectedResult = count == SAMPLE_INTERVAL ? 0 : count;
+ int expectedResult = count == DEFAULT_SAMPLE_INTERVAL ? 0 : count;
iterator.next();
assertThat(sizeTrackingStateMetric.getIteratorNextCount()).isEqualTo(expectedResult);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
index 383ef0d7195..2e4cdf645fc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingReducingStateTest.java
@@ -69,8 +69,8 @@ class MetricsTrackingReducingStateTest extends
MetricsTrackingStateTestBase<Inte
setCurrentKey(keyedBackend);
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
latencyTrackingState.add(random.nextLong());
assertThat(latencyTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
@@ -108,8 +108,8 @@ class MetricsTrackingReducingStateTest extends
MetricsTrackingStateTestBase<Inte
setCurrentKey(keyedBackend);
ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
sizeTrackingState.add(random.nextLong());
assertThat(sizeTrackingStateMetric.getAddCount()).isEqualTo(expectedResult);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
index fd686b16d64..d29f2661743 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingStateTestBase.java
@@ -50,10 +50,15 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test base for latency tracking state. */
abstract class MetricsTrackingStateTestBase<K> {
- protected static final int SAMPLE_INTERVAL = 10;
+ protected static final int DEFAULT_SAMPLE_INTERVAL = 10;
protected AbstractKeyedStateBackend<K>
createKeyedBackend(TypeSerializer<K> keySerializer)
throws Exception {
+ return createKeyedBackend(keySerializer, DEFAULT_SAMPLE_INTERVAL);
+ }
+
+ protected AbstractKeyedStateBackend<K> createKeyedBackend(
+ TypeSerializer<K> keySerializer, int sampleInterval) throws
Exception {
Environment env = new DummyEnvironment();
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 127);
@@ -61,8 +66,8 @@ abstract class MetricsTrackingStateTestBase<K> {
Configuration configuration = new Configuration();
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED,
true);
configuration.set(StateSizeTrackOptions.SIZE_TRACK_ENABLED, true);
-
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL,
SAMPLE_INTERVAL);
- configuration.set(StateSizeTrackOptions.SIZE_TRACK_SAMPLE_INTERVAL,
SAMPLE_INTERVAL);
+
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL,
sampleInterval);
+ configuration.set(StateSizeTrackOptions.SIZE_TRACK_SAMPLE_INTERVAL,
sampleInterval);
// use a very large value to not let metrics data overridden.
int historySize = 1000_000;
configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE,
historySize);
@@ -127,8 +132,8 @@ abstract class MetricsTrackingStateTestBase<K> {
assertThat(latencyTrackingStateMetric.getClearCount()).isZero();
setCurrentKey(keyedBackend);
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
latencyTrackingState.clear();
assertThat(latencyTrackingStateMetric.getClearCount()).isEqualTo(expectedResult);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
index 3883b11816e..eaf89c4a31d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/MetricsTrackingValueStateTest.java
@@ -65,8 +65,8 @@ class MetricsTrackingValueStateTest extends
MetricsTrackingStateTestBase<Integer
assertThat(latencyTrackingStateMetric.getGetCount()).isZero();
setCurrentKey(keyedBackend);
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
latencyTrackingState.update(ThreadLocalRandom.current().nextLong());
assertThat(latencyTrackingStateMetric.getUpdateCount()).isEqualTo(expectedResult);
@@ -97,8 +97,8 @@ class MetricsTrackingValueStateTest extends
MetricsTrackingStateTestBase<Integer
assertThat(sizeTrackingStateMetric.getGetCount()).isZero();
setCurrentKey(keyedBackend);
- for (int index = 1; index <= SAMPLE_INTERVAL; index++) {
- int expectedResult = index == SAMPLE_INTERVAL ? 0 : index;
+ for (int index = 1; index <= DEFAULT_SAMPLE_INTERVAL; index++) {
+ int expectedResult = index == DEFAULT_SAMPLE_INTERVAL ? 0 :
index;
sizeTrackingState.update(ThreadLocalRandom.current().nextLong());
assertThat(sizeTrackingStateMetric.getUpdateCount()).isEqualTo(expectedResult);
@@ -112,4 +112,48 @@ class MetricsTrackingValueStateTest extends
MetricsTrackingStateTestBase<Integer
}
}
}
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ void testTrackingWithoutSampling() throws Exception {
+ final int noSamplingSampleInterval = 1;
+ AbstractKeyedStateBackend<Integer> keyedBackend =
+ createKeyedBackend(getKeySerializer(),
noSamplingSampleInterval);
+ try {
+ MetricsTrackingValueState<Integer, VoidNamespace, Long>
latencyTrackingState =
+ (MetricsTrackingValueState)
+ createMetricsTrackingState(keyedBackend,
getStateDescriptor());
+ latencyTrackingState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ MetricsTrackingValueState.ValueStateMetrics
latencyTrackingStateMetric =
+ latencyTrackingState.getLatencyTrackingStateMetric();
+
+ assertThat(latencyTrackingStateMetric.getUpdateCount()).isZero();
+ assertThat(latencyTrackingStateMetric.getGetCount()).isZero();
+
+ setCurrentKey(keyedBackend);
+ for (int index = 1; index <= 3; index++) {
+ int expectedResult = index;
+
latencyTrackingState.update(ThreadLocalRandom.current().nextLong());
+ assertThat(
+ latencyTrackingStateMetric
+ .getHistogramMetrics()
+ .get("valueStateUpdateLatency")
+ .getCount())
+ .isEqualTo(expectedResult);
+
+ latencyTrackingState.value();
+ assertThat(
+ latencyTrackingStateMetric
+ .getHistogramMetrics()
+ .get("valueStateGetLatency")
+ .getCount())
+ .isEqualTo(expectedResult);
+ }
+ } finally {
+ if (keyedBackend != null) {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+ }
}