This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 80e681790b3 KAFKA-17895: Add num-keys metric for in-memory state 
stores (#21617)
80e681790b3 is described below

commit 80e681790b3e1a9d7b7f400758bcbbb46d8311a0
Author: Evan Zhou <[email protected]>
AuthorDate: Thu Mar 12 16:46:01 2026 -0500

    KAFKA-17895: Add num-keys metric for in-memory state stores (#21617)
    
    This PR implements KIP-1250: Add metric to track size of in-memory state
    stores. Today in Kafka Streams, we have the `estimate-num-keys` metric
    for RocksDB, which tracks the number of keys in the state store at any
    given moment. An equivalent metric is missing for in-memory state
    stores, which we aim to fill with this KIP/PR.
    
    Reviewers: Bill Bejeck <[email protected]>
    
    Jira: https://issues.apache.org/jira/browse/KAFKA-17895  KIP:
    https://cwiki.apache.org/confluence/x/noTMFw
---
 .../state/internals/InMemorySessionStore.java      |  7 +++
 .../state/internals/InMemoryWindowStore.java       |  6 ++
 .../state/internals/MeteredKeyValueStore.java      |  4 ++
 .../state/internals/MeteredSessionStore.java       | 18 ++++++
 .../state/internals/MeteredWindowStore.java        | 18 ++++++
 .../state/internals/metrics/StateStoreMetrics.java | 20 +++++++
 .../state/internals/InMemorySessionStoreTest.java  | 70 ++++++++++++++++++++--
 .../state/internals/InMemoryWindowStoreTest.java   | 47 +++++++++++++++
 .../state/internals/MeteredKeyValueStoreTest.java  | 11 ++++
 .../state/internals/MeteredSessionStoreTest.java   | 11 ++++
 .../state/internals/MeteredWindowStoreTest.java    | 10 ++++
 .../internals/metrics/StateStoreMetricsTest.java   | 18 ++++++
 12 files changed, 236 insertions(+), 4 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 1a5c3197442..8bd7185139a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -381,6 +381,13 @@ public class InMemorySessionStore implements 
SessionStore<Bytes, byte[]> {
         open = false;
     }
 
+    long numEntries() {
+        return endTimeMap.values().stream()
+            .flatMap(keyMap -> keyMap.values().stream())
+            .mapToLong(Map::size)
+            .sum();
+    }
+
     private void removeExpiredSegments() {
         long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 
1);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index c883576977d..8d2228db5c3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -403,6 +403,12 @@ public class InMemoryWindowStore implements 
WindowStore<Bytes, byte[]> {
         open = false;
     }
 
+    long numEntries() {
+        return segmentMap.values().stream()
+            .mapToLong(Map::size)
+            .sum();
+    }
+
     private void removeExpiredSegments() {
         long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 
1);
         for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0535e2f89e9..cd6fa23fa53 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -177,6 +177,10 @@ public class MeteredKeyValueStore<K, V>
                 }
             }
         );
+        if (!persistent()) {
+            StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
+                    (config, now) -> wrapped().approximateNumEntries());
+        }
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d27095e19fd..cf5e00d403b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -138,6 +138,24 @@ public class MeteredSessionStore<K, V>
                 }
             }
         );
+        if (!persistent()) {
+            StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
+                    (config, now) -> {
+                        final InMemorySessionStore inMemoryStore = 
findInMemorySessionStore(wrapped());
+                        return inMemoryStore != null ? 
inMemoryStore.numEntries() : -1L;
+                    }
+            );
+        }
+    }
+
+    private static InMemorySessionStore findInMemorySessionStore(final 
StateStore store) {
+        if (store instanceof InMemorySessionStore) {
+            return (InMemorySessionStore) store;
+        } else if (store instanceof WrappedStateStore) {
+            return findInMemorySessionStore(((WrappedStateStore<?, ?, ?>) 
store).wrapped());
+        } else {
+            return null;
+        }
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index db016071511..2658772b7bd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -158,6 +158,24 @@ public class MeteredWindowStore<K, V>
                 }
             }
         );
+        if (!persistent()) {
+            StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
+                    (config, now) -> {
+                        final InMemoryWindowStore inMemoryStore = 
findInMemoryWindowStore(wrapped());
+                        return inMemoryStore != null ? 
inMemoryStore.numEntries() : -1L;
+                    }
+            );
+        }
+    }
+
+    private static InMemoryWindowStore findInMemoryWindowStore(final 
StateStore store) {
+        if (store instanceof InMemoryWindowStore) {
+            return (InMemoryWindowStore) store;
+        } else if (store instanceof WrappedStateStore) {
+            return findInMemoryWindowStore(((WrappedStateStore<?, ?, ?>) 
store).wrapped());
+        } else {
+            return null;
+        }
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index cfaece063e4..356d26bbe97 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -158,6 +158,10 @@ public class StateStoreMetrics {
     private static final String ITERATOR_DURATION_MAX_DESCRIPTION =
             MAX_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION;
 
+    private static final String NUM_KEYS = "num-keys";
+    private static final String NUM_KEYS_DESCRIPTION =
+            "The current number of keys in the in-memory state store";
+
     private static final String OLDEST_ITERATOR_OPEN_SINCE_MS = 
"oldest-iterator-open-since-ms";
     private static final String OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION =
             "The UNIX timestamp the oldest still open iterator was created, in 
milliseconds";
@@ -515,4 +519,20 @@ public class StateStoreMetrics {
         );
         return sensor;
     }
+
+    public static void addNumKeysGauge(final String taskId,
+                                       final String storeType,
+                                       final String storeName,
+                                       final StreamsMetricsImpl streamsMetrics,
+                                       final Gauge<Long> numKeysGauge) {
+        streamsMetrics.addStoreLevelMutableMetric(
+                taskId,
+                storeType,
+                storeName,
+                NUM_KEYS,
+                NUM_KEYS_DESCRIPTION,
+                RecordingLevel.INFO,
+                numKeysGauge
+        );
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
index 4769fde37e7..eca2fa4502f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -35,17 +36,78 @@ public class InMemorySessionStoreTest extends 
AbstractSessionBytesStoreTest {
         return StoreType.InMemoryStore;
     }
 
+    @Test
+    public void shouldCountNumEntries() {
+        final InMemorySessionStore store = new InMemorySessionStore("test", 
RETENTION_PERIOD, "scope");
+        store.init(context, store);
+
+        assertEquals(0L, store.numEntries());
+
+        store.put(
+                new Windowed<>(
+                        Bytes.wrap("a".getBytes()),
+                        new SessionWindow(0, 0)
+                ),
+                "1".getBytes()
+        );
+        assertEquals(1L, store.numEntries());
+
+        store.put(
+                new Windowed<>(
+                        Bytes.wrap("b".getBytes()),
+                        new SessionWindow(0, 10)
+                ),
+                "2".getBytes()
+        );
+        assertEquals(2L, store.numEntries());
+
+        store.put(
+                new Windowed<>(
+                        Bytes.wrap("a".getBytes()),
+                        new SessionWindow(5, 15)
+                ),
+                "3".getBytes()
+        );
+        assertEquals(3L, store.numEntries());
+
+        // remove one entry
+        store.remove(
+                new Windowed<>(
+                        Bytes.wrap("a".getBytes()),
+                        new SessionWindow(0, 0)
+                )
+        );
+        assertEquals(2L, store.numEntries());
+
+        store.close();
+    }
+
     @Test
     public void shouldNotExpireFromOpenIterator() {
 
-        sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
-        sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
-        sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
+        sessionStore.put(
+                new Windowed<>("a", new SessionWindow(0, 0)),
+                1L
+        );
+        sessionStore.put(
+                new Windowed<>("aa", new SessionWindow(0, 10)),
+                2L
+        );
+        sessionStore.put(
+                new Windowed<>("a", new SessionWindow(10, 20)),
+                3L
+        );
 
         final KeyValueIterator<Windowed<String>, Long> iterator = 
sessionStore.findSessions("a", "b", 0L, RETENTION_PERIOD);
 
         // Advance stream time to expire the first three record
-        sessionStore.put(new Windowed<>("aa", new SessionWindow(100, 2 * 
RETENTION_PERIOD)), 4L);
+        sessionStore.put(
+                new Windowed<>(
+                        "aa",
+                        new SessionWindow(100, 2 * RETENTION_PERIOD)
+                ),
+                4L
+        );
 
         assertEquals(Set.of(1L, 2L, 3L, 4L), valuesToSet(iterator));
         assertFalse(iterator.hasNext());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index b39449cca3a..cdc23218e73 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -84,6 +84,53 @@ public class InMemoryWindowStoreTest extends 
AbstractWindowBytesStoreTest {
             .build();
     }
 
+    @Test
+    public void shouldCountNumEntries() {
+        final InMemoryWindowStore store = new InMemoryWindowStore("test", 
RETENTION_PERIOD, WINDOW_SIZE, false, "scope");
+        store.init(context, store);
+
+        assertEquals(0L, store.numEntries());
+
+        store.put(
+                Bytes.wrap("a".getBytes()),
+                "1".getBytes(),
+                0L
+        );
+        assertEquals(1L, store.numEntries());
+
+        store.put(
+                Bytes.wrap("b".getBytes()),
+                "2".getBytes(),
+                0L
+        );
+        assertEquals(2L, store.numEntries());
+
+        store.put(
+                Bytes.wrap("a".getBytes()),
+                "3".getBytes(),
+                10L
+        );
+        assertEquals(3L, store.numEntries());
+
+        // overwrite existing entry (same key, same timestamp)
+        store.put(
+                Bytes.wrap("a".getBytes()),
+                "4".getBytes(),
+                0L
+        );
+        assertEquals(3L, store.numEntries());
+
+        // delete entry by putting null
+        store.put(
+                Bytes.wrap("b".getBytes()),
+                null,
+                0L
+        );
+        assertEquals(2L, store.numEntries());
+
+        store.close();
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldRestore() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f4a9efa0800..69f9f710039 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -470,6 +470,17 @@ public class MeteredKeyValueStoreTest {
         assertTrue((Double) metric.metricValue() > 0);
     }
 
+    @Test
+    public void shouldTrackNumKeysMetric() {
+        setUp();
+        when(inner.approximateNumEntries()).thenReturn(42L);
+        init();
+
+        final KafkaMetric numKeysMetric = metric("num-keys");
+        assertThat(numKeysMetric, not(nullValue()));
+        assertThat((Long) numKeysMetric.metricValue(), equalTo(42L));
+    }
+
     @SuppressWarnings("unused")
     @Test
     public void shouldTrackOpenIteratorsMetric() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 06023d6256a..f3ca4cf3786 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -655,6 +655,17 @@ public class MeteredSessionStoreTest {
         assertThat(storeMetrics(), empty());
     }
 
+    @Test
+    public void shouldTrackNumKeysMetric() {
+        setUp();
+        init();
+
+        final KafkaMetric numKeysMetric = metric("num-keys");
+        assertThat(numKeysMetric, not(nullValue()));
+        // inner store is a mock (not InMemorySessionStore), so returns -1
+        assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L));
+    }
+
     @SuppressWarnings("unused")
     @Test
     public void shouldTrackOpenIteratorsMetric() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 2538873396f..4af34c9bd8d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -442,6 +442,16 @@ public class MeteredWindowStoreTest {
         assertThrows(NullPointerException.class, () -> 
store.backwardFetch(null, 0L, 1L));
     }
 
+    @Test
+    public void shouldTrackNumKeysMetric() {
+        store.init(context, store);
+
+        final KafkaMetric numKeysMetric = metric("num-keys");
+        assertThat(numKeysMetric, not(nullValue()));
+        // inner store is a mock (not InMemoryWindowStore), so returns -1
+        assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L));
+    }
+
     @SuppressWarnings("unused")
     @Test
     public void shouldTrackOpenIteratorsMetric() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
index 3b814db6679..151d78da06c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
@@ -300,6 +300,24 @@ public class StateStoreMetricsTest {
         );
     }
 
+    @Test
+    public void shouldAddNumKeysGauge() {
+        @SuppressWarnings("unchecked")
+        final org.apache.kafka.common.metrics.Gauge<Long> gauge = 
mock(org.apache.kafka.common.metrics.Gauge.class);
+
+        StateStoreMetrics.addNumKeysGauge(TASK_ID, STORE_TYPE, STORE_NAME, 
streamsMetrics, gauge);
+
+        org.mockito.Mockito.verify(streamsMetrics).addStoreLevelMutableMetric(
+            TASK_ID,
+            STORE_TYPE,
+            STORE_NAME,
+            "num-keys",
+            "The current number of keys in the in-memory state store",
+            RecordingLevel.INFO,
+            gauge
+        );
+    }
+
     @Test
     public void shouldGetRecordE2ELatencySensor() {
         final String metricName = "record-e2e-latency";

Reply via email to