This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0dd84711743 KAFKA-19748: fix metrics leak in Kafka Streams (#20633)
0dd84711743 is described below
commit 0dd84711743e8e0bd2e3840d1320896fa38d1a43
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Oct 3 15:27:51 2025 -0700
KAFKA-19748: fix metrics leak in Kafka Streams (#20633)
This PR fixes a leak in StreamsMetricsImpl, which did not remove a
store-level metric correctly and thus leaked objects.
Reviewers: Eduwer Camacaro <[email protected]>, Bill Bejeck
<[email protected]>
---
.../streams/internals/metrics/OpenIterators.java | 2 +-
.../internals/metrics/StreamsMetricsImpl.java | 26 ++++++++++++++++++++++
.../state/internals/MeteredKeyValueStore.java | 8 ++++---
.../internals/metrics/OpenIteratorsTest.java | 4 ++--
4 files changed, 34 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
index 736af467bd2..b3cb052b104 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
@@ -62,7 +62,7 @@ public class OpenIterators {
public void remove(final MeteredIterator iterator) {
if (openIterators.size() == 1) {
- streamsMetrics.removeMetric(metricName);
+ streamsMetrics.removeStoreLevelMetric(metricName);
}
openIterators.remove(iterator);
updateOldestStartTimestamp();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 7c22c94e9af..a0999a36c60 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -41,12 +41,14 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
public class StreamsMetricsImpl implements StreamsMetrics {
@@ -339,6 +341,30 @@ public class StreamsMetricsImpl implements StreamsMetrics {
metrics.removeMetric(metricName);
}
+ public void removeStoreLevelMetric(final MetricName metricName) {
+ metrics.removeMetric(metricName);
+
+ final List<String> metricsScopeCandidates =
metricName.tags().keySet().stream()
+ .filter(tag -> !tag.equals(THREAD_ID_TAG) &&
!tag.equals(TASK_ID_TAG))
+ .collect(Collectors.toList());
+ if (metricsScopeCandidates.size() != 1) {
+ // should never happen
+ throw new IllegalStateException("Expected exactly one metric scope
tag, but found " + metricsScopeCandidates);
+ }
+
+ final Deque<MetricName> metricsForStore = storeLevelMetrics.get(
+ storeSensorPrefix(
+ metricName.tags().get(THREAD_ID_TAG),
+ metricName.tags().get(TASK_ID_TAG),
+ metricName.tags().get(metricsScopeCandidates.get(0))
+ )
+ );
+
+ if (metricsForStore != null) {
+ metricsForStore.remove(metricName);
+ }
+ }
+
public Map<String, String> taskLevelTagMap(final String threadId, final
String taskId) {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG, threadId);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 9c033d6bbd5..a89bf0c62c1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -487,9 +487,11 @@ public class MeteredKeyValueStore<K, V>
private final long startTimestamp;
private final Function<byte[], V> valueDeserializer;
- private MeteredKeyValueTimestampedIterator(final
KeyValueIterator<Bytes, byte[]> iter,
- final Sensor sensor,
- final Function<byte[], V>
valueDeserializer) {
+ private MeteredKeyValueTimestampedIterator(
+ final KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor,
+ final Function<byte[], V> valueDeserializer
+ ) {
this.iter = iter;
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
index 3464ecbdaee..daaacb7bec6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
@@ -58,11 +58,11 @@ public class OpenIteratorsTest {
assertThat(gauge.value(null, 0), is(2L));
openIterators.remove(meteredIterator2);
- verify(streamsMetrics, never()).removeMetric(any());
+ verify(streamsMetrics, never()).removeStoreLevelMetric(any());
assertThat(gauge.value(null, 0), is(5L));
openIterators.remove(meteredIterator1);
- verify(streamsMetrics).removeMetric(any());
+ verify(streamsMetrics).removeStoreLevelMetric(any());
assertThat(gauge.value(null, 0), is(5L));
openIterators.add(meteredIterator3);