This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 99611aa06f1 KAFKA-20496: Add isolation level to IQ dispatch (#22143)
99611aa06f1 is described below

commit 99611aa06f13f8ef36619e3f02d30aca3dcc2894
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 15 17:33:07 2026 +0100

    KAFKA-20496: Add isolation level to IQ dispatch (#22143)
    
    As part of KIP-892, interactive queries against transactional state
    stores need to distinguish between reading committed data only and
    reading data that may still be in an open transaction buffer. Without an
    isolation-level concept in the IQ path, there is no way to expose that
    distinction to callers.
    
    This change introduces the default.interactive.query.isolation.level
    StreamsConfig option and plumbs it through the IQ execution path. The
    level is attached to StateQueryRequest and QueryConfig so that each
    query carries its own isolation context. CompositeReadOnlyKeyValueStore,
    CompositeReadOnlySessionStore, and CompositeReadOnlyWindowStore are
    updated to forward the level when delegating reads to the underlying
    StateStoreProvider, and StoreQueryUtils routes it into the actual store
    query. The default remains READ_UNCOMMITTED to preserve existing
    behaviour; READ_COMMITTED will cause queries to skip any pending
    transaction data.
    
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   9 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  27 +++++
 .../apache/kafka/streams/query/QueryConfig.java    |  11 ++
 .../kafka/streams/query/StateQueryRequest.java     |  48 ++++++--
 .../kafka/streams/state/ReadOnlyKeyValueStore.java |  21 ++++
 .../kafka/streams/state/ReadOnlySessionStore.java  |   9 ++
 .../kafka/streams/state/ReadOnlyWindowStore.java   |   9 ++
 .../internals/CompositeReadOnlyKeyValueStore.java  |  39 ++++--
 .../internals/CompositeReadOnlySessionStore.java   |  43 +++++--
 .../internals/CompositeReadOnlyWindowStore.java    |  43 +++++--
 .../state/internals/QueryableStoreProvider.java    |   9 +-
 .../state/internals/StateStoreProvider.java        |  11 ++
 .../streams/state/internals/StoreQueryUtils.java   |  19 ++-
 .../state/internals/WrappingStoreProvider.java     |  16 ++-
 .../apache/kafka/streams/StreamsConfigTest.java    |  35 ++++++
 .../CompositeReadOnlyKeyValueStoreTest.java        |  48 ++++++++
 .../CompositeReadOnlySessionStoreTest.java         |  48 ++++++++
 .../CompositeReadOnlyWindowStoreTest.java          |  50 ++++++++
 .../internals/QueryableStoreProviderTest.java      |  26 +++-
 .../state/internals/ReadOnlyWindowStoreStub.java   |   8 ++
 .../state/internals/StoreQueryUtilsTest.java       | 132 +++++++++++++++++++++
 .../state/internals/WrappingStoreProviderTest.java |  17 +++
 .../org/apache/kafka/test/NoOpReadOnlyStore.java   |   8 ++
 .../kafka/test/ReadOnlySessionStoreStub.java       |   8 ++
 24 files changed, 650 insertions(+), 44 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5403dbdea18..b790f10e398 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1036,7 +1036,9 @@ public class KafkaStreams implements AutoCloseable {
             globalStreamThread.setStateListener(streamStateListener);
         }
 
-        queryableStoreProvider = new 
QueryableStoreProvider(globalStateStoreProvider);
+        queryableStoreProvider = new QueryableStoreProvider(
+            globalStateStoreProvider,
+            applicationConfigs::defaultInteractiveQueryIsolationLevel);
         for (int i = 1; i <= numStreamThreads; i++) {
             createAndAddStreamThread(cacheSizePerThread, i);
         }
@@ -2123,7 +2125,10 @@ public class KafkaStreams implements AutoCloseable {
                                     request.isRequireActive()
                                         ? PositionBound.unbounded()
                                         : request.getPositionBound(),
-                                    new 
QueryConfig(request.executionInfoEnabled())
+                                    new QueryConfig(
+                                        request.executionInfoEnabled(),
+                                        request.isolationLevel()
+                                            
.orElseGet(applicationConfigs::defaultInteractiveQueryIsolationLevel))
                                 );
                                 result.addResult(partition, r);
                             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 083ca82e549..a1c06e15221 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -533,6 +534,15 @@ public class StreamsConfig extends AbstractConfig {
     @Deprecated
     public static final String DEFAULT_DSL_STORE = ROCKS_DB;
 
+    /** {@code default.interactive.query.isolation.level} */
+    public static final String 
DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG = 
"default.interactive.query.isolation.level";
+    private static final String DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_DOC 
= "The default <code>IsolationLevel</code> used by interactive queries. " +
+        "Only meaningful when <code>" + "enable.transactional.statestores" + 
"</code> is <code>true</code>: " +
+        "<code>READ_UNCOMMITTED</code> reads include writes staged in the 
transaction buffer since the last commit; " +
+        "<code>READ_COMMITTED</code> reads skip the transaction buffer and 
return only committed data. " +
+        "IQv1 queries always use this value. IQv2 queries use this value as a 
default, but can override it per-query.";
+    public static final String 
DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_DEFAULT = 
IsolationLevel.READ_UNCOMMITTED.name();
+
     /** {@code dsl.store.suppliers.class } */
     public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = 
"dsl.store.suppliers.class";
     static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store 
implementations to plug in to DSL operators. Must implement the 
<code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
@@ -1157,6 +1167,14 @@ public class StreamsConfig extends AbstractConfig {
                     
ConfigDef.CaseInsensitiveValidString.in(DSL_STORE_FORMAT_DEFAULT, 
DSL_STORE_FORMAT_HEADERS),
                     Importance.LOW,
                     DSL_STORE_FORMAT_DOC)
+            .define(DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG,
+                    Type.STRING,
+                    DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_DEFAULT,
+                    ConfigDef.CaseInsensitiveValidString.in(
+                        IsolationLevel.READ_UNCOMMITTED.name(),
+                        IsolationLevel.READ_COMMITTED.name()),
+                    Importance.LOW,
+                    DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_DOC)
             .define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
                     Type.CLASS,
                     DefaultKafkaClientSupplier.class.getName(),
@@ -2035,6 +2053,7 @@ public class StreamsConfig extends AbstractConfig {
      *
      * @return Map of the client tags.
      */
+
     @SuppressWarnings("WeakerAccess")
     public Map<String, String> getClientTags() {
         return 
originalsWithPrefix(CLIENT_TAG_PREFIX).entrySet().stream().collect(
@@ -2105,6 +2124,14 @@ public class StreamsConfig extends AbstractConfig {
             KafkaClientSupplier.class);
     }
 
+    /**
+     * Return the configured default {@link IsolationLevel} used by 
interactive queries.
+     */
+    public IsolationLevel defaultInteractiveQueryIsolationLevel() {
+        return IsolationLevel.valueOf(
+            
getString(DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
+    }
+
     /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of 
{@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
      * class}.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/QueryConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/query/QueryConfig.java
index 89e263a6f31..259b67a2e98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/QueryConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/QueryConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.query;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 
 /**
@@ -24,12 +25,22 @@ import 
org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 @Evolving
 public class QueryConfig {
     final boolean collectExecutionInfo;
+    final IsolationLevel isolationLevel;
 
     public QueryConfig(final boolean collectExecutionInfo) {
+        this(collectExecutionInfo, IsolationLevel.READ_UNCOMMITTED);
+    }
+
+    public QueryConfig(final boolean collectExecutionInfo, final 
IsolationLevel isolationLevel) {
         this.collectExecutionInfo = collectExecutionInfo;
+        this.isolationLevel = isolationLevel;
     }
 
     public boolean isCollectExecutionInfo() {
         return collectExecutionInfo;
     }
+
+    public IsolationLevel getIsolationLevel() {
+        return isolationLevel;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java 
b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
index 5cdbf8bc8ce..e4574e9120b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryRequest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.query;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 
 import java.util.Optional;
@@ -37,6 +38,7 @@ public class StateQueryRequest<R> {
     private final Query<R> query;
     private final boolean executionInfoEnabled;
     private final boolean requireActive;
+    private final Optional<IsolationLevel> isolationLevel;
 
     private StateQueryRequest(
         final String storeName,
@@ -44,7 +46,8 @@ public class StateQueryRequest<R> {
         final Optional<Set<Integer>> partitions,
         final Query<R> query,
         final boolean executionInfoEnabled,
-        final boolean requireActive) {
+        final boolean requireActive,
+        final Optional<IsolationLevel> isolationLevel) {
 
         this.storeName = storeName;
         this.position = position;
@@ -52,6 +55,7 @@ public class StateQueryRequest<R> {
         this.query = query;
         this.executionInfoEnabled = executionInfoEnabled;
         this.requireActive = requireActive;
+        this.isolationLevel = isolationLevel;
     }
 
     /**
@@ -71,7 +75,8 @@ public class StateQueryRequest<R> {
             partitions,
             query,
             executionInfoEnabled,
-            requireActive
+            requireActive,
+            isolationLevel
         );
     }
 
@@ -86,7 +91,8 @@ public class StateQueryRequest<R> {
             Optional.empty(),
             query,
             executionInfoEnabled,
-            requireActive
+            requireActive,
+            isolationLevel
         );
     }
 
@@ -103,7 +109,8 @@ public class StateQueryRequest<R> {
             Optional.of(Set.copyOf(partitions)),
             query,
             executionInfoEnabled,
-            requireActive
+            requireActive,
+            isolationLevel
         );
     }
 
@@ -118,7 +125,8 @@ public class StateQueryRequest<R> {
             partitions,
             query,
             true,
-            requireActive
+            requireActive,
+            isolationLevel
         );
     }
 
@@ -134,7 +142,24 @@ public class StateQueryRequest<R> {
             partitions,
             query,
             executionInfoEnabled,
-            true
+            true,
+            isolationLevel
+        );
+    }
+
+    /**
+     * Overrides the {@link IsolationLevel} for this query. When absent, the 
effective isolation
+     * level falls back to {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG}.
+     */
+    public StateQueryRequest<R> withIsolationLevel(final IsolationLevel 
isolationLevel) {
+        return new StateQueryRequest<>(
+            storeName,
+            position,
+            partitions,
+            query,
+            executionInfoEnabled,
+            requireActive,
+            Optional.of(isolationLevel)
         );
     }
 
@@ -195,6 +220,14 @@ public class StateQueryRequest<R> {
         return requireActive;
     }
 
+    /**
+     * The isolation level override for this query, if any. If empty, the 
effective isolation level
+     * comes from {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG}.
+     */
+    public Optional<IsolationLevel> isolationLevel() {
+        return isolationLevel;
+    }
+
     /**
      * A progressive builder interface for creating {@code StoreQueryRequest}s.
      */
@@ -216,7 +249,8 @@ public class StateQueryRequest<R> {
                 Optional.empty(), // default: all partitions
                 query, // the query is specified
                 false, // default: no execution info
-                false // default: don't require active
+                false, // default: don't require active
+                Optional.empty() // default: no isolation-level override
             );
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
index 9ced687bb43..7289f7ab03b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 
@@ -133,4 +134,24 @@ public interface ReadOnlyKeyValueStore<K, V> {
      * @throws InvalidStateStoreException if the store is not initialized
      */
     long approximateNumEntries();
+
+    /**
+     * Return a read-only view of this store bound to the given {@link 
IsolationLevel}.
+     * <p>
+     * For stores without a transaction buffer (i.e. when {@code 
enable.transactional.statestores}
+     * is {@code false}), the default implementation returns {@code this} — 
the isolation level has
+     * no observable effect. Transactional stores override this method to 
return a view that
+     * either consults the transaction buffer ({@code READ_UNCOMMITTED}) or 
reads directly from
+     * the underlying committed data ({@code READ_COMMITTED}).
+     * <p>
+     * Wrapping store implementations propagate {@code readOnly} to their 
inner store so that
+     * transformations such as serdes or metrics are applied on top of the 
isolation-level-aware
+     * view.
+     *
+     * @param isolationLevel the isolation level the returned view should use 
for reads.
+     * @return a read-only view of this store using the given isolation level.
+     */
+    default ReadOnlyKeyValueStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        return this;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
index 5a52f00f606..772f8d1b6cb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlySessionStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state;
 
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.kstream.Windowed;
 
 import java.time.Instant;
@@ -417,4 +418,12 @@ public interface ReadOnlySessionStore<K, AGG> {
         throw new UnsupportedOperationException(
             "This API is not supported by this implementation of 
ReadOnlySessionStore.");
     }
+
+    /**
+     * Return a read-only view of this store bound to the given {@link 
IsolationLevel}.
+     * See {@link ReadOnlyKeyValueStore#readOnly(IsolationLevel)} for 
semantics.
+     */
+    default ReadOnlySessionStore<K, AGG> readOnly(final IsolationLevel 
isolationLevel) {
+        return this;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
index 3bcb5ec8ddb..ceb0c43f3c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 
@@ -207,4 +208,12 @@ public interface ReadOnlyWindowStore<K, V> {
     default KeyValueIterator<Windowed<K>, V> backwardFetchAll(final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException  {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Return a read-only view of this store bound to the given {@link 
IsolationLevel}.
+     * See {@link ReadOnlyKeyValueStore#readOnly(IsolationLevel)} for 
semantics.
+     */
+    default ReadOnlyWindowStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        return this;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
index c63d6dbe5be..ab5e40ad547 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a 
{@link
@@ -37,20 +39,35 @@ public class CompositeReadOnlyKeyValueStore<K, V> 
implements ReadOnlyKeyValueSto
     private final StateStoreProvider storeProvider;
     private final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType;
     private final String storeName;
+    private final IsolationLevel isolationOverride;
 
     public CompositeReadOnlyKeyValueStore(final StateStoreProvider 
storeProvider,
                                           final 
QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
                                           final String storeName) {
+        this(storeProvider, storeType, storeName, null);
+    }
+
+    private CompositeReadOnlyKeyValueStore(final StateStoreProvider 
storeProvider,
+                                           final 
QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
+                                           final String storeName,
+                                           final IsolationLevel 
isolationOverride) {
         this.storeProvider = storeProvider;
         this.storeType = storeType;
         this.storeName = storeName;
+        this.isolationOverride = isolationOverride;
+    }
+
+    @Override
+    public ReadOnlyKeyValueStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel");
+        return new CompositeReadOnlyKeyValueStore<>(storeProvider, storeType, 
storeName, isolationLevel);
     }
 
 
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key);
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlyKeyValueStore<K, V> store : stores) {
             try {
                 final V result = store.get(key);
@@ -77,7 +94,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements 
ReadOnlyKeyValueSto
                 }
             }
         };
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(stores.iterator(), 
nextIteratorFunction));
@@ -95,7 +112,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements 
ReadOnlyKeyValueSto
                 }
             }
         };
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(stores.iterator(), 
nextIteratorFunction));
@@ -115,7 +132,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> 
implements ReadOnlyKeyValueSto
                 }
             }
         };
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(stores.iterator(), 
nextIteratorFunction));
@@ -133,7 +150,7 @@ public class CompositeReadOnlyKeyValueStore<K, V> 
implements ReadOnlyKeyValueSto
                 }
             }
         };
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(stores.iterator(), 
nextIteratorFunction));
@@ -151,15 +168,23 @@ public class CompositeReadOnlyKeyValueStore<K, V> 
implements ReadOnlyKeyValueSto
                 }
             }
         };
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(stores.iterator(), 
nextIteratorFunction));
     }
 
+    private List<ReadOnlyKeyValueStore<K, V>> readOnlyStores() {
+        final IsolationLevel level =
+            isolationOverride != null ? isolationOverride : 
storeProvider.defaultIsolationLevel();
+        return storeProvider.stores(storeName, storeType).stream()
+            .map(s -> s.readOnly(level))
+            .collect(Collectors.toList());
+    }
+
     @Override
     public long approximateNumEntries() {
-        final List<ReadOnlyKeyValueStore<K, V>> stores = 
storeProvider.stores(storeName, storeType);
+        final List<ReadOnlyKeyValueStore<K, V>> stores = readOnlyStores();
         long total = 0;
         for (final ReadOnlyKeyValueStore<K, V> store : stores) {
             total += store.approximateNumEntries();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
index 7949437109f..2ffc8d33ec1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Wrapper over the underlying {@link ReadOnlySessionStore}s found in a {@link
@@ -33,13 +35,36 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
     private final StateStoreProvider storeProvider;
     private final QueryableStoreType<ReadOnlySessionStore<K, V>> 
queryableStoreType;
     private final String storeName;
+    private final IsolationLevel isolationOverride;
 
     public CompositeReadOnlySessionStore(final StateStoreProvider 
storeProvider,
                                          final 
QueryableStoreType<ReadOnlySessionStore<K, V>> queryableStoreType,
                                          final String storeName) {
+        this(storeProvider, queryableStoreType, storeName, null);
+    }
+
+    private CompositeReadOnlySessionStore(final StateStoreProvider 
storeProvider,
+                                          final 
QueryableStoreType<ReadOnlySessionStore<K, V>> queryableStoreType,
+                                          final String storeName,
+                                          final IsolationLevel 
isolationOverride) {
         this.storeProvider = storeProvider;
         this.queryableStoreType = queryableStoreType;
         this.storeName = storeName;
+        this.isolationOverride = isolationOverride;
+    }
+
+    @Override
+    public ReadOnlySessionStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel");
+        return new CompositeReadOnlySessionStore<>(storeProvider, 
queryableStoreType, storeName, isolationLevel);
+    }
+
+    private List<ReadOnlySessionStore<K, V>> readOnlyStores() {
+        final IsolationLevel level =
+            isolationOverride != null ? isolationOverride : 
storeProvider.defaultIsolationLevel();
+        return storeProvider.stores(storeName, queryableStoreType).stream()
+            .map(s -> s.readOnly(level))
+            .collect(Collectors.toList());
     }
 
     @Override
@@ -47,7 +72,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
                                                          final long 
earliestSessionEndTime,
                                                          final long 
latestSessionStartTime) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 final KeyValueIterator<Windowed<K>, V> result =
@@ -75,7 +100,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
                                                                  final long 
earliestSessionEndTime,
                                                                  final long 
latestSessionStartTime) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 final KeyValueIterator<Windowed<K>, V> result = 
store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime);
@@ -101,7 +126,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
                                                          final K keyTo,
                                                          final long 
earliestSessionEndTime,
                                                          final long 
latestSessionStartTime) {
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 final KeyValueIterator<Windowed<K>, V> result =
@@ -128,7 +153,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
                                                                  final K keyTo,
                                                                  final long 
earliestSessionEndTime,
                                                                  final long 
latestSessionStartTime) {
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 final KeyValueIterator<Windowed<K>, V> result =
@@ -153,7 +178,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
     @Override
     public V fetchSession(final K key, final long earliestSessionEndTime, 
final long latestSessionStartTime) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 return store.fetchSession(key, earliestSessionEndTime, 
latestSessionStartTime);
@@ -172,7 +197,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
     @Override
     public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 final KeyValueIterator<Windowed<K>, V> result = 
store.fetch(key);
@@ -194,7 +219,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlySessionStore<K, V>> stores = 
storeProvider.stores(storeName, queryableStoreType);
+        final List<ReadOnlySessionStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlySessionStore<K, V> store : stores) {
             try {
                 final KeyValueIterator<Windowed<K>, V> result = 
store.backwardFetch(key);
@@ -221,7 +246,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
             store -> store.fetch(keyFrom, keyTo);
         return new DelegatingPeekingKeyValueIterator<>(storeName,
                                                        new 
CompositeKeyValueIterator<>(
-                                                               
storeProvider.stores(storeName, queryableStoreType).iterator(),
+                                                               
readOnlyStores().iterator(),
                                                                
nextIteratorFunction));
     }
 
@@ -232,7 +257,7 @@ public class CompositeReadOnlySessionStore<K, V> implements 
ReadOnlySessionStore
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                storeProvider.stores(storeName, queryableStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction
             )
         );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
index c6b0b604f14..47c3db01b2f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -26,6 +27,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 import java.time.Instant;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
@@ -36,19 +38,42 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
     private final QueryableStoreType<ReadOnlyWindowStore<K, V>> 
windowStoreType;
     private final String storeName;
     private final StateStoreProvider provider;
+    private final IsolationLevel isolationOverride;
 
     public CompositeReadOnlyWindowStore(final StateStoreProvider provider,
                                         final 
QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStoreType,
                                         final String storeName) {
+        this(provider, windowStoreType, storeName, null);
+    }
+
+    private CompositeReadOnlyWindowStore(final StateStoreProvider provider,
+                                         final 
QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStoreType,
+                                         final String storeName,
+                                         final IsolationLevel 
isolationOverride) {
         this.provider = provider;
         this.windowStoreType = windowStoreType;
         this.storeName = storeName;
+        this.isolationOverride = isolationOverride;
+    }
+
+    @Override
+    public ReadOnlyWindowStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel");
+        return new CompositeReadOnlyWindowStore<>(provider, windowStoreType, 
storeName, isolationLevel);
+    }
+
+    private List<ReadOnlyWindowStore<K, V>> readOnlyStores() {
+        final IsolationLevel level =
+            isolationOverride != null ? isolationOverride : 
provider.defaultIsolationLevel();
+        return provider.stores(storeName, windowStoreType).stream()
+            .map(s -> s.readOnly(level))
+            .collect(Collectors.toList());
     }
 
     @Override
     public V fetch(final K key, final long time) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlyWindowStore<K, V>> stores = 
provider.stores(storeName, windowStoreType);
+        final List<ReadOnlyWindowStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
             try {
                 final V result = windowStore.fetch(key, time);
@@ -69,7 +94,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
                                         final Instant timeFrom,
                                         final Instant timeTo) {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlyWindowStore<K, V>> stores = 
provider.stores(storeName, windowStoreType);
+        final List<ReadOnlyWindowStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
             try {
                 final WindowStoreIterator<V> result = windowStore.fetch(key, 
timeFrom, timeTo);
@@ -92,7 +117,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
                                                 final Instant timeFrom,
                                                 final Instant timeTo) throws 
IllegalArgumentException {
         Objects.requireNonNull(key, "key can't be null");
-        final List<ReadOnlyWindowStore<K, V>> stores = 
provider.stores(storeName, windowStoreType);
+        final List<ReadOnlyWindowStore<K, V>> stores = readOnlyStores();
         for (final ReadOnlyWindowStore<K, V> windowStore : stores) {
             try {
                 final WindowStoreIterator<V> result = 
windowStore.backwardFetch(key, timeFrom, timeTo);
@@ -120,7 +145,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                provider.stores(storeName, windowStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction));
     }
 
@@ -134,7 +159,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                provider.stores(storeName, windowStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction));
     }
 
@@ -145,7 +170,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                provider.stores(storeName, windowStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction));
     }
 
@@ -156,7 +181,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                provider.stores(storeName, windowStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction));
     }
 
@@ -168,7 +193,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                provider.stores(storeName, windowStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction));
     }
 
@@ -180,7 +205,7 @@ public class CompositeReadOnlyWindowStore<K, V> implements 
ReadOnlyWindowStore<K
         return new DelegatingPeekingKeyValueIterator<>(
             storeName,
             new CompositeKeyValueIterator<>(
-                provider.stores(storeName, windowStoreType).iterator(),
+                readOnlyStores().iterator(),
                 nextIteratorFunction));
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 54e0c05dc39..3808aeccf15 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -23,6 +24,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 /**
  * A wrapper over all of the {@link StateStoreProvider}s in a Topology
@@ -34,10 +36,13 @@ public class QueryableStoreProvider {
     // map of StreamThread.name to StreamThreadStateStoreProvider
     private final Map<String, StreamThreadStateStoreProvider> storeProviders;
     private final GlobalStateStoreProvider globalStoreProvider;
+    private final Supplier<IsolationLevel> defaultIsolationLevel;
 
-    public QueryableStoreProvider(final GlobalStateStoreProvider 
globalStateStoreProvider) {
+    public QueryableStoreProvider(final GlobalStateStoreProvider 
globalStateStoreProvider,
+                                  final Supplier<IsolationLevel> 
defaultIsolationLevel) {
         this.storeProviders = new ConcurrentHashMap<>();
         this.globalStoreProvider = globalStateStoreProvider;
+        this.defaultIsolationLevel = defaultIsolationLevel;
     }
 
     /**
@@ -59,7 +64,7 @@ public class QueryableStoreProvider {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
         return queryableStoreType.create(
-            new WrappingStoreProvider(storeProviders.values(), 
storeQueryParameters),
+            new WrappingStoreProvider(storeProviders.values(), 
storeQueryParameters, defaultIsolationLevel.get()),
             storeName
         );
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
index bc140916e5c..746615641e1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -40,4 +41,14 @@ public interface StateStoreProvider {
      * @return  List of the instances of the store in this topology. Empty 
List if not found
      */
     <T> List<T> stores(String storeName, QueryableStoreType<T> 
queryableStoreType);
+
+    /**
+     * The default {@link IsolationLevel} that composite read-only stores 
apply when
+     * invoking typed read methods on the underlying stores. Defaults to
+     * {@link IsolationLevel#READ_UNCOMMITTED} for providers that don't have 
access to the
+     * Streams configuration.
+     */
+    default IsolationLevel defaultIsolationLevel() {
+        return IsolationLevel.READ_UNCOMMITTED;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 08a5bf8e8d5..ab23baa765b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -41,6 +41,9 @@ import org.apache.kafka.streams.query.WindowKeyQuery;
 import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
@@ -203,7 +206,8 @@ public final class StoreQueryUtils {
         if (!(store instanceof KeyValueStore)) {
             return QueryResult.forUnknownQueryType(query, store);
         }
-        final KeyValueStore<Bytes, byte[]> kvStore = (KeyValueStore<Bytes, 
byte[]>) store;
+        final ReadOnlyKeyValueStore<Bytes, byte[]> kvStore =
+            ((KeyValueStore<Bytes, byte[]>) 
store).readOnly(config.getIsolationLevel());
         final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, 
byte[]>) query;
         final Optional<Bytes> lowerRange = rangeQuery.getLowerBound();
         final Optional<Bytes> upperRange = rangeQuery.getUpperBound();
@@ -238,8 +242,8 @@ public final class StoreQueryUtils {
 
         if (store instanceof KeyValueStore) {
             final KeyQuery<Bytes, byte[]> rawKeyQuery = (KeyQuery<Bytes, 
byte[]>) query;
-            final KeyValueStore<Bytes, byte[]> keyValueStore =
-                (KeyValueStore<Bytes, byte[]>) store;
+            final ReadOnlyKeyValueStore<Bytes, byte[]> keyValueStore =
+                ((KeyValueStore<Bytes, byte[]>) 
store).readOnly(config.getIsolationLevel());
             try {
                 final byte[] bytes = keyValueStore.get(rawKeyQuery.getKey());
                 return (QueryResult<R>) QueryResult.forResult(bytes);
@@ -263,7 +267,8 @@ public final class StoreQueryUtils {
         if (store instanceof WindowStore) {
             final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
                 (WindowKeyQuery<Bytes, byte[]>) query;
-            final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, 
byte[]>) store;
+            final ReadOnlyWindowStore<Bytes, byte[]> windowStore =
+                ((WindowStore<Bytes, byte[]>) 
store).readOnly(config.getIsolationLevel());
             try {
                 if (windowKeyQuery.getTimeFrom().isPresent() && 
windowKeyQuery.getTimeTo().isPresent()) {
                     final WindowStoreIterator<byte[]> iterator = 
windowStore.fetch(
@@ -299,7 +304,8 @@ public final class StoreQueryUtils {
         if (store instanceof WindowStore) {
             final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
                 (WindowRangeQuery<Bytes, byte[]>) query;
-            final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, 
byte[]>) store;
+            final ReadOnlyWindowStore<Bytes, byte[]> windowStore =
+                ((WindowStore<Bytes, byte[]>) 
store).readOnly(config.getIsolationLevel());
             try {
                 // There's no store API for open time ranges
                 if (windowRangeQuery.getTimeFrom().isPresent() && 
windowRangeQuery.getTimeTo().isPresent()) {
@@ -329,7 +335,8 @@ public final class StoreQueryUtils {
         } else if (store instanceof SessionStore) {
             final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
                 (WindowRangeQuery<Bytes, byte[]>) query;
-            final SessionStore<Bytes, byte[]> sessionStore = 
(SessionStore<Bytes, byte[]>) store;
+            final ReadOnlySessionStore<Bytes, byte[]> sessionStore =
+                ((SessionStore<Bytes, byte[]>) 
store).readOnly(config.getIsolationLevel());
             try {
                 if (windowRangeQuery.getKey().isPresent()) {
                     final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
sessionStore.fetch(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 21cc21a21df..1b54b69b71c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
@@ -33,11 +34,24 @@ public class WrappingStoreProvider implements 
StateStoreProvider {
 
     private final Collection<StreamThreadStateStoreProvider> storeProviders;
     private StoreQueryParameters storeQueryParameters;
+    private final IsolationLevel defaultIsolationLevel;
+
+    public WrappingStoreProvider(final 
Collection<StreamThreadStateStoreProvider> storeProviders,
+                                 final StoreQueryParameters 
storeQueryParameters) {
+        this(storeProviders, storeQueryParameters, 
IsolationLevel.READ_UNCOMMITTED);
+    }
 
     WrappingStoreProvider(final Collection<StreamThreadStateStoreProvider> 
storeProviders,
-                          final StoreQueryParameters storeQueryParameters) {
+                          final StoreQueryParameters storeQueryParameters,
+                          final IsolationLevel defaultIsolationLevel) {
         this.storeProviders = storeProviders;
         this.storeQueryParameters = storeQueryParameters;
+        this.defaultIsolationLevel = defaultIsolationLevel;
+    }
+
+    @Override
+    public IsolationLevel defaultIsolationLevel() {
+        return defaultIsolationLevel;
     }
 
     //visible for testing
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 34db1ccc253..fa54bb6c78c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -1038,6 +1039,40 @@ public class StreamsConfigTest {
         assertEquals("HeAdErS", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
     }
 
+    @Test
+    public void 
shouldUseDefaultInteractiveQueryIsolationLevelWhenNotSpecified() {
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals(IsolationLevel.READ_UNCOMMITTED, 
config.defaultInteractiveQueryIsolationLevel());
+    }
+
+    @Test
+    public void shouldAcceptValidInteractiveQueryIsolationLevels() {
+        
props.put(StreamsConfig.DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG, 
"READ_UNCOMMITTED");
+        assertEquals(IsolationLevel.READ_UNCOMMITTED, new 
StreamsConfig(props).defaultInteractiveQueryIsolationLevel());
+
+        
props.put(StreamsConfig.DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG, 
"READ_COMMITTED");
+        assertEquals(IsolationLevel.READ_COMMITTED, new 
StreamsConfig(props).defaultInteractiveQueryIsolationLevel());
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionForInvalidInteractiveQueryIsolationLevel() {
+        
props.put(StreamsConfig.DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG, 
"FOO");
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Invalid value FOO for 
configuration default.interactive.query.isolation.level"));
+    }
+
+    @Test
+    public void shouldAcceptInteractiveQueryIsolationLevelCaseInsensitively() {
+        
props.put(StreamsConfig.DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        assertEquals(IsolationLevel.READ_UNCOMMITTED, new 
StreamsConfig(props).defaultInteractiveQueryIsolationLevel());
+
+        
props.put(StreamsConfig.DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG, 
"read_committed");
+        assertEquals(IsolationLevel.READ_COMMITTED, new 
StreamsConfig(props).defaultInteractiveQueryIsolationLevel());
+
+        
props.put(StreamsConfig.DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG, 
"Read_Committed");
+        assertEquals(IsolationLevel.READ_COMMITTED, new 
StreamsConfig(props).defaultInteractiveQueryIsolationLevel());
+    }
+
     @Test
     public void 
shouldSpecifyRocksdbDslSupplierWhenNotExplicitlyAddedToConfigs() {
         final Class<?> expectedDefaultStoreType = 
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index f3644f43537..c34a5f245fa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -45,6 +46,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.test.StreamsTestUtils.toListAndCloseIterator;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -521,6 +523,52 @@ public class CompositeReadOnlyKeyValueStoreTest {
         assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries());
     }
 
+    @Test
+    public void readOnlyShouldReturnNewInstanceWithOverride() {
+        final CompositeReadOnlyKeyValueStore<String, String> override =
+            (CompositeReadOnlyKeyValueStore<String, String>) 
theStore.readOnly(IsolationLevel.READ_COMMITTED);
+        assertNotSame(theStore, override);
+    }
+
+    @Test
+    public void readOnlyShouldPropagateLevelToUnderlyingStore() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final NoOpReadOnlyStore<String, String> recorder = new 
NoOpReadOnlyStore<>();
+        stub.addStore(storeName, recorder);
+        final CompositeReadOnlyKeyValueStore<String, String> store = new 
CompositeReadOnlyKeyValueStore<>(
+            new WrappingStoreProvider(singletonList(stub), 
StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore())),
+            QueryableStoreTypes.keyValueStore(),
+            storeName
+        );
+
+        store.readOnly(IsolationLevel.READ_COMMITTED).get("k");
+
+        assertEquals(IsolationLevel.READ_COMMITTED, recorder.isolationLevel);
+    }
+
+    @Test
+    public void readOnlyOverrideShouldBeatProviderDefault() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final NoOpReadOnlyStore<String, String> recorder = new 
NoOpReadOnlyStore<>();
+        stub.addStore(storeName, recorder);
+        final CompositeReadOnlyKeyValueStore<String, String> store = new 
CompositeReadOnlyKeyValueStore<>(
+            new WrappingStoreProvider(singletonList(stub),
+                StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore()),
+                IsolationLevel.READ_UNCOMMITTED),
+            QueryableStoreTypes.keyValueStore(),
+            storeName
+        );
+
+        store.readOnly(IsolationLevel.READ_COMMITTED).get("k");
+
+        assertEquals(IsolationLevel.READ_COMMITTED, recorder.isolationLevel);
+    }
+
+    @Test
+    public void readOnlyShouldRejectNullLevel() {
+        assertThrows(NullPointerException.class, () -> 
theStore.readOnly(null));
+    }
+
     private CompositeReadOnlyKeyValueStore<Object, Object> rebalancing() {
         return new CompositeReadOnlyKeyValueStore<>(
             new WrappingStoreProvider(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index 59d6ad1e175..52da2f2887f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -41,6 +42,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -195,4 +197,50 @@ public class CompositeReadOnlySessionStoreTest {
     public void shouldThrowNPEIfKeyIsNull() {
         assertThrows(NullPointerException.class, () -> 
underlyingSessionStore.fetch(null));
     }
+
+    @Test
+    public void readOnlyShouldReturnNewInstanceWithOverride() {
+        final CompositeReadOnlySessionStore<String, Long> override =
+            (CompositeReadOnlySessionStore<String, Long>) 
sessionStore.readOnly(IsolationLevel.READ_COMMITTED);
+        assertNotSame(sessionStore, override);
+    }
+
+    @Test
+    public void readOnlyShouldPropagateLevelToUnderlyingStore() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final ReadOnlySessionStoreStub<String, Long> recorder = new 
ReadOnlySessionStoreStub<>();
+        stub.addStore(storeName, recorder);
+        final CompositeReadOnlySessionStore<String, Long> store = new 
CompositeReadOnlySessionStore<>(
+            new WrappingStoreProvider(Arrays.asList(stub), 
StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.sessionStore())),
+            QueryableStoreTypes.sessionStore(),
+            storeName
+        );
+
+        
toListAndCloseIterator(store.readOnly(IsolationLevel.READ_COMMITTED).fetch("k"));
+
+        assertEquals(IsolationLevel.READ_COMMITTED, recorder.isolationLevel);
+    }
+
+    @Test
+    public void readOnlyOverrideShouldBeatProviderDefault() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final ReadOnlySessionStoreStub<String, Long> recorder = new 
ReadOnlySessionStoreStub<>();
+        stub.addStore(storeName, recorder);
+        final CompositeReadOnlySessionStore<String, Long> store = new 
CompositeReadOnlySessionStore<>(
+            new WrappingStoreProvider(Arrays.asList(stub),
+                StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.sessionStore()),
+                IsolationLevel.READ_UNCOMMITTED),
+            QueryableStoreTypes.sessionStore(),
+            storeName
+        );
+
+        
toListAndCloseIterator(store.readOnly(IsolationLevel.READ_COMMITTED).fetch("k"));
+
+        assertEquals(IsolationLevel.READ_COMMITTED, recorder.isolationLevel);
+    }
+
+    @Test
+    public void readOnlyShouldRejectNullLevel() {
+        assertThrows(NullPointerException.class, () -> 
sessionStore.readOnly(null));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 2d22e6e15a7..4f453581eb5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -46,6 +47,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -524,4 +526,52 @@ public class CompositeReadOnlyWindowStoreTest {
     public void shouldThrowNPEIfKeyIsNull() {
         assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 
ofEpochMilli(0), ofEpochMilli(0)));
     }
+
+    @Test
+    public void readOnlyShouldReturnNewInstanceWithOverride() {
+        final CompositeReadOnlyWindowStore<String, String> override =
+            (CompositeReadOnlyWindowStore<String, String>) 
windowStore.readOnly(IsolationLevel.READ_COMMITTED);
+        assertNotSame(windowStore, override);
+    }
+
+    @Test
+    public void readOnlyShouldPropagateLevelToUnderlyingStore() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final ReadOnlyWindowStoreStub<String, String> recorder = new 
ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stub.addStore(storeName, recorder);
+        final CompositeReadOnlyWindowStore<String, String> store = new 
CompositeReadOnlyWindowStore<>(
+            new WrappingStoreProvider(asList(stub), 
StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.windowStore())),
+            QueryableStoreTypes.windowStore(),
+            storeName
+        );
+
+        StreamsTestUtils.toListAndCloseIterator(
+            store.readOnly(IsolationLevel.READ_COMMITTED).fetch("k", 
ofEpochMilli(0), ofEpochMilli(0)));
+
+        assertEquals(IsolationLevel.READ_COMMITTED, recorder.isolationLevel);
+    }
+
+    @Test
+    public void readOnlyOverrideShouldBeatProviderDefault() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final ReadOnlyWindowStoreStub<String, String> recorder = new 
ReadOnlyWindowStoreStub<>(WINDOW_SIZE);
+        stub.addStore(storeName, recorder);
+        final CompositeReadOnlyWindowStore<String, String> store = new 
CompositeReadOnlyWindowStore<>(
+            new WrappingStoreProvider(asList(stub),
+                StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.windowStore()),
+                IsolationLevel.READ_UNCOMMITTED),
+            QueryableStoreTypes.windowStore(),
+            storeName
+        );
+
+        StreamsTestUtils.toListAndCloseIterator(
+            store.readOnly(IsolationLevel.READ_COMMITTED).fetch("k", 
ofEpochMilli(0), ofEpochMilli(0)));
+
+        assertEquals(IsolationLevel.READ_COMMITTED, recorder.isolationLevel);
+    }
+
+    @Test
+    public void readOnlyShouldRejectNullLevel() {
+        assertThrows(NullPointerException.class, () -> 
windowStore.readOnly(null));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 7cfec2a2a42..4a0d0bb7591 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
@@ -29,9 +30,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -53,7 +56,8 @@ public class QueryableStoreProviderTest {
         globalStateStores = new HashMap<>();
         storeProvider =
             new QueryableStoreProvider(
-                new GlobalStateStoreProvider(globalStateStores)
+                new GlobalStateStoreProvider(globalStateStores),
+                () -> org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED
             );
         storeProvider.addStoreProviderForThread("thread1", theStoreProvider);
     }
@@ -103,6 +107,26 @@ public class QueryableStoreProviderTest {
         
assertNotNull(storeProvider.store(StoreQueryParameters.fromNameAndType(keyValueStore,
 QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 
1)));
     }
 
+    @Test // the default level is supplied lazily; the value current when the query runs is the one expected at the store
+    public void shouldResolveDefaultIsolationLevelAtQueryTime() {
+        final NoOpReadOnlyStore<Object, Object> kvStore = new 
NoOpReadOnlyStore<>();
+        final StateStoreProviderStub threadProvider = new 
StateStoreProviderStub(false);
+        threadProvider.addStore(keyValueStore, kvStore);
+
+        final AtomicReference<IsolationLevel> levelRef = new 
AtomicReference<>(IsolationLevel.READ_UNCOMMITTED);
+        final QueryableStoreProvider provider = new QueryableStoreProvider(
+            new GlobalStateStoreProvider(new HashMap<>()),
+            levelRef::get // supplier backed by the mutable reference above
+        );
+        provider.addStoreProviderForThread("thread-iso", threadProvider);
+
+        levelRef.set(IsolationLevel.READ_COMMITTED); // changed only after the provider has been constructed
+
+        provider.store(StoreQueryParameters.fromNameAndType(keyValueStore, 
QueryableStoreTypes.keyValueStore())).get("k");
+
+        assertEquals(IsolationLevel.READ_COMMITTED, kvStore.isolationLevel); // the store must have seen the updated value
+    }
+
     @Test
     public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
         final int partition = numStateStorePartitions + 1;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index fd8ff4eb5db..cdd56c4b1d8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -48,6 +49,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
     private final long windowSize;
     private final NavigableMap<Long, NavigableMap<K, V>> data = new 
TreeMap<>();
     private boolean open = true;
+    public IsolationLevel isolationLevel;
 
     ReadOnlyWindowStoreStub(final long windowSize) {
         this.windowSize = windowSize;
@@ -408,6 +410,12 @@ public class ReadOnlyWindowStoreStub<K, V> implements 
ReadOnlyWindowStore<K, V>,
         this.open = open;
     }
 
+    @Override
+    public ReadOnlyWindowStore<K, V> readOnly(final IsolationLevel level) {
+        this.isolationLevel = level; // remember the requested level so tests can assert on it
+        return this; // no separate view object: the stub itself is returned
+    }
+
     private static class TheWindowStoreIterator<E> implements 
WindowStoreIterator<E> {
 
         private final Iterator<KeyValue<Long, E>> underlying;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQueryUtilsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQueryUtilsTest.java
index f2c9ea42c10..e13408b1e0c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQueryUtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreQueryUtilsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.query.FailureReason;
@@ -24,13 +26,26 @@ import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.time.Instant;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class StoreQueryUtilsTest {
 
@@ -84,4 +99,121 @@ public class StoreQueryUtilsTest {
                    + " 
PositionBound{position=Position{position={topic={0=1}}}}")
         );
     }
+
+    @Test // RangeQuery: the QueryConfig's isolation level must be passed to store.readOnly(...)
+    public void shouldPassIsolationLevelToReadOnlyForRangeQuery() {
+        @SuppressWarnings("unchecked") final KeyValueStore<Bytes, byte[]> 
store =
+            Mockito.mock(KeyValueStore.class);
+        @SuppressWarnings("unchecked") final ReadOnlyKeyValueStore<Bytes, 
byte[]> readOnlyView =
+            Mockito.mock(ReadOnlyKeyValueStore.class);
+        when(store.readOnly(any())).thenReturn(readOnlyView); // whatever level is requested, hand back the mocked view
+        
when(readOnlyView.all()).thenReturn(Mockito.mock(org.apache.kafka.streams.state.KeyValueIterator.class));
+        final StateStoreContext context = 
Mockito.mock(StateStoreContext.class);
+        when(context.taskId()).thenReturn(new TaskId(0, 0));
+
+        StoreQueryUtils.handleBasicQueries(
+            RangeQuery.withNoBounds(),
+            PositionBound.unbounded(),
+            new QueryConfig(false, IsolationLevel.READ_COMMITTED), // level under test
+            store,
+            Position.emptyPosition(),
+            context
+        );
+
+        verify(store).readOnly(IsolationLevel.READ_COMMITTED); // only the forwarded level is checked, not the query result
+    }
+
+    @Test // KeyQuery: the QueryConfig's isolation level must be passed to store.readOnly(...)
+    public void shouldPassIsolationLevelToReadOnlyForKeyQuery() {
+        @SuppressWarnings("unchecked") final KeyValueStore<Bytes, byte[]> 
store =
+            Mockito.mock(KeyValueStore.class);
+        @SuppressWarnings("unchecked") final ReadOnlyKeyValueStore<Bytes, 
byte[]> readOnlyView =
+            Mockito.mock(ReadOnlyKeyValueStore.class);
+        when(store.readOnly(any())).thenReturn(readOnlyView); // no read method is stubbed on the view in this test
+        final StateStoreContext context = 
Mockito.mock(StateStoreContext.class);
+        when(context.taskId()).thenReturn(new TaskId(0, 0));
+
+        StoreQueryUtils.handleBasicQueries(
+            KeyQuery.withKey(Bytes.wrap(new byte[]{1})),
+            PositionBound.unbounded(),
+            new QueryConfig(false, IsolationLevel.READ_COMMITTED), // level under test
+            store,
+            Position.emptyPosition(),
+            context
+        );
+
+        verify(store).readOnly(IsolationLevel.READ_COMMITTED); // only the forwarded level is checked, not the query result
+    }
+
+    @Test // WindowKeyQuery: the QueryConfig's isolation level must be passed to store.readOnly(...)
+    public void shouldPassIsolationLevelToReadOnlyForWindowKeyQuery() {
+        @SuppressWarnings("unchecked") final WindowStore<Bytes, byte[]> store =
+            Mockito.mock(WindowStore.class);
+        @SuppressWarnings("unchecked") final ReadOnlyWindowStore<Bytes, 
byte[]> readOnlyView =
+            Mockito.mock(ReadOnlyWindowStore.class);
+        when(store.readOnly(any())).thenReturn(readOnlyView); // whatever level is requested, hand back the mocked view
+        when(readOnlyView.fetch(any(), any(Instant.class), any(Instant.class)))
+            
.thenReturn(Mockito.mock(org.apache.kafka.streams.state.WindowStoreIterator.class));
+        final StateStoreContext context = 
Mockito.mock(StateStoreContext.class);
+        when(context.taskId()).thenReturn(new TaskId(0, 0));
+
+        StoreQueryUtils.handleBasicQueries(
+            WindowKeyQuery.withKeyAndWindowStartRange(Bytes.wrap(new 
byte[]{1}), Instant.ofEpochMilli(0), Instant.ofEpochMilli(100)),
+            PositionBound.unbounded(),
+            new QueryConfig(false, IsolationLevel.READ_COMMITTED), // level under test
+            store,
+            Position.emptyPosition(),
+            context
+        );
+
+        verify(store).readOnly(IsolationLevel.READ_COMMITTED); // only the forwarded level is checked, not the query result
+    }
+
+    @Test // WindowRangeQuery by window-start range on a WindowStore: the level must be passed to store.readOnly(...)
+    public void shouldPassIsolationLevelToReadOnlyForWindowRangeQuery() {
+        @SuppressWarnings("unchecked") final WindowStore<Bytes, byte[]> store =
+            Mockito.mock(WindowStore.class);
+        @SuppressWarnings("unchecked") final ReadOnlyWindowStore<Bytes, 
byte[]> readOnlyView =
+            Mockito.mock(ReadOnlyWindowStore.class);
+        when(store.readOnly(any())).thenReturn(readOnlyView); // whatever level is requested, hand back the mocked view
+        when(readOnlyView.fetchAll(any(), any()))
+            
.thenReturn(Mockito.mock(org.apache.kafka.streams.state.KeyValueIterator.class));
+        final StateStoreContext context = 
Mockito.mock(StateStoreContext.class);
+        when(context.taskId()).thenReturn(new TaskId(0, 0));
+
+        StoreQueryUtils.handleBasicQueries(
+            WindowRangeQuery.withWindowStartRange(Instant.ofEpochMilli(0), 
Instant.ofEpochMilli(100)),
+            PositionBound.unbounded(),
+            new QueryConfig(false, IsolationLevel.READ_COMMITTED), // level under test
+            store,
+            Position.emptyPosition(),
+            context
+        );
+
+        verify(store).readOnly(IsolationLevel.READ_COMMITTED); // only the forwarded level is checked, not the query result
+    }
+
+    @Test // WindowRangeQuery.withKey on a SessionStore: the level must be passed to store.readOnly(...)
+    public void shouldPassIsolationLevelToReadOnlyForSessionWindowRangeQuery() 
{
+        @SuppressWarnings("unchecked") final SessionStore<Bytes, byte[]> store 
=
+            Mockito.mock(SessionStore.class);
+        @SuppressWarnings("unchecked") final ReadOnlySessionStore<Bytes, 
byte[]> readOnlyView =
+            Mockito.mock(ReadOnlySessionStore.class);
+        when(store.readOnly(any())).thenReturn(readOnlyView); // whatever level is requested, hand back the mocked view
+        when(readOnlyView.fetch(any(Bytes.class)))
+            
.thenReturn(Mockito.mock(org.apache.kafka.streams.state.KeyValueIterator.class));
+        final StateStoreContext context = 
Mockito.mock(StateStoreContext.class);
+        when(context.taskId()).thenReturn(new TaskId(0, 0));
+
+        StoreQueryUtils.handleBasicQueries(
+            WindowRangeQuery.withKey(Bytes.wrap(new byte[]{1})),
+            PositionBound.unbounded(),
+            new QueryConfig(false, IsolationLevel.READ_COMMITTED), // level under test
+            store,
+            Position.emptyPosition(),
+            context
+        );
+
+        verify(store).readOnly(IsolationLevel.READ_COMMITTED); // only the forwarded level is checked, not the query result
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index ab868b672d8..a2908a9306e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -99,6 +100,22 @@ public class WrappingStoreProviderTest {
         assertEquals(numStateStorePartitions, results.size());
     }
 
+    @Test // NOTE(review): relies on the wrappingStoreProvider fixture, presumably built without an explicit level in setup (not shown here)
+    public void shouldDefaultToReadUncommittedWhenIsolationLevelNotSpecified() 
{
+        assertEquals(IsolationLevel.READ_UNCOMMITTED, 
wrappingStoreProvider.defaultIsolationLevel());
+    }
+
+    @Test // a level passed as the third constructor argument must be reported by defaultIsolationLevel()
+    public void shouldPropagateConfiguredDefaultIsolationLevel() {
+        final StateStoreProviderStub stub = new StateStoreProviderStub(false);
+        final WrappingStoreProvider provider = new WrappingStoreProvider(
+            Arrays.asList(stub),
+            StoreQueryParameters.fromNameAndType("kv", 
QueryableStoreTypes.keyValueStore()),
+            IsolationLevel.READ_COMMITTED // explicit level under test
+        );
+        assertEquals(IsolationLevel.READ_COMMITTED, 
provider.defaultIsolationLevel());
+    }
+
     @Test
     public void shouldReturnSingleStoreWhenQueryWithPartition() {
         
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv",
 QueryableStoreTypes.<String, 
String>keyValueStore()).withPartition(numStateStorePartitions - 1));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index a71c20c648a..bc8d5894e2f 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -35,6 +36,7 @@ public class NoOpReadOnlyStore<K, V> implements 
ReadOnlyKeyValueStore<K, V>, Sta
     private boolean open = true;
     public boolean initialized;
     public boolean committed;
+    public IsolationLevel isolationLevel;
 
     public NoOpReadOnlyStore() {
         this("", false);
@@ -124,4 +126,10 @@ public class NoOpReadOnlyStore<K, V> implements 
ReadOnlyKeyValueStore<K, V>, Sta
         throw new UnsupportedOperationException("Position handling not 
implemented");
     }
 
+    @Override
+    public ReadOnlyKeyValueStore<K, V> readOnly(final IsolationLevel level) {
+        this.isolationLevel = level; // remember the requested level so tests can assert on it
+        return this; // no separate view object: the store itself is returned
+    }
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java 
b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index 3a8be4e34e1..d381f82e37d 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -37,6 +38,7 @@ import java.util.TreeMap;
 public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, 
V>, StateStore {
     private final NavigableMap<K, List<KeyValue<Windowed<K>, V>>> sessions = 
new TreeMap<>();
     private boolean open = true;
+    public IsolationLevel isolationLevel;
 
     public void put(final Windowed<K> sessionKey, final V value) {
         if (!sessions.containsKey(sessionKey.key())) {
@@ -217,4 +219,10 @@ public class ReadOnlySessionStoreStub<K, V> implements 
ReadOnlySessionStore<K, V
     public void setOpen(final boolean open) {
         this.open = open;
     }
+
+    @Override
+    public ReadOnlySessionStore<K, V> readOnly(final IsolationLevel level) {
+        this.isolationLevel = level; // store the requested level in the public isolationLevel field
+        return this; // no separate view object: the stub itself is returned
+    }
 }

Reply via email to