This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f67225668f8 KAFKA-20497: Add readOnly(IsolationLevel) to 
CachingSessionStore (#22313)
f67225668f8 is described below

commit f67225668f84321cafbdd07f2f24590ea989dd67
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 29 19:39:05 2026 +0100

    KAFKA-20497: Add readOnly(IsolationLevel) to CachingSessionStore (#22313)
    
    The cache holds uncommitted writes that must not be visible under
    READ_COMMITTED, so that isolation level bypasses the cache entirely and
    delegates straight to the inner store's readOnly view. READ_UNCOMMITTED
    requires a merged view of both cache and store, so a ReadOnlyView inner
    class is introduced. The session fetch methods are extracted into
    *Internal helpers parameterised by the underlying store, allowing the
    view to share the same cache-merge logic (PeekingKeyValueIterator +
    HasNextCondition filtering) without duplication.
    
    KAFKA-20497
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../state/internals/CachingSessionStore.java       | 139 +++++++++++++++--
 .../CachingPersistentSessionStoreTest.java         | 169 +++++++++++++++++++++
 2 files changed, 294 insertions(+), 14 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 54228e29eb7..68235a7b906 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -29,6 +30,7 @@ import 
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.SessionStore;
 
 import org.slf4j.Logger;
@@ -71,6 +73,83 @@ public class CachingSessionStore
         this.maxObservedTimestamp = RecordQueue.UNKNOWN;
     }
 
+    @Override
+    public ReadOnlySessionStore<Bytes, byte[]> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel cannot be 
null");
+        if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+            return wrapped().readOnly(isolationLevel);
+        }
+        return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+    }
+
+    private final class ReadOnlyView implements ReadOnlySessionStore<Bytes, 
byte[]> {
+
+        private final ReadOnlySessionStore<Bytes, byte[]> underlying;
+
+        ReadOnlyView(final ReadOnlySessionStore<Bytes, byte[]> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public byte[] fetchSession(final Bytes key,
+                                   final long earliestSessionEndTime,
+                                   final long latestSessionStartTime) {
+            return fetchSessionInternal(key, earliestSessionEndTime, 
latestSessionStartTime, underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final 
Bytes key,
+                                                                      final 
long earliestSessionEndTime,
+                                                                      final 
long latestSessionStartTime) {
+            return findSessionsInternal(key, earliestSessionEndTime, 
latestSessionStartTime, underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes key,
+                                                                              
final long earliestSessionEndTime,
+                                                                              
final long latestSessionStartTime) {
+            return backwardFindSessionsInternal(key, earliestSessionEndTime, 
latestSessionStartTime, underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final 
Bytes keyFrom,
+                                                                      final 
Bytes keyTo,
+                                                                      final 
long earliestSessionEndTime,
+                                                                      final 
long latestSessionStartTime) {
+            return findSessionsInternal(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime, underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes keyFrom,
+                                                                              
final Bytes keyTo,
+                                                                              
final long earliestSessionEndTime,
+                                                                              
final long latestSessionStartTime) {
+            return backwardFindSessionsInternal(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime, underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
key) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return findSessions(key, 0, Long.MAX_VALUE);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final 
Bytes key) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return backwardFindSessions(key, 0, Long.MAX_VALUE);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo) {
+            return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final 
Bytes keyFrom, final Bytes keyTo) {
+            return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
+        }
+    }
+
     @Override
     public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
         internalContext = asInternalProcessorContext(stateStoreContext);
@@ -161,6 +240,13 @@ public class CachingSessionStore
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
                                                                   final long 
earliestSessionEndTime,
                                                                   final long 
latestSessionStartTime) {
+        return findSessionsInternal(key, earliestSessionEndTime, 
latestSessionStartTime, wrapped());
+    }
+
+    private KeyValueIterator<Windowed<Bytes>, byte[]> 
findSessionsInternal(final Bytes key,
+                                                                           
final long earliestSessionEndTime,
+                                                                           
final long latestSessionStartTime,
+                                                                           
final ReadOnlySessionStore<Bytes, byte[]> underlying) {
         validateStoreOpen();
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
@@ -170,9 +256,8 @@ public class CachingSessionStore
                         
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, 
latestSessionStartTime))
             );
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
wrapped().findSessions(key,
-                                                                               
                earliestSessionEndTime,
-                                                                               
                latestSessionStartTime);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+            underlying.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key,
                                                                              
key,
                                                                              
earliestSessionEndTime,
@@ -187,6 +272,13 @@ public class CachingSessionStore
     public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes key,
                                                                           
final long earliestSessionEndTime,
                                                                           
final long latestSessionStartTime) {
+        return backwardFindSessionsInternal(key, earliestSessionEndTime, 
latestSessionStartTime, wrapped());
+    }
+
+    private KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessionsInternal(final Bytes key,
+                                                                               
    final long earliestSessionEndTime,
+                                                                               
    final long latestSessionStartTime,
+                                                                               
    final ReadOnlySessionStore<Bytes, byte[]> underlying) {
         validateStoreOpen();
 
         final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
wrapped().persistent() ?
@@ -198,11 +290,8 @@ public class CachingSessionStore
                 )
             );
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
wrapped().backwardFindSessions(
-            key,
-            earliestSessionEndTime,
-            latestSessionStartTime
-        );
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+            underlying.backwardFindSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(
             key,
             key,
@@ -220,6 +309,14 @@ public class CachingSessionStore
                                                                   final Bytes 
keyTo,
                                                                   final long 
earliestSessionEndTime,
                                                                   final long 
latestSessionStartTime) {
+        return findSessionsInternal(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime, wrapped());
+    }
+
+    private KeyValueIterator<Windowed<Bytes>, byte[]> 
findSessionsInternal(final Bytes keyFrom,
+                                                                           
final Bytes keyTo,
+                                                                           
final long earliestSessionEndTime,
+                                                                           
final long latestSessionStartTime,
+                                                                           
final ReadOnlySessionStore<Bytes, byte[]> underlying) {
         if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
             LOG.warn(INVALID_RANGE_WARN_MSG);
             return KeyValueIterators.emptyIterator();
@@ -231,9 +328,8 @@ public class CachingSessionStore
         final Bytes cacheKeyTo = keyTo == null ? null : 
cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = 
wrapped().findSessions(
-            keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime
-        );
+        final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+            underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime);
         final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(keyFrom,
                                                                              
keyTo,
                                                                              
earliestSessionEndTime,
@@ -249,6 +345,14 @@ public class CachingSessionStore
                                                                           
final Bytes keyTo,
                                                                           
final long earliestSessionEndTime,
                                                                           
final long latestSessionStartTime) {
+        return backwardFindSessionsInternal(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime, wrapped());
+    }
+
+    private KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessionsInternal(final Bytes keyFrom,
+                                                                               
    final Bytes keyTo,
+                                                                               
    final long earliestSessionEndTime,
+                                                                               
    final long latestSessionStartTime,
+                                                                               
    final ReadOnlySessionStore<Bytes, byte[]> underlying) {
         if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
             LOG.warn(INVALID_RANGE_WARN_MSG);
             return KeyValueIterators.emptyIterator();
@@ -261,7 +365,7 @@ public class CachingSessionStore
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
-            wrapped().backwardFindSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime);
+            underlying.backwardFindSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime);
         final HasNextCondition hasNextCondition = keySchema.hasNextCondition(
             keyFrom,
             keyTo,
@@ -277,16 +381,23 @@ public class CachingSessionStore
     @Override
     public byte[] fetchSession(final Bytes key, final long 
earliestSessionEndTime, final long latestSessionStartTime) {
         Objects.requireNonNull(key, "key cannot be null");
+        return fetchSessionInternal(key, earliestSessionEndTime, 
latestSessionStartTime, wrapped());
+    }
+
+    private byte[] fetchSessionInternal(final Bytes key,
+                                        final long earliestSessionEndTime,
+                                        final long latestSessionStartTime,
+                                        final ReadOnlySessionStore<Bytes, 
byte[]> underlying) {
         validateStoreOpen();
         if (internalContext.cache() == null) {
-            return wrapped().fetchSession(key, earliestSessionEndTime, 
latestSessionStartTime);
+            return underlying.fetchSession(key, earliestSessionEndTime, 
latestSessionStartTime);
         } else {
             final Bytes bytesKey = SessionKeySchema.toBinary(key, 
earliestSessionEndTime,
                 latestSessionStartTime);
             final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
             final LRUCacheEntry entry = internalContext.cache().get(cacheName, 
cacheKey);
             if (entry == null) {
-                return wrapped().fetchSession(key, earliestSessionEndTime, 
latestSessionStartTime);
+                return underlying.fetchSession(key, earliestSessionEndTime, 
latestSessionStartTime);
             } else {
                 return entry.value();
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 8dff7d0c1cd..92a3370d16a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -36,6 +37,7 @@ import 
org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
@@ -860,6 +862,173 @@ public class CachingPersistentSessionStoreTest {
         }
     }
 
+    @Test
+    public void shouldReadCommittedBypassesCache() {
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+
+        // cache-only entry is invisible under READ_COMMITTED
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> it =
+                 
cachingStore.readOnly(IsolationLevel.READ_COMMITTED).findSessions(keyA, 0, 0)) {
+            assertFalse(it.hasNext());
+        }
+
+        cachingStore.commit(Map.of());
+
+        // once flushed to store it is visible
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> it =
+                 
cachingStore.readOnly(IsolationLevel.READ_COMMITTED).findSessions(keyA, 0, 0)) {
+            assertTrue(it.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldReadUncommittedViewFetchSession() {
+        // keyA in store, keyB in cache
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"store".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), 
"cache".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertArrayEquals("store".getBytes(), view.fetchSession(keyA, 0, 0));
+        assertArrayEquals("cache".getBytes(), view.fetchSession(keyB, 0, 0));
+    }
+
+    @Test
+    public void shouldReadUncommittedViewFindSessionsMergesCacheAndStore() {
+        // keyA window(0,0) in store; keyA window(10,10) in cache
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+            toListAndCloseIterator(view.findSessions(keyA, 0, 10));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new 
SessionWindow(10, 10)), "2");
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewBackwardFindSessionsMergesCacheAndStore() {
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+            toListAndCloseIterator(view.backwardFindSessions(keyA, 0, 10));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new 
SessionWindow(10, 10)), "2");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewFindSessionsRangeMergesCacheAndStore() {
+        // keyA in store, keyB in cache
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+            toListAndCloseIterator(view.findSessions(keyA, keyB, 0, 0));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyB, new 
SessionWindow(0, 0)), "2");
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewBackwardFindSessionsRangeMergesCacheAndStore() {
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+            toListAndCloseIterator(view.backwardFindSessions(keyA, keyB, 0, 
0));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyB, new 
SessionWindow(0, 0)), "2");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+    }
+
+    @Test
+    public void shouldReadUncommittedViewFetchSingleKeyMergesCacheAndStore() {
+        // keyA window(0,0) in store; keyA window(10,10) in cache
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results = 
toListAndCloseIterator(view.fetch(keyA));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new 
SessionWindow(10, 10)), "2");
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewBackwardFetchSingleKeyMergesCacheAndStore() {
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results = 
toListAndCloseIterator(view.backwardFetch(keyA));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new 
SessionWindow(10, 10)), "2");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+    }
+
+    @Test
+    public void shouldReadUncommittedViewFetchRangeMergesCacheAndStore() {
+        // keyA in store, keyB in cache
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results = 
toListAndCloseIterator(view.fetch(keyA, keyB));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyB, new 
SessionWindow(0, 0)), "2");
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewBackwardFetchRangeMergesCacheAndStore() {
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), 
"2".getBytes());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results = 
toListAndCloseIterator(view.backwardFetch(keyA, keyB));
+        assertEquals(2, results.size());
+        verifyWindowedKeyValue(results.get(0), new Windowed<>(keyB, new 
SessionWindow(0, 0)), "2");
+        verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new 
SessionWindow(0, 0)), "1");
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewFetchSessionReturnsNullForCachedTombstone() {
+        // Flush keyA to the underlying store, then remove it (cache holds 
null tombstone).
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"store".getBytes());
+        cachingStore.commit(Map.of());
+        cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)));
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        // Cache tombstone (null value) shadows the underlying "store" value.
+        assertNull(view.fetchSession(keyA, 0, 0));
+    }
+
+    @Test
+    public void shouldReadUncommittedViewFetchRangeWithNullBounds() {
+        cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), 
"1".getBytes());
+        cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)), 
"2".getBytes());
+        cachingStore.commit(Map.of());
+
+        final ReadOnlySessionStore<Bytes, byte[]> view = 
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        // null keyFrom/keyTo means unbounded — should return all sessions 
without NPE.
+        final List<KeyValue<Windowed<Bytes>, byte[]>> results = 
toListAndCloseIterator(view.fetch(null, null));
+        assertEquals(2, results.size());
+    }
+
     private List<KeyValue<Windowed<Bytes>, byte[]>> 
addSessionsUntilOverflow(final String... sessionIds) {
         final Random random = new Random();
         final List<KeyValue<Windowed<Bytes>, byte[]>> results = new 
ArrayList<>();

Reply via email to