This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 16b797ad58d KAFKA-20571: Added store unwrapping for publishing
num-keys metric. (#22267)
16b797ad58d is described below
commit 16b797ad58d5ad4fa0c14bbf4b94fa13b0ed3d64
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Tue May 12 23:39:53 2026 -0700
KAFKA-20571: Added store unwrapping for publishing num-keys metric. (#22267)
Added store unwrapping to bypass the check in CachingKeyValueStore (if
it's used), following the same pattern as MeteredSessionStore.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>, Evan Zhou <[email protected]>
---
.../InMemoryStoreMetricsIntegrationTest.java | 268 +++++++++++++++++++++
.../state/internals/MeteredKeyValueStore.java | 6 +-
.../state/internals/MeteredSessionStore.java | 12 +-
.../state/internals/MeteredWindowStore.java | 12 +-
.../streams/state/internals/WrappedStateStore.java | 14 ++
.../state/internals/MeteredKeyValueStoreTest.java | 4 +-
6 files changed, 291 insertions(+), 25 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InMemoryStoreMetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InMemoryStoreMetricsIntegrationTest.java
new file mode 100644
index 00000000000..baf90e91f19
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InMemoryStoreMetricsIntegrationTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemorySessionStore;
+import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Timeout(120)
+@Tag("integration")
+public class InMemoryStoreMetricsIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+ private static final String INPUT_TOPIC = "in-memory-num-keys-input";
+
+ private final EmbeddedKafkaCluster cluster = new
EmbeddedKafkaCluster(NUM_BROKERS);
+ private String safeTestName;
+
+ @BeforeEach
+ public void startCluster(final TestInfo testInfo) throws
InterruptedException {
+ cluster.start();
+ cluster.createTopic(INPUT_TOPIC, 1, 1);
+ safeTestName = safeUniqueTestName(testInfo);
+ }
+
+ @AfterEach
+ public void closeCluster() {
+ cluster.stop();
+ }
+
+ @Test
+ public void
keyValueStoreMetricValueShouldNotThrowIfStoreIsNotInitialized() throws
Exception {
+ final CountDownLatch initLatch = new CountDownLatch(1);
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+
+ final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(
+ new KeyValueBytesStoreSupplier() {
+ @Override
+ public String name() {
+ return "store";
+ }
+
+ @Override
+ public KeyValueStore<Bytes, byte[]> get() {
+ return new InMemoryKeyValueStore(name()) {
+ @Override
+ public void init(final StateStoreContext
stateStoreContext, final StateStore root) {
+ initLatch.countDown();
+ try {
+ finishLatch.await();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.init(stateStoreContext, root);
+ }
+ };
+ }
+
+ @Override
+ public String metricsScope() {
+ return "in-memory";
+ }
+ },
+ Serdes.String(),
+ Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+
+ test(storeBuilder, initLatch, finishLatch);
+ }
+
+ @Test
+ public void sessionStoreMetricValueShouldNotThrowIfStoreIsNotInitialized()
throws Exception {
+ final CountDownLatch initLatch = new CountDownLatch(1);
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+
+ final long retentionMs = 60_000L;
+
+ final StoreBuilder<SessionStore<String, String>> storeBuilder =
Stores.sessionStoreBuilder(
+ new SessionBytesStoreSupplier() {
+ @Override
+ public String name() {
+ return "store";
+ }
+
+ @Override
+ public SessionStore<Bytes, byte[]> get() {
+ return new InMemorySessionStore(name(), retentionMs,
metricsScope()) {
+ @Override
+ public void init(final StateStoreContext
stateStoreContext, final StateStore root) {
+ initLatch.countDown();
+ try {
+ finishLatch.await();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.init(stateStoreContext, root);
+ }
+ };
+ }
+
+ @Override
+ public String metricsScope() {
+ return "in-memory-session";
+ }
+
+ @Override
+ public long segmentIntervalMs() {
+ return 1L;
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return retentionMs;
+ }
+ },
+ Serdes.String(),
+ Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+
+ test(storeBuilder, initLatch, finishLatch);
+ }
+
+ @Test
+ public void windowStoreMetricValueShouldNotThrowIfStoreIsNotInitialized()
throws Exception {
+ final CountDownLatch initLatch = new CountDownLatch(1);
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+
+ final long retentionMs = 60_000L;
+ final long windowMs = 1_000L;
+
+ final StoreBuilder<WindowStore<String, String>> storeBuilder =
Stores.windowStoreBuilder(
+ new WindowBytesStoreSupplier() {
+ @Override
+ public String name() {
+ return "store";
+ }
+
+ @Override
+ public WindowStore<Bytes, byte[]> get() {
+ return new InMemoryWindowStore(name(), retentionMs,
windowMs, false, metricsScope()) {
+ @Override
+ public void init(final StateStoreContext
stateStoreContext, final StateStore root) {
+ initLatch.countDown();
+ try {
+ finishLatch.await();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.init(stateStoreContext, root);
+ }
+ };
+ }
+
+ @Override
+ public String metricsScope() {
+ return "in-memory-window";
+ }
+
+ @Override
+ public long segmentIntervalMs() {
+ return 1L;
+ }
+
+ @Override
+ public long windowSize() {
+ return windowMs;
+ }
+
+ @Override
+ public boolean retainDuplicates() {
+ return false;
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return retentionMs;
+ }
+ },
+ Serdes.String(),
+ Serdes.String())
+ .withCachingEnabled()
+ .withLoggingEnabled(Collections.emptyMap());
+
+ test(storeBuilder, initLatch, finishLatch);
+ }
+
+ private void test(final StoreBuilder<?> storeBuilder,
+ final CountDownLatch initLatch,
+ final CountDownLatch finishLatch) throws Exception {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.addStateStore(storeBuilder);
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(new MockApiProcessorSupplier<>(), "store");
+
+ final Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, safeTestName);
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers());
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ props.put(StreamsConfig.STATE_DIR_CONFIG,
org.apache.kafka.test.TestUtils.tempDirectory().getAbsolutePath());
+
+ try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
+ streams.start();
+
+ initLatch.await();
+
+ try {
+ for (final Map.Entry<MetricName, ? extends Metric> entry :
streams.metrics().entrySet()) {
+ entry.getValue().metricValue();
+ }
+ } catch (final Exception e) {
+ fail("Getting metric values on an uninitialized store shouldn't throw exceptions", e);
+ } finally {
+ finishLatch.countDown();
+ }
+ }
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 452b662733d..9046ffefd7c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -183,7 +183,11 @@ public class MeteredKeyValueStore<K, V>
);
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
- (config, now) -> wrapped().approximateNumEntries());
+ (config, now) -> {
+ final InMemoryKeyValueStore inMemoryStore =
findInner(InMemoryKeyValueStore.class);
+ return inMemoryStore != null ?
inMemoryStore.approximateNumEntries() : -1L;
+ }
+ );
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index fcc964cc255..fd0ce26a101 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -147,23 +147,13 @@ public class MeteredSessionStore<K, V>
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
(config, now) -> {
- final InMemorySessionStore inMemoryStore =
findInMemorySessionStore(wrapped());
+ final InMemorySessionStore inMemoryStore =
findInner(InMemorySessionStore.class);
return inMemoryStore != null ?
inMemoryStore.numEntries() : -1L;
}
);
}
}
- private static InMemorySessionStore findInMemorySessionStore(final
StateStore store) {
- if (store instanceof InMemorySessionStore) {
- return (InMemorySessionStore) store;
- } else if (store instanceof WrappedStateStore) {
- return findInMemorySessionStore(((WrappedStateStore<?, ?, ?>)
store).wrapped());
- } else {
- return null;
- }
- }
-
@Override
public void recordRestoreTime(final long restoreTimeNs) {
restoreSensor.record(restoreTimeNs);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index ede391e93bc..87d485da950 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -162,23 +162,13 @@ public class MeteredWindowStore<K, V>
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
(config, now) -> {
- final InMemoryWindowStore inMemoryStore =
findInMemoryWindowStore(wrapped());
+ final InMemoryWindowStore inMemoryStore =
findInner(InMemoryWindowStore.class);
return inMemoryStore != null ? inMemoryStore.numEntries()
: -1L;
}
);
}
}
- private static InMemoryWindowStore findInMemoryWindowStore(final
StateStore store) {
- if (store instanceof InMemoryWindowStore) {
- return (InMemoryWindowStore) store;
- } else if (store instanceof WrappedStateStore) {
- return findInMemoryWindowStore(((WrappedStateStore<?, ?, ?>)
store).wrapped());
- } else {
- return null;
- }
- }
-
@Override
public void recordRestoreTime(final long restoreTimeNs) {
restoreSensor.record(restoreTimeNs);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e6e6c92e559..73786170a27 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -72,6 +72,20 @@ public abstract class WrappedStateStore<S extends
StateStore, K, V> implements S
this.wrapped = wrapped;
}
+ /**
+ * Walks the wrapped-store chain rooted at this store's {@code wrapped()} until it
+ * finds an instance of {@code innerType}, or returns {@code null} if none is present.
+ */
+ public <T extends StateStore> T findInner(final Class<T> innerType) {
+ if (innerType.isInstance(wrapped)) {
+ return innerType.cast(wrapped);
+ } else if (wrapped instanceof WrappedStateStore) {
+ return ((WrappedStateStore<?, ?, ?>) wrapped).findInner(innerType);
+ } else {
+ return null;
+ }
+ }
+
@Override
public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
wrapped.init(stateStoreContext, root);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index a50d833edff..d4edd9a1c3c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -538,12 +538,12 @@ public class MeteredKeyValueStoreTest {
@Test
public void shouldTrackNumKeysMetric() {
setUp();
- when(inner.approximateNumEntries()).thenReturn(42L);
init();
final KafkaMetric numKeysMetric = metric("num-keys");
assertThat(numKeysMetric, not(nullValue()));
- assertThat((Long) numKeysMetric.metricValue(), equalTo(42L));
+ // the inner store is a mock (not an InMemoryKeyValueStore), so findInner() returns null and the gauge reports -1
+ assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L));
}
@SuppressWarnings("unused")