This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f67225668f8 KAFKA-20497: Add readOnly(IsolationLevel) to
CachingSessionStore (#22313)
f67225668f8 is described below
commit f67225668f84321cafbdd07f2f24590ea989dd67
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 29 19:39:05 2026 +0100
KAFKA-20497: Add readOnly(IsolationLevel) to CachingSessionStore (#22313)
The cache holds uncommitted writes that must not be visible under
READ_COMMITTED, so that isolation level bypasses the cache entirely and
delegates straight to the inner store's readOnly view. READ_UNCOMMITTED
requires a merged view of both cache and store, so a ReadOnlyView inner
class is introduced. The session fetch methods are extracted into
*Internal helpers parameterised by the underlying store, allowing the
view to share the same cache-merge logic (PeekingKeyValueIterator +
HasNextCondition filtering) without duplication.
KAFKA-20497
Reviewers: Bill Bejeck <[email protected]>
---
.../state/internals/CachingSessionStore.java | 139 +++++++++++++++--
.../CachingPersistentSessionStoreTest.java | 169 +++++++++++++++++++++
2 files changed, 294 insertions(+), 14 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 54228e29eb7..68235a7b906 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -29,6 +30,7 @@ import
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.slf4j.Logger;
@@ -71,6 +73,83 @@ public class CachingSessionStore
this.maxObservedTimestamp = RecordQueue.UNKNOWN;
}
+ @Override
+ public ReadOnlySessionStore<Bytes, byte[]> readOnly(final IsolationLevel
isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+ return wrapped().readOnly(isolationLevel);
+ }
+ return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+ }
+
+ private final class ReadOnlyView implements ReadOnlySessionStore<Bytes,
byte[]> {
+
+ private final ReadOnlySessionStore<Bytes, byte[]> underlying;
+
+ ReadOnlyView(final ReadOnlySessionStore<Bytes, byte[]> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public byte[] fetchSession(final Bytes key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime) {
+ return fetchSessionInternal(key, earliestSessionEndTime,
latestSessionStartTime, underlying);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final
Bytes key,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return findSessionsInternal(key, earliestSessionEndTime,
latestSessionStartTime, underlying);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessions(final Bytes key,
+
final long earliestSessionEndTime,
+
final long latestSessionStartTime) {
+ return backwardFindSessionsInternal(key, earliestSessionEndTime,
latestSessionStartTime, underlying);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final
Bytes keyFrom,
+ final
Bytes keyTo,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return findSessionsInternal(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime, underlying);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessions(final Bytes keyFrom,
+
final Bytes keyTo,
+
final long earliestSessionEndTime,
+
final long latestSessionStartTime) {
+ return backwardFindSessionsInternal(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime, underlying);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return findSessions(key, 0, Long.MAX_VALUE);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final
Bytes key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return backwardFindSessions(key, 0, Long.MAX_VALUE);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom, final Bytes keyTo) {
+ return findSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final
Bytes keyFrom, final Bytes keyTo) {
+ return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE);
+ }
+ }
+
@Override
public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
internalContext = asInternalProcessorContext(stateStoreContext);
@@ -161,6 +240,13 @@ public class CachingSessionStore
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes
key,
final long
earliestSessionEndTime,
final long
latestSessionStartTime) {
+ return findSessionsInternal(key, earliestSessionEndTime,
latestSessionStartTime, wrapped());
+ }
+
+ private KeyValueIterator<Windowed<Bytes>, byte[]>
findSessionsInternal(final Bytes key,
+
final long earliestSessionEndTime,
+
final long latestSessionStartTime,
+
final ReadOnlySessionStore<Bytes, byte[]> underlying) {
validateStoreOpen();
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator =
wrapped().persistent() ?
@@ -170,9 +256,8 @@ public class CachingSessionStore
cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key,
latestSessionStartTime))
);
- final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
wrapped().findSessions(key,
-
earliestSessionEndTime,
-
latestSessionStartTime);
+ final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+ underlying.findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
final HasNextCondition hasNextCondition =
keySchema.hasNextCondition(key,
key,
earliestSessionEndTime,
@@ -187,6 +272,13 @@ public class CachingSessionStore
public KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessions(final Bytes key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
+ return backwardFindSessionsInternal(key, earliestSessionEndTime,
latestSessionStartTime, wrapped());
+ }
+
+ private KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessionsInternal(final Bytes key,
+
final long earliestSessionEndTime,
+
final long latestSessionStartTime,
+
final ReadOnlySessionStore<Bytes, byte[]> underlying) {
validateStoreOpen();
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator =
wrapped().persistent() ?
@@ -198,11 +290,8 @@ public class CachingSessionStore
)
);
- final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
wrapped().backwardFindSessions(
- key,
- earliestSessionEndTime,
- latestSessionStartTime
- );
+ final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+ underlying.backwardFindSessions(key, earliestSessionEndTime,
latestSessionStartTime);
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(
key,
key,
@@ -220,6 +309,14 @@ public class CachingSessionStore
final Bytes
keyTo,
final long
earliestSessionEndTime,
final long
latestSessionStartTime) {
+ return findSessionsInternal(keyFrom, keyTo, earliestSessionEndTime,
latestSessionStartTime, wrapped());
+ }
+
+ private KeyValueIterator<Windowed<Bytes>, byte[]>
findSessionsInternal(final Bytes keyFrom,
+
final Bytes keyTo,
+
final long earliestSessionEndTime,
+
final long latestSessionStartTime,
+
final ReadOnlySessionStore<Bytes, byte[]> underlying) {
if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn(INVALID_RANGE_WARN_MSG);
return KeyValueIterators.emptyIterator();
@@ -231,9 +328,8 @@ public class CachingSessionStore
final Bytes cacheKeyTo = keyTo == null ? null :
cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
- final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
wrapped().findSessions(
- keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime
- );
+ final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+ underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime,
latestSessionStartTime);
final HasNextCondition hasNextCondition =
keySchema.hasNextCondition(keyFrom,
keyTo,
earliestSessionEndTime,
@@ -249,6 +345,14 @@ public class CachingSessionStore
final Bytes keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
+ return backwardFindSessionsInternal(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime, wrapped());
+ }
+
+ private KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessionsInternal(final Bytes keyFrom,
+
final Bytes keyTo,
+
final long earliestSessionEndTime,
+
final long latestSessionStartTime,
+
final ReadOnlySessionStore<Bytes, byte[]> underlying) {
if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
LOG.warn(INVALID_RANGE_WARN_MSG);
return KeyValueIterators.emptyIterator();
@@ -261,7 +365,7 @@ public class CachingSessionStore
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
- wrapped().backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ underlying.backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
final HasNextCondition hasNextCondition = keySchema.hasNextCondition(
keyFrom,
keyTo,
@@ -277,16 +381,23 @@ public class CachingSessionStore
@Override
public byte[] fetchSession(final Bytes key, final long
earliestSessionEndTime, final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
+ return fetchSessionInternal(key, earliestSessionEndTime,
latestSessionStartTime, wrapped());
+ }
+
+ private byte[] fetchSessionInternal(final Bytes key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime,
+ final ReadOnlySessionStore<Bytes,
byte[]> underlying) {
validateStoreOpen();
if (internalContext.cache() == null) {
- return wrapped().fetchSession(key, earliestSessionEndTime,
latestSessionStartTime);
+ return underlying.fetchSession(key, earliestSessionEndTime,
latestSessionStartTime);
} else {
final Bytes bytesKey = SessionKeySchema.toBinary(key,
earliestSessionEndTime,
latestSessionStartTime);
final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
final LRUCacheEntry entry = internalContext.cache().get(cacheName,
cacheKey);
if (entry == null) {
- return wrapped().fetchSession(key, earliestSessionEndTime,
latestSessionStartTime);
+ return underlying.fetchSession(key, earliestSessionEndTime,
latestSessionStartTime);
} else {
return entry.value();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 8dff7d0c1cd..92a3370d16a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
@@ -36,6 +37,7 @@ import
org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
@@ -860,6 +862,173 @@ public class CachingPersistentSessionStoreTest {
}
}
+ @Test
+ public void shouldReadCommittedBypassesCache() {
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+
+ // cache-only entry is invisible under READ_COMMITTED
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> it =
+
cachingStore.readOnly(IsolationLevel.READ_COMMITTED).findSessions(keyA, 0, 0)) {
+ assertFalse(it.hasNext());
+ }
+
+ cachingStore.commit(Map.of());
+
+ // once flushed to store it is visible
+ try (final KeyValueIterator<Windowed<Bytes>, byte[]> it =
+
cachingStore.readOnly(IsolationLevel.READ_COMMITTED).findSessions(keyA, 0, 0)) {
+ assertTrue(it.hasNext());
+ }
+ }
+
+ @Test
+ public void shouldReadUncommittedViewFetchSession() {
+ // keyA in store, keyB in cache
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"store".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)),
"cache".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertArrayEquals("store".getBytes(), view.fetchSession(keyA, 0, 0));
+ assertArrayEquals("cache".getBytes(), view.fetchSession(keyB, 0, 0));
+ }
+
+ @Test
+ public void shouldReadUncommittedViewFindSessionsMergesCacheAndStore() {
+ // keyA window(0,0) in store; keyA window(10,10) in cache
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+ toListAndCloseIterator(view.findSessions(keyA, 0, 10));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new
SessionWindow(10, 10)), "2");
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewBackwardFindSessionsMergesCacheAndStore() {
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+ toListAndCloseIterator(view.backwardFindSessions(keyA, 0, 10));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new
SessionWindow(10, 10)), "2");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewFindSessionsRangeMergesCacheAndStore() {
+ // keyA in store, keyB in cache
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+ toListAndCloseIterator(view.findSessions(keyA, keyB, 0, 0));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyB, new
SessionWindow(0, 0)), "2");
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewBackwardFindSessionsRangeMergesCacheAndStore() {
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
+ toListAndCloseIterator(view.backwardFindSessions(keyA, keyB, 0,
0));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyB, new
SessionWindow(0, 0)), "2");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ }
+
+ @Test
+ public void shouldReadUncommittedViewFetchSingleKeyMergesCacheAndStore() {
+ // keyA window(0,0) in store; keyA window(10,10) in cache
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
toListAndCloseIterator(view.fetch(keyA));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new
SessionWindow(10, 10)), "2");
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewBackwardFetchSingleKeyMergesCacheAndStore() {
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(10, 10)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
toListAndCloseIterator(view.backwardFetch(keyA));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new
SessionWindow(10, 10)), "2");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ }
+
+ @Test
+ public void shouldReadUncommittedViewFetchRangeMergesCacheAndStore() {
+ // keyA in store, keyB in cache
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
toListAndCloseIterator(view.fetch(keyA, keyB));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyB, new
SessionWindow(0, 0)), "2");
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewBackwardFetchRangeMergesCacheAndStore() {
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)),
"2".getBytes());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
toListAndCloseIterator(view.backwardFetch(keyA, keyB));
+ assertEquals(2, results.size());
+ verifyWindowedKeyValue(results.get(0), new Windowed<>(keyB, new
SessionWindow(0, 0)), "2");
+ verifyWindowedKeyValue(results.get(1), new Windowed<>(keyA, new
SessionWindow(0, 0)), "1");
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewFetchSessionReturnsNullForCachedTombstone() {
+ // Flush keyA to the underlying store, then remove it (cache holds
null tombstone).
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"store".getBytes());
+ cachingStore.commit(Map.of());
+ cachingStore.remove(new Windowed<>(keyA, new SessionWindow(0, 0)));
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ // Cache tombstone (null value) shadows the underlying "store" value.
+ assertNull(view.fetchSession(keyA, 0, 0));
+ }
+
+ @Test
+ public void shouldReadUncommittedViewFetchRangeWithNullBounds() {
+ cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)),
"1".getBytes());
+ cachingStore.put(new Windowed<>(keyB, new SessionWindow(0, 0)),
"2".getBytes());
+ cachingStore.commit(Map.of());
+
+ final ReadOnlySessionStore<Bytes, byte[]> view =
cachingStore.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ // null keyFrom/keyTo means unbounded — should return all sessions
without NPE.
+ final List<KeyValue<Windowed<Bytes>, byte[]>> results =
toListAndCloseIterator(view.fetch(null, null));
+ assertEquals(2, results.size());
+ }
+
private List<KeyValue<Windowed<Bytes>, byte[]>>
addSessionsUntilOverflow(final String... sessionIds) {
final Random random = new Random();
final List<KeyValue<Windowed<Bytes>, byte[]>> results = new
ArrayList<>();