This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new ce248ab0d6c KAFKA-19748: fix metrics leak in Kafka Streams (#20633)
ce248ab0d6c is described below

commit ce248ab0d6c1590054e02c40eca7f8bb216ed05a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Oct 3 15:27:51 2025 -0700

    KAFKA-19748: fix metrics leak in Kafka Streams (#20633)
    
    This PR fixes a leak in StreamsMetricImpl not removing a
    store-level-metric correctly, and thus leaking objects.
    
    Reviewers: Eduwer Camacaro <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../streams/internals/metrics/OpenIterators.java   |  1 +
 .../internals/metrics/StreamsMetricsImpl.java      | 26 ++++++++++++++++++++++
 .../state/internals/MeteredKeyValueStore.java      |  8 ++++---
 .../internals/metrics/OpenIteratorsTest.java       |  4 ++--
 4 files changed, 34 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
index 2ef50d54abf..bdeed2f8978 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
@@ -66,6 +66,7 @@ public class OpenIterators {
     public void remove(final MeteredIterator iterator) {
         if (numOpenIterators.intValue() == 1) {
             streamsMetrics.removeMetric(metricName);
+            streamsMetrics.removeStoreLevelMetric(metricName);
         }
         numOpenIterators.decrement();
         openIterators.remove(iterator);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 7c22c94e9af..a0999a36c60 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -41,12 +41,14 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 public class StreamsMetricsImpl implements StreamsMetrics {
 
@@ -339,6 +341,30 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         metrics.removeMetric(metricName);
     }
 
+    public void removeStoreLevelMetric(final MetricName metricName) {
+        metrics.removeMetric(metricName);
+
+        final List<String> metricsScopeCandidates = 
metricName.tags().keySet().stream()
+            .filter(tag -> !tag.equals(THREAD_ID_TAG) && 
!tag.equals(TASK_ID_TAG))
+            .collect(Collectors.toList());
+        if (metricsScopeCandidates.size() != 1) {
+            // should never happen
+            throw new IllegalStateException("Expected exactly one metric scope 
tag, but found " + metricsScopeCandidates);
+        }
+
+        final Deque<MetricName> metricsForStore = storeLevelMetrics.get(
+            storeSensorPrefix(
+                metricName.tags().get(THREAD_ID_TAG),
+                metricName.tags().get(TASK_ID_TAG),
+                metricName.tags().get(metricsScopeCandidates.get(0))
+            )
+        );
+
+        if (metricsForStore != null) {
+            metricsForStore.remove(metricName);
+        }
+    }
+
     public Map<String, String> taskLevelTagMap(final String threadId, final 
String taskId) {
         final Map<String, String> tagMap = new LinkedHashMap<>();
         tagMap.put(THREAD_ID_TAG, threadId);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 8678111f989..32d1ee91437 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -482,9 +482,11 @@ public class MeteredKeyValueStore<K, V>
         private final long startTimestamp;
         private final Function<byte[], V> valueDeserializer;
 
-        private MeteredKeyValueTimestampedIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
-                                        final Sensor sensor,
-                                        final Function<byte[], V> 
valueDeserializer) {
+        private MeteredKeyValueTimestampedIterator(
+            final KeyValueIterator<Bytes, byte[]> iter,
+            final Sensor sensor,
+            final Function<byte[], V> valueDeserializer
+        ) {
             this.iter = iter;
             this.sensor = sensor;
             this.valueDeserializer = valueDeserializer;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
index 3464ecbdaee..daaacb7bec6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
@@ -58,11 +58,11 @@ public class OpenIteratorsTest {
         assertThat(gauge.value(null, 0), is(2L));
 
         openIterators.remove(meteredIterator2);
-        verify(streamsMetrics, never()).removeMetric(any());
+        verify(streamsMetrics, never()).removeStoreLevelMetric(any());
         assertThat(gauge.value(null, 0), is(5L));
 
         openIterators.remove(meteredIterator1);
-        verify(streamsMetrics).removeMetric(any());
+        verify(streamsMetrics).removeStoreLevelMetric(any());
         assertThat(gauge.value(null, 0), is(5L));
 
         openIterators.add(meteredIterator3);

Reply via email to