This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 80e681790b3 KAFKA-17895: Add num-keys metric for in-memory state
stores (#21617)
80e681790b3 is described below
commit 80e681790b3e1a9d7b7f400758bcbbb46d8311a0
Author: Evan Zhou <[email protected]>
AuthorDate: Thu Mar 12 16:46:01 2026 -0500
KAFKA-17895: Add num-keys metric for in-memory state stores (#21617)
This PR implements KIP-1250: Add metric to track size of in-memory state
stores. Today in Kafka Streams, we have the `estimate-num-keys` metric
for RocksDB, which tracks the number of keys in the state store at any
given moment. An equivalent metric is missing for in-memory state
stores; this KIP/PR fills that gap by adding a `num-keys` metric.
Reviewers: Bill Bejeck <[email protected]>
Jira: https://issues.apache.org/jira/browse/KAFKA-17895
KIP: https://cwiki.apache.org/confluence/x/noTMFw
---
.../state/internals/InMemorySessionStore.java | 7 +++
.../state/internals/InMemoryWindowStore.java | 6 ++
.../state/internals/MeteredKeyValueStore.java | 4 ++
.../state/internals/MeteredSessionStore.java | 18 ++++++
.../state/internals/MeteredWindowStore.java | 18 ++++++
.../state/internals/metrics/StateStoreMetrics.java | 20 +++++++
.../state/internals/InMemorySessionStoreTest.java | 70 ++++++++++++++++++++--
.../state/internals/InMemoryWindowStoreTest.java | 47 +++++++++++++++
.../state/internals/MeteredKeyValueStoreTest.java | 11 ++++
.../state/internals/MeteredSessionStoreTest.java | 11 ++++
.../state/internals/MeteredWindowStoreTest.java | 10 ++++
.../internals/metrics/StateStoreMetricsTest.java | 18 ++++++
12 files changed, 236 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 1a5c3197442..8bd7185139a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -381,6 +381,13 @@ public class InMemorySessionStore implements
SessionStore<Bytes, byte[]> {
open = false;
}
+    /**
+     * Returns the number of sessions currently held by this store.
+     * Every innermost map reachable through {@code endTimeMap} contributes its size to the total.
+     *
+     * @return the summed size of all innermost maps; 0 when the store is empty
+     */
+    long numEntries() {
+        return endTimeMap.values().stream()
+            .mapToLong(sessionsByKey -> sessionsByKey.values().stream().mapToLong(Map::size).sum())
+            .sum();
+    }
+
private void removeExpiredSegments() {
long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod +
1);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index c883576977d..8d2228db5c3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -403,6 +403,12 @@ public class InMemoryWindowStore implements
WindowStore<Bytes, byte[]> {
open = false;
}
+    /**
+     * Returns the number of entries currently held by this store, computed by adding up
+     * the size of every map in {@code segmentMap}.
+     *
+     * @return the total entry count across all segments; 0 when there are none
+     */
+    long numEntries() {
+        long total = 0L;
+        for (final Map<?, ?> segment : segmentMap.values()) {
+            total += segment.size();
+        }
+        return total;
+    }
+
private void removeExpiredSegments() {
long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod +
1);
for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0535e2f89e9..cd6fa23fa53 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -177,6 +177,10 @@ public class MeteredKeyValueStore<K, V>
}
}
);
+ if (!persistent()) {
+ StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
+ (config, now) -> wrapped().approximateNumEntries());
+ }
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index d27095e19fd..cf5e00d403b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -138,6 +138,24 @@ public class MeteredSessionStore<K, V>
}
}
);
+ if (!persistent()) {
+ StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
+ (config, now) -> {
+ final InMemorySessionStore inMemoryStore =
findInMemorySessionStore(wrapped());
+ return inMemoryStore != null ?
inMemoryStore.numEntries() : -1L;
+ }
+ );
+ }
+ }
+
+    /**
+     * Walks down a chain of wrapped stores until an {@link InMemorySessionStore} is found.
+     *
+     * @param store the outermost store to start unwrapping from
+     * @return the first {@link InMemorySessionStore} encountered, or {@code null} when the chain
+     *         ends in a store that is neither an {@link InMemorySessionStore} nor a
+     *         {@link WrappedStateStore}
+     */
+    private static InMemorySessionStore findInMemorySessionStore(final StateStore store) {
+        StateStore candidate = store;
+        while (true) {
+            if (candidate instanceof InMemorySessionStore) {
+                return (InMemorySessionStore) candidate;
+            }
+            if (!(candidate instanceof WrappedStateStore)) {
+                return null;
+            }
+            // peel off one wrapper layer and inspect what it delegates to
+            candidate = ((WrappedStateStore<?, ?, ?>) candidate).wrapped();
+        }
     }
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index db016071511..2658772b7bd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -158,6 +158,24 @@ public class MeteredWindowStore<K, V>
}
}
);
+ if (!persistent()) {
+ StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
+ (config, now) -> {
+ final InMemoryWindowStore inMemoryStore =
findInMemoryWindowStore(wrapped());
+ return inMemoryStore != null ?
inMemoryStore.numEntries() : -1L;
+ }
+ );
+ }
+ }
+
+    /**
+     * Walks down a chain of wrapped stores until an {@link InMemoryWindowStore} is found.
+     *
+     * @param store the outermost store to start unwrapping from
+     * @return the first {@link InMemoryWindowStore} encountered, or {@code null} when the chain
+     *         ends in a store that is neither an {@link InMemoryWindowStore} nor a
+     *         {@link WrappedStateStore}
+     */
+    private static InMemoryWindowStore findInMemoryWindowStore(final StateStore store) {
+        StateStore candidate = store;
+        while (true) {
+            if (candidate instanceof InMemoryWindowStore) {
+                return (InMemoryWindowStore) candidate;
+            }
+            if (!(candidate instanceof WrappedStateStore)) {
+                return null;
+            }
+            // peel off one wrapper layer and inspect what it delegates to
+            candidate = ((WrappedStateStore<?, ?, ?>) candidate).wrapped();
+        }
     }
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index cfaece063e4..356d26bbe97 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -158,6 +158,10 @@ public class StateStoreMetrics {
private static final String ITERATOR_DURATION_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION;
+ private static final String NUM_KEYS = "num-keys";
+ private static final String NUM_KEYS_DESCRIPTION =
+ "The current number of keys in the in-memory state store";
+
private static final String OLDEST_ITERATOR_OPEN_SINCE_MS =
"oldest-iterator-open-since-ms";
private static final String OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION =
"The UNIX timestamp the oldest still open iterator was created, in
milliseconds";
@@ -515,4 +519,20 @@ public class StateStoreMetrics {
);
return sensor;
}
+
+    /**
+     * Registers the {@code num-keys} gauge as a store-level mutable metric at
+     * {@link RecordingLevel#INFO}.
+     *
+     * @param taskId         task ID forwarded to the metric registration
+     * @param storeType      store type (metrics scope) forwarded to the metric registration
+     * @param storeName      name of the store the gauge belongs to
+     * @param streamsMetrics metrics registry on which the gauge is registered
+     * @param numKeysGauge   gauge that supplies the current number of keys
+     */
+    public static void addNumKeysGauge(final String taskId,
+                                       final String storeType,
+                                       final String storeName,
+                                       final StreamsMetricsImpl streamsMetrics,
+                                       final Gauge<Long> numKeysGauge) {
+        streamsMetrics.addStoreLevelMutableMetric(
+            taskId, storeType, storeName,
+            NUM_KEYS, NUM_KEYS_DESCRIPTION,
+            RecordingLevel.INFO,
+            numKeysGauge);
+    }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
index 4769fde37e7..eca2fa4502f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -35,17 +36,78 @@ public class InMemorySessionStoreTest extends
AbstractSessionBytesStoreTest {
return StoreType.InMemoryStore;
}
+    @Test
+    public void shouldCountNumEntries() {
+        final InMemorySessionStore store = new InMemorySessionStore("test", RETENTION_PERIOD, "scope");
+        store.init(context, store);
+
+        // a freshly initialized store holds nothing
+        assertEquals(0L, store.numEntries());
+
+        final Bytes keyA = Bytes.wrap("a".getBytes());
+        final Bytes keyB = Bytes.wrap("b".getBytes());
+        final Windowed<Bytes> firstSessionOfA = new Windowed<>(keyA, new SessionWindow(0, 0));
+
+        // every put of a new (key, session window) pair adds exactly one entry
+        store.put(firstSessionOfA, "1".getBytes());
+        assertEquals(1L, store.numEntries());
+
+        store.put(new Windowed<>(keyB, new SessionWindow(0, 10)), "2".getBytes());
+        assertEquals(2L, store.numEntries());
+
+        store.put(new Windowed<>(keyA, new SessionWindow(5, 15)), "3".getBytes());
+        assertEquals(3L, store.numEntries());
+
+        // remove one entry
+        store.remove(firstSessionOfA);
+        assertEquals(2L, store.numEntries());
+
+        store.close();
+    }
+
@Test
public void shouldNotExpireFromOpenIterator() {
- sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
- sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
- sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
+ sessionStore.put(
+ new Windowed<>("a", new SessionWindow(0, 0)),
+ 1L
+ );
+ sessionStore.put(
+ new Windowed<>("aa", new SessionWindow(0, 10)),
+ 2L
+ );
+ sessionStore.put(
+ new Windowed<>("a", new SessionWindow(10, 20)),
+ 3L
+ );
final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "b", 0L, RETENTION_PERIOD);
// Advance stream time to expire the first three records
- sessionStore.put(new Windowed<>("aa", new SessionWindow(100, 2 *
RETENTION_PERIOD)), 4L);
+ sessionStore.put(
+ new Windowed<>(
+ "aa",
+ new SessionWindow(100, 2 * RETENTION_PERIOD)
+ ),
+ 4L
+ );
assertEquals(Set.of(1L, 2L, 3L, 4L), valuesToSet(iterator));
assertFalse(iterator.hasNext());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index b39449cca3a..cdc23218e73 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -84,6 +84,53 @@ public class InMemoryWindowStoreTest extends
AbstractWindowBytesStoreTest {
.build();
}
+    @Test
+    public void shouldCountNumEntries() {
+        final InMemoryWindowStore store =
+            new InMemoryWindowStore("test", RETENTION_PERIOD, WINDOW_SIZE, false, "scope");
+        store.init(context, store);
+
+        // a freshly initialized store holds nothing
+        assertEquals(0L, store.numEntries());
+
+        final Bytes keyA = Bytes.wrap("a".getBytes());
+        final Bytes keyB = Bytes.wrap("b".getBytes());
+
+        // distinct (key, timestamp) pairs each add one entry
+        store.put(keyA, "1".getBytes(), 0L);
+        assertEquals(1L, store.numEntries());
+
+        store.put(keyB, "2".getBytes(), 0L);
+        assertEquals(2L, store.numEntries());
+
+        store.put(keyA, "3".getBytes(), 10L);
+        assertEquals(3L, store.numEntries());
+
+        // overwrite existing entry (same key, same timestamp)
+        store.put(keyA, "4".getBytes(), 0L);
+        assertEquals(3L, store.numEntries());
+
+        // delete entry by putting null
+        store.put(keyB, null, 0L);
+        assertEquals(2L, store.numEntries());
+
+        store.close();
+    }
+
@SuppressWarnings("unchecked")
@Test
public void shouldRestore() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index f4a9efa0800..69f9f710039 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -470,6 +470,17 @@ public class MeteredKeyValueStoreTest {
assertTrue((Double) metric.metricValue() > 0);
}
+    @Test
+    public void shouldTrackNumKeysMetric() {
+        // value the mocked inner store reports as its entry count
+        final long stubbedNumEntries = 42L;
+
+        setUp();
+        when(inner.approximateNumEntries()).thenReturn(stubbedNumEntries);
+        init();
+
+        final KafkaMetric gauge = metric("num-keys");
+        assertThat(gauge, not(nullValue()));
+
+        final Long reported = (Long) gauge.metricValue();
+        assertThat(reported, equalTo(stubbedNumEntries));
+    }
+
@SuppressWarnings("unused")
@Test
public void shouldTrackOpenIteratorsMetric() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 06023d6256a..f3ca4cf3786 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -655,6 +655,17 @@ public class MeteredSessionStoreTest {
assertThat(storeMetrics(), empty());
}
+    @Test
+    public void shouldTrackNumKeysMetric() {
+        setUp();
+        init();
+
+        final KafkaMetric gauge = metric("num-keys");
+        assertThat(gauge, not(nullValue()));
+
+        // inner store is a mock (not InMemorySessionStore), so the gauge falls back to -1
+        final Long reported = (Long) gauge.metricValue();
+        assertThat(reported, equalTo(-1L));
+    }
+
@SuppressWarnings("unused")
@Test
public void shouldTrackOpenIteratorsMetric() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 2538873396f..4af34c9bd8d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -442,6 +442,16 @@ public class MeteredWindowStoreTest {
assertThrows(NullPointerException.class, () ->
store.backwardFetch(null, 0L, 1L));
}
+    @Test
+    public void shouldTrackNumKeysMetric() {
+        store.init(context, store);
+
+        final KafkaMetric gauge = metric("num-keys");
+        assertThat(gauge, not(nullValue()));
+
+        // inner store is a mock (not InMemoryWindowStore), so the gauge falls back to -1
+        final Long reported = (Long) gauge.metricValue();
+        assertThat(reported, equalTo(-1L));
+    }
+
@SuppressWarnings("unused")
@Test
public void shouldTrackOpenIteratorsMetric() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
index 3b814db6679..151d78da06c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
@@ -300,6 +300,24 @@ public class StateStoreMetricsTest {
);
}
+    @Test
+    public void shouldAddNumKeysGauge() {
+        // name and description the gauge is expected to be registered under
+        final String expectedName = "num-keys";
+        final String expectedDescription = "The current number of keys in the in-memory state store";
+
+        @SuppressWarnings("unchecked")
+        final org.apache.kafka.common.metrics.Gauge<Long> gauge =
+            mock(org.apache.kafka.common.metrics.Gauge.class);
+
+        StateStoreMetrics.addNumKeysGauge(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics, gauge);
+
+        // the helper must forward every argument unchanged and register at INFO level
+        org.mockito.Mockito.verify(streamsMetrics).addStoreLevelMutableMetric(
+            TASK_ID, STORE_TYPE, STORE_NAME,
+            expectedName, expectedDescription,
+            RecordingLevel.INFO,
+            gauge
+        );
+    }
+
@Test
public void shouldGetRecordE2ELatencySensor() {
final String metricName = "record-e2e-latency";