This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88a955d8354 KAFKA-20497: Add readOnly(IsolationLevel) to ChangeLogging 
stores (#22311)
88a955d8354 is described below

commit 88a955d8354182399e599fc34d36d9267408c9d1
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 29 19:30:01 2026 +0100

    KAFKA-20497: Add readOnly(IsolationLevel) to ChangeLogging stores (#22311)
    
    The ChangeLogging wrappers have no cache of their own — they simply
    intercept writes to emit changelog records and delegate reads straight
    through. There is no uncommitted state held at this layer, so the
    isolation semantics belong entirely to the inner store. Pure delegation
    to wrapped().readOnly(level) is therefore the only sensible
    implementation.
    
    KAFKA-20497
    
    Reviewers: Murali Basani <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../internals/ChangeLoggingKeyValueBytesStore.java     |  7 +++++++
 .../internals/ChangeLoggingSessionBytesStore.java      |  7 +++++++
 .../state/internals/ChangeLoggingWindowBytesStore.java |  7 +++++++
 .../internals/ChangeLoggingKeyValueBytesStoreTest.java | 14 ++++++++++++++
 .../internals/ChangeLoggingSessionBytesStoreTest.java  | 18 ++++++++++++++++++
 .../internals/ChangeLoggingWindowBytesStoreTest.java   | 18 ++++++++++++++++++
 6 files changed, 71 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index d6f31629c50..1654ef6380a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serializer;
@@ -26,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.List;
 
@@ -64,6 +66,11 @@ public class ChangeLoggingKeyValueBytesStore
         return wrapped().approximateNumEntries();
     }
 
+    @Override
+    public ReadOnlyKeyValueStore<Bytes, byte[]> readOnly(final IsolationLevel 
isolationLevel) {
+        return wrapped().readOnly(isolationLevel);
+    }
+
     @Override
     public void put(final Bytes key,
                     final byte[] value) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index dcd85eab489..7ffa1d5c22f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.SessionStore;
 
 import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
@@ -47,6 +49,11 @@ public class ChangeLoggingSessionBytesStore
         super.init(stateStoreContext, root);
     }
 
+    @Override
+    public ReadOnlySessionStore<Bytes, byte[]> readOnly(final IsolationLevel 
isolationLevel) {
+        return wrapped().readOnly(isolationLevel);
+    }
+
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key, final long earliestSessionEndTime, final long latestSessionStartTime) {
         return wrapped().findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 74a213f2ec1..d8f273a9c26 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -67,6 +69,11 @@ class ChangeLoggingWindowBytesStore
         return wrapped().fetch(key, timestamp);
     }
 
+    @Override
+    public ReadOnlyWindowStore<Bytes, byte[]> readOnly(final IsolationLevel 
isolationLevel) {
+        return wrapped().readOnly(isolationLevel);
+    }
+
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key,
                                              final long from,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 0c288a0ddd6..65ffd0cffba 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
@@ -61,6 +62,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.mockito.Mockito.mock;
@@ -264,6 +266,18 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 
     }
 
+    @Test
+    public void shouldDelegateReadOnlyUncommittedToInner() {
+        assertThat(store.readOnly(IsolationLevel.READ_UNCOMMITTED),
+            sameInstance(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)));
+    }
+
+    @Test
+    public void shouldDelegateReadOnlyCommittedToInner() {
+        assertThat(store.readOnly(IsolationLevel.READ_COMMITTED),
+            sameInstance(inner.readOnly(IsolationLevel.READ_COMMITTED)));
+    }
+
     private StreamsConfig streamsConfigMock() {
         final StreamsConfig streamsConfig = mock(StreamsConfig.class);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index b2695f766dc..a92d59d2743 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -24,6 +25,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.SessionStore;
 
 import org.junit.jupiter.api.AfterEach;
@@ -39,6 +41,8 @@ import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -51,6 +55,8 @@ public class ChangeLoggingSessionBytesStoreTest {
     private SessionStore<Bytes, byte[]> inner;
     @Mock
     private ProcessorContextImpl context;
+    @Mock
+    private ReadOnlySessionStore<Bytes, byte[]> view;
 
     private ChangeLoggingSessionBytesStore store;
     private final byte[] value1 = {0};
@@ -189,4 +195,16 @@ public class ChangeLoggingSessionBytesStoreTest {
 
         verify(inner).close();
     }
+
+    @Test
+    public void shouldDelegateReadOnlyUncommittedToInner() {
+        when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(view);
+        assertThat(store.readOnly(IsolationLevel.READ_UNCOMMITTED), 
sameInstance(view));
+    }
+
+    @Test
+    public void shouldDelegateReadOnlyCommittedToInner() {
+        when(inner.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(view);
+        assertThat(store.readOnly(IsolationLevel.READ_COMMITTED), 
sameInstance(view));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index fabca5e3bdb..f993c4dfce2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -24,6 +25,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -38,6 +40,8 @@ import org.mockito.quality.Strictness;
 import static java.time.Instant.ofEpochMilli;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -53,6 +57,8 @@ public class ChangeLoggingWindowBytesStoreTest {
     private WindowStore<Bytes, byte[]> inner;
     @Mock
     private ProcessorContextImpl context;
+    @Mock
+    private ReadOnlyWindowStore<Bytes, byte[]> view;
     private ChangeLoggingWindowBytesStore store;
 
     private static final Position POSITION = 
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 1L)))));
@@ -124,6 +130,18 @@ public class ChangeLoggingWindowBytesStoreTest {
         }
     }
 
+    @Test
+    public void shouldDelegateReadOnlyUncommittedToInner() {
+        when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(view);
+        assertThat(store.readOnly(IsolationLevel.READ_UNCOMMITTED), 
sameInstance(view));
+    }
+
+    @Test
+    public void shouldDelegateReadOnlyCommittedToInner() {
+        when(inner.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(view);
+        assertThat(store.readOnly(IsolationLevel.READ_COMMITTED), 
sameInstance(view));
+    }
+
     @Test
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true, 
WindowKeySchema::toStoreKeyBinary);

Reply via email to