This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d766d554aee KAFKA-20497: Add readOnly(IsolationLevel) to 
MeteredSessionStore (#22317)
d766d554aee is described below

commit d766d554aee1674ca7ea528e3f6f8b8b0f1335c9
Author: Nick Telford <[email protected]>
AuthorDate: Tue Jun 2 15:50:18 2026 +0100

    KAFKA-20497: Add readOnly(IsolationLevel) to MeteredSessionStore (#22317)
    
    The metered layer must wrap both isolation levels rather than delegating
    to the inner store's readOnly view directly: it holds the serde context
    needed to deserialise keys and values, and it owns the sensor
    registration for iterator metrics. The ReadOnlyView applies the same
    key/value deserialisation and wrapIterator/sensor instrumentation as the
    public session fetch methods so that IQ operations through a readOnly
    view are measured identically to direct queries.
    
    KAFKA-20497
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../state/internals/MeteredSessionStore.java       | 262 ++++++++++-----------
 .../state/internals/MeteredSessionStoreTest.java   | 145 ++++++++++++
 2 files changed, 270 insertions(+), 137 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index a8ae70068a4..a56fd4c9642 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -40,6 +41,7 @@ import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
@@ -254,172 +256,158 @@ public class MeteredSessionStore<K, V>
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().fetch(serializeKey(key)),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return meteredWindowedIterator(wrapped().fetch(serializeKey(key)));
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFetch(serializeKey(key)),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return 
meteredWindowedIterator(wrapped().backwardFetch(serializeKey(key)));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(
-        final K keyFrom,
-        final K keyTo
-    ) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().fetch(serializeKey(keyFrom), serializeKey(keyTo)),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+    public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K 
keyTo) {
+        return meteredWindowedIterator(wrapped().fetch(serializeKey(keyFrom), 
serializeKey(keyTo)));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetch(
-        final K keyFrom,
-        final K keyTo
-    ) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFetch(serializeKey(keyFrom), 
serializeKey(keyTo)),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom, 
final K keyTo) {
+        return 
meteredWindowedIterator(wrapped().backwardFetch(serializeKey(keyFrom), 
serializeKey(keyTo)));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> findSessions(
-        final K key,
-        final long earliestSessionEndTime,
-        final long latestSessionStartTime
-    ) {
+    public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
+                                                         final long 
earliestSessionEndTime,
+                                                         final long 
latestSessionStartTime) {
         Objects.requireNonNull(key, "key cannot be null");
-        final Bytes bytesKey = serializeKey(key);
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().findSessions(
-                bytesKey,
-                earliestSessionEndTime,
-                latestSessionStartTime),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return 
meteredWindowedIterator(wrapped().findSessions(serializeKey(key), 
earliestSessionEndTime, latestSessionStartTime));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
-        final K key,
-        final long earliestSessionEndTime,
-        final long latestSessionStartTime
-    ) {
+    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
+                                                                 final long 
earliestSessionEndTime,
+                                                                 final long 
latestSessionStartTime) {
         Objects.requireNonNull(key, "key cannot be null");
-        final Bytes bytesKey = serializeKey(key);
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFindSessions(
-                bytesKey,
-                earliestSessionEndTime,
-                latestSessionStartTime
-            ),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return 
meteredWindowedIterator(wrapped().backwardFindSessions(serializeKey(key), 
earliestSessionEndTime, latestSessionStartTime));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> findSessions(
-        final K keyFrom,
-        final K keyTo,
-        final long earliestSessionEndTime,
-        final long latestSessionStartTime
-    ) {
-        final Bytes bytesKeyFrom = serializeKey(keyFrom);
-        final Bytes bytesKeyTo = serializeKey(keyTo);
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().findSessions(
-                bytesKeyFrom,
-                bytesKeyTo,
-                earliestSessionEndTime,
-                latestSessionStartTime),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+    public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
+                                                         final K keyTo,
+                                                         final long 
earliestSessionEndTime,
+                                                         final long 
latestSessionStartTime) {
+        return 
meteredWindowedIterator(wrapped().findSessions(serializeKey(keyFrom), 
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> findSessions(
-        final long earliestSessionEndTime,
-        final long latestSessionEndTime
-    ) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().findSessions(earliestSessionEndTime, 
latestSessionEndTime),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+    public KeyValueIterator<Windowed<K>, V> findSessions(final long 
earliestSessionEndTime,
+                                                         final long 
latestSessionEndTime) {
+        return 
meteredWindowedIterator(wrapped().findSessions(earliestSessionEndTime, 
latestSessionEndTime));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
-        final K keyFrom,
-        final K keyTo,
-        final long earliestSessionEndTime,
-        final long latestSessionStartTime
-    ) {
-        final Bytes bytesKeyFrom = serializeKey(keyFrom);
-        final Bytes bytesKeyTo = serializeKey(keyTo);
+    public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K 
keyFrom,
+                                                                 final K keyTo,
+                                                                 final long 
earliestSessionEndTime,
+                                                                 final long 
latestSessionStartTime) {
+        return 
meteredWindowedIterator(wrapped().backwardFindSessions(serializeKey(keyFrom), 
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
+    }
+
+    @Override
+    public ReadOnlySessionStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel cannot be 
null");
+        return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+    }
+
+    private final class ReadOnlyView implements ReadOnlySessionStore<K, V> {
+
+        private final ReadOnlySessionStore<Bytes, byte[]> underlying;
+
+        ReadOnlyView(final ReadOnlySessionStore<Bytes, byte[]> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public V fetchSession(
+            final K key,
+            final long earliestSessionEndTime,
+            final long latestSessionStartTime
+        ) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return maybeMeasureLatency(
+                () -> 
deserializeValue(underlying.fetchSession(serializeKey(key), 
earliestSessionEndTime, latestSessionStartTime)),
+                time,
+                fetchSensor
+            );
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return 
meteredWindowedIterator(underlying.fetch(serializeKey(key)));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return 
meteredWindowedIterator(underlying.backwardFetch(serializeKey(key)));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K 
keyTo) {
+            return 
meteredWindowedIterator(underlying.fetch(serializeKey(keyFrom), 
serializeKey(keyTo)));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom, 
final K keyTo) {
+            return 
meteredWindowedIterator(underlying.backwardFetch(serializeKey(keyFrom), 
serializeKey(keyTo)));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> findSessions(
+            final K key,
+            final long earliestSessionEndTime,
+            final long latestSessionStartTime
+        ) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return 
meteredWindowedIterator(underlying.findSessions(serializeKey(key), 
earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
+            final K key,
+            final long earliestSessionEndTime,
+            final long latestSessionStartTime
+        ) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return 
meteredWindowedIterator(underlying.backwardFindSessions(serializeKey(key), 
earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> findSessions(
+            final K keyFrom,
+            final K keyTo,
+            final long earliestSessionEndTime,
+            final long latestSessionStartTime
+        ) {
+            return 
meteredWindowedIterator(underlying.findSessions(serializeKey(keyFrom), 
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
+            final K keyFrom,
+            final K keyTo,
+            final long earliestSessionEndTime,
+            final long latestSessionStartTime
+        ) {
+            return 
meteredWindowedIterator(underlying.backwardFindSessions(serializeKey(keyFrom), 
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
+        }
+
+    }
+
+    private KeyValueIterator<Windowed<K>, V> meteredWindowedIterator(final 
KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
         return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFindSessions(
-                bytesKeyFrom,
-                bytesKeyTo,
-                earliestSessionEndTime,
-                latestSessionStartTime
-            ),
+            iter,
             fetchSensor,
             iteratorDurationSensor,
             this::deserializeKey,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 28982f9e229..11735047dfb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -45,6 +46,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.KeyValueIteratorStub;
@@ -820,6 +822,149 @@ public class MeteredSessionStoreTest {
         assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchSessionApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetchSession(KEY_BYTES, 0, 0)).thenReturn(VALUE_BYTES);
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.fetchSession(KEY, 0, 0), equalTo(VALUE));
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFindSessionsApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.findSessions(KEY_BYTES, 0, 0))
+            .thenReturn(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> it = 
view.findSessions(KEY, 0, 0)) {
+            assertThat(it.next().value, equalTo(VALUE));
+        }
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldReadOnlyViewBackwardFindSessionsApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.backwardFindSessions(KEY_BYTES, 0, 0))
+            .thenReturn(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> it = 
view.backwardFindSessions(KEY, 0, 0)) {
+            assertThat(it.next().value, equalTo(VALUE));
+        }
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldReadOnlyViewFindSessionsRangeApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
+            .thenReturn(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> it = 
view.findSessions(KEY, KEY, 0, 0)) {
+            assertThat(it.next().value, equalTo(VALUE));
+        }
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetch(KEY_BYTES))
+            .thenReturn(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> it = 
view.fetch(KEY)) {
+            assertThat(it.next().value, equalTo(VALUE));
+        }
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewBackwardFetchApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.backwardFetch(KEY_BYTES))
+            .thenReturn(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> it = 
view.backwardFetch(KEY)) {
+            assertThat(it.next().value, equalTo(VALUE));
+        }
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchRangeApplySerdesAndRecordMetric() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetch(KEY_BYTES, KEY_BYTES))
+            .thenReturn(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
+        init();
+
+        final ReadOnlySessionStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> it = 
view.fetch(KEY, KEY)) {
+            assertThat(it.next().value, equalTo(VALUE));
+        }
+        assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPassReadCommittedThroughToInner() {
+        setUp();
+        final ReadOnlySessionStore<Bytes, byte[]> innerView = 
mock(ReadOnlySessionStore.class);
+        
when(innerStore.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(innerView);
+        init();
+
+        store.readOnly(IsolationLevel.READ_COMMITTED);
+
+        verify(innerStore).readOnly(IsolationLevel.READ_COMMITTED);
+    }
+
+    @Test
+    public void shouldThrowNpeOnNullIsolationLevel() {
+        setUp();
+        init();
+
+        assertThrows(NullPointerException.class, () -> store.readOnly(null));
+    }
+
     private KafkaMetric metric(final String name) {
         return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", 
this.tags));
     }

Reply via email to