This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8bda3061394 KAFKA-20277: Add session store with headers support for IQv2 (#21670)
8bda3061394 is described below

commit 8bda30613947bdd0c788f7273a103349e1643b75
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Mar 11 16:11:22 2026 -0400

    KAFKA-20277: Add session store with headers support for IQv2 (#21670)
    
    This PR does NOT implement header-aware IQv2 queries. Instead, it makes
    the headers-aware session stores queryable via IQv2 in the same way as
    plain session stores. As a result, query results do not contain the
    headers, even though the headers are preserved in the headers state store.
    
    Part of KIP-1271.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../internals/AbstractReadWriteDecorator.java      |  13 +++
 .../internals/MeteredSessionStoreWithHeaders.java  |  79 +++++++++++++++
 .../internals/RocksDBSessionStoreWithHeaders.java  |  26 ++++-
 .../RocksDBTimeOrderedSessionStoreWithHeaders.java |  26 ++++-
 .../MeteredSessionStoreWithHeadersTest.java        |  73 ++++++++++++++
 .../RocksDBSessionStoreWithHeadersTest.java        | 103 ++++++++++++++++++--
 ...ksDBTimeOrderedSessionStoreWithHeadersTest.java | 106 ++++++++++++++++++---
 .../internals/SessionStoreIteratorFacadeTest.java  |  91 ++++++++++++++++++
 8 files changed, 489 insertions(+), 28 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 3b3d57298fe..99a53656fa2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -22,9 +22,11 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -78,6 +80,8 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
             return new 
TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) 
store);
         } else if (store instanceof WindowStore) {
             return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) 
store);
+        } else if (store instanceof SessionStoreWithHeaders) {
+            return new 
SessionStoreWithHeadersReadWriteDecorator<>((SessionStoreWithHeaders<?, ?>) 
store);
         } else if (store instanceof SessionStore) {
             return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) 
store);
         } else {
@@ -284,6 +288,15 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
         }
     }
 
+    static class SessionStoreWithHeadersReadWriteDecorator<K, AGG>
+        extends SessionStoreReadWriteDecorator<K, AggregationWithHeaders<AGG>>
+        implements SessionStoreWithHeaders<K, AGG> {
+
+        SessionStoreWithHeadersReadWriteDecorator(final 
SessionStoreWithHeaders<K, AGG> inner) {
+            super(inner);
+        }
+    }
+
     static class SessionStoreReadWriteDecorator<K, AGG>
         extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index d0ba6fd8802..444d29834bb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -23,7 +23,15 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 
@@ -59,4 +67,75 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
         }
 
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        if (query instanceof WindowRangeQuery) {
+            final WindowRangeQuery<K, AGG> windowRangeQuery = 
(WindowRangeQuery<K, AGG>) query;
+            if (windowRangeQuery.getKey().isPresent()) {
+                result = runRangeQuery(query, positionBound, config);
+            } else {
+                result = QueryResult.forFailure(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    "This store (" + getClass() + ") doesn't know how to"
+                        + " execute the given query (" + query + ") because"
+                        + " SessionStores only support 
WindowRangeQuery.withKey."
+                        + " Contact the store maintainer if you need support"
+                        + " for a new query type."
+                );
+            }
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+            }
+        } else {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+        final WindowRangeQuery<K, AGG> typedQuery = (WindowRangeQuery<K, AGG>) 
query;
+        final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+            WindowRangeQuery.withKey(
+                Bytes.wrap(serdes.rawKey(typedQuery.getKey().get(), new 
RecordHeaders()))
+            );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult 
=
+            wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final MeteredWindowedKeyValueIterator<K, AGG> typedResult =
+                new MeteredWindowedKeyValueIterator<>(
+                    rawResult.getResult(),
+                    fetchSensor,
+                    iteratorDurationSensor,
+                    streamsMetrics,
+                    bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
+                    byteArray -> {
+                        final AggregationWithHeaders<AGG> awh =
+                            
serdes.valueDeserializer().deserialize(serdes.topic(), byteArray);
+                        return awh == null ? null : awh.aggregation();
+                    },
+                    time,
+                    numOpenIterators,
+                    openIterators
+                );
+            return (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
typedResult);
+        } else {
+            return (QueryResult<R>) rawResult;
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
index 75d0ed17eff..77fb98fabff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
@@ -25,8 +26,9 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
 /**
  * RocksDB-backed session store with support for record headers.
  * <p>
- * This store extends {@link RocksDBSessionStore} and overrides
- * {@code query()} to disable IQv2 for header-aware stores.
+ * This store extends {@link RocksDBSessionStore} and returns
+ * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * as IQv2 query handling is done at the metered layer.
  * <p>
  * The storage format for values is: 
[headersSize(varint)][headersBytes][aggregationBytes]
  *
@@ -39,7 +41,23 @@ class RocksDBSessionStoreWithHeaders extends 
RocksDBSessionStore implements Head
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound, final QueryConfig config) {
-        throw new UnsupportedOperationException("Querying stores with headers 
is not supported");
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+        final Position position = getPosition();
+
+        synchronized (position) {
+            result = QueryResult.forUnknownQueryType(query, this);
+
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + this.getClass() + " in " + 
(System.nanoTime() - start) + "ns"
+                );
+            }
+            result.setPosition(position.copy());
+        }
+        return result;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
index 2d5ab757804..aa513138d9f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
@@ -25,8 +26,9 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
 /**
  * RocksDB-backed time-ordered session store with support for record headers.
  * <p>
- * This store extends {@link RocksDBTimeOrderedSessionStore} and overrides
- * {@code query()} to disable IQv2 for header-aware stores.
+ * This store extends {@link RocksDBTimeOrderedSessionStore} and returns
+ * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * as IQv2 query handling is done at the metered layer.
  * <p>
  * The storage format for values is: 
[headersSize(varint)][headersBytes][aggregationBytes]
  *
@@ -39,7 +41,23 @@ class RocksDBTimeOrderedSessionStoreWithHeaders extends 
RocksDBTimeOrderedSessio
     }
 
     @Override
-    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound, final QueryConfig config) {
-        throw new UnsupportedOperationException("Querying stores with headers 
is not supported");
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+        final Position position = getPosition();
+
+        synchronized (position) {
+            result = QueryResult.forUnknownQueryType(query, this);
+
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + this.getClass() + " in " + 
(System.nanoTime() - start) + "ns"
+                );
+            }
+            result.setPosition(position.copy());
+        }
+        return result;
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
index aaab4310164..236f704e946 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
@@ -37,6 +37,12 @@ import 
org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -733,4 +739,71 @@ public class MeteredSessionStoreWithHeadersTest {
             // Expected
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldHandleWindowRangeQueryWithKeyAndUnwrapHeaders() {
+        setUp();
+        init();
+
+        final Headers headers = new RecordHeaders();
+        headers.add("key1", "value1".getBytes());
+        final AggregationWithHeaders<String> valueAndHeaders = 
AggregationWithHeaders.make(VALUE, headers);
+
+        final AggregationWithHeadersSerializer<String> serializer = new 
AggregationWithHeadersSerializer<>(Serdes.String().serializer());
+        final byte[] serializedValue = serializer.serialize(CHANGELOG_TOPIC, 
valueAndHeaders);
+
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult 
=
+            QueryResult.forResult(new KeyValueIteratorStub<>(
+                Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
serializedValue)).iterator()));
+
+        when(innerStore.query(any(), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn((QueryResult) rawResult);
+
+        final WindowRangeQuery<String, String> query = 
WindowRangeQuery.withKey(KEY);
+        final QueryResult<KeyValueIterator<Windowed<String>, String>> result =
+            store.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertTrue(result.isSuccess());
+        final KeyValueIterator<Windowed<String>, String> iterator = 
result.getResult();
+        assertTrue(iterator.hasNext());
+        final KeyValue<Windowed<String>, String> next = iterator.next();
+        assertEquals(VALUE, next.value);
+        assertFalse(iterator.hasNext());
+        iterator.close();
+    }
+
+    @Test
+    public void shouldFailWindowRangeQueryWithoutKey() {
+        setUp();
+        init();
+
+        final WindowRangeQuery<String, String> query = 
WindowRangeQuery.withWindowStartRange(
+            java.time.Instant.ofEpochMilli(0L),
+            java.time.Instant.ofEpochMilli(0L)
+        );
+        final QueryResult<KeyValueIterator<Windowed<String>, String>> result =
+            store.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertTrue(result.isFailure());
+        assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, 
result.getFailureReason());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldDelegateUnknownQueryToWrappedStore() {
+        setUp();
+        init();
+
+        final QueryResult<Void> expectedResult = QueryResult.forFailure(
+            FailureReason.UNKNOWN_QUERY_TYPE, "unknown");
+        when(innerStore.query(any(), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn((QueryResult) expectedResult);
+
+        final Query<Void> unknownQuery = new Query<Void>() { };
+        final QueryResult<Void> result =
+            store.query(unknownQuery, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertTrue(result.isFailure());
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
index 2f9c522f368..9acbdfef326 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
@@ -16,18 +16,107 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.File;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RocksDBSessionStoreWithHeadersTest {
 
+    private static final String STORE_NAME = "test-session-store";
+    private static final long RETENTION_PERIOD = 60_000L;
+    private static final long SEGMENT_INTERVAL = 30_000L;
+
+    private RocksDBSessionStoreWithHeaders sessionStore;
+    private InternalMockProcessorContext<String, String> context;
+    private File baseDir;
+
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(props)
+        );
+
+        final SegmentedBytesStore segmentedBytesStore = new 
RocksDBSegmentedBytesStore(
+            STORE_NAME,
+            "test-metrics-scope",
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new SessionKeySchema()
+        );
+
+        sessionStore = new RocksDBSessionStoreWithHeaders(segmentedBytesStore);
+        sessionStore.init(context, sessionStore);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (sessionStore != null) {
+            sessionStore.close();
+        }
+    }
+
     @Test
-    public void shouldThrowUnsupportedOperationOnQuery() {
-        final RocksDBSessionStoreWithHeaders store = new 
RocksDBSessionStoreWithHeaders(
-            new RocksDBSegmentedBytesStore("test", "scope", 10_000L,
-                60_000L, new SessionKeySchema()));
-        assertThrows(UnsupportedOperationException.class,
-            () -> store.query(null, null, null));
+    public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+            new Bytes("test-key".getBytes())
+        );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            sessionStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertFalse(result.isSuccess());
+        assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, 
result.getFailureReason());
+        assertNotNull(result.getPosition());
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoWhenRequested() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+            new Bytes("test-key".getBytes())
+        );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            sessionStore.query(query, PositionBound.unbounded(), new 
QueryConfig(true));
+
+        assertFalse(result.getExecutionInfo().isEmpty());
+        assertTrue(result.getExecutionInfo().get(0).contains("Handled in"));
+        assertTrue(result.getExecutionInfo().get(0).contains(
+            RocksDBSessionStoreWithHeaders.class.getName()));
+    }
+
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+            new Bytes("test-key".getBytes())
+        );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            sessionStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertTrue(result.getExecutionInfo().isEmpty());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
index 761058193f1..3f9c77bf30a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
@@ -16,27 +16,107 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.File;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RocksDBTimeOrderedSessionStoreWithHeadersTest {
 
+    private static final String STORE_NAME = "test-session-store";
+    private static final long RETENTION_PERIOD = 60_000L;
+    private static final long SEGMENT_INTERVAL = 30_000L;
+
+    private RocksDBTimeOrderedSessionStoreWithHeaders sessionStore;
+    private InternalMockProcessorContext<String, String> context;
+    private File baseDir;
+
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(props)
+        );
+
+        sessionStore = new RocksDBTimeOrderedSessionStoreWithHeaders(
+            new RocksDBTimeOrderedSessionSegmentedBytesStore(
+                STORE_NAME,
+                "test-metrics-scope",
+                RETENTION_PERIOD,
+                SEGMENT_INTERVAL,
+                true
+            )
+        );
+        sessionStore.init(context, sessionStore);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (sessionStore != null) {
+            sessionStore.close();
+        }
+    }
+
     @Test
-    public void shouldThrowUnsupportedOperationOnQueryWithIndex() {
-        final RocksDBTimeOrderedSessionStoreWithHeaders store = new 
RocksDBTimeOrderedSessionStoreWithHeaders(
-            new RocksDBTimeOrderedSessionSegmentedBytesStore("test", "scope",
-                10_000L, 60_000L, true));
-        assertThrows(UnsupportedOperationException.class,
-            () -> store.query(null, null, null));
+    public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+            new Bytes("test-key".getBytes())
+        );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            sessionStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertFalse(result.isSuccess());
+        assertEquals(FailureReason.UNKNOWN_QUERY_TYPE, 
result.getFailureReason());
+        assertNotNull(result.getPosition());
     }
 
     @Test
-    public void shouldThrowUnsupportedOperationOnQueryWithOutIndex() {
-        final RocksDBTimeOrderedSessionStoreWithHeaders store = new 
RocksDBTimeOrderedSessionStoreWithHeaders(
-            new RocksDBTimeOrderedSessionSegmentedBytesStore("test", "scope",
-                10_000L, 60_000L, false));
-        assertThrows(UnsupportedOperationException.class,
-            () -> store.query(null, null, null));
+    public void shouldCollectExecutionInfoWhenRequested() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+            new Bytes("test-key".getBytes())
+        );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            sessionStore.query(query, PositionBound.unbounded(), new 
QueryConfig(true));
+
+        assertFalse(result.getExecutionInfo().isEmpty());
+        assertTrue(result.getExecutionInfo().get(0).contains("Handled in"));
+        assertTrue(result.getExecutionInfo().get(0).contains(
+            RocksDBTimeOrderedSessionStoreWithHeaders.class.getName()));
+    }
+
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+            new Bytes("test-key".getBytes())
+        );
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            sessionStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+        assertTrue(result.getExecutionInfo().isEmpty());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
new file mode 100644
index 00000000000..c1f35083051
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class SessionStoreIteratorFacadeTest {
+    @Mock
+    private KeyValueIterator<String, AggregationWithHeaders<String>> 
mockedInnerIterator;
+
+    private SessionStoreIteratorFacade<String, String> facade;
+
+    @BeforeEach
+    public void setup() {
+        facade = new SessionStoreIteratorFacade<>(mockedInnerIterator);
+    }
+
+    @Test
+    public void shouldForwardHasNext() {
+        when(mockedInnerIterator.hasNext()).thenReturn(true);
+        assertTrue(facade.hasNext());
+    }
+
+    @Test
+    public void shouldForwardPeekNextKey() {
+        when(mockedInnerIterator.peekNextKey()).thenReturn("key");
+        assertThat(facade.peekNextKey(), is("key"));
+    }
+
+    @Test
+    public void shouldReturnPlainKeyValuePairOnNext() {
+        final AggregationWithHeaders<String> aggregation =
+            AggregationWithHeaders.make("value", new RecordHeaders());
+        when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key", 
aggregation));
+        assertThat(facade.next(), is(KeyValue.pair("key", "value")));
+    }
+
+    @Test
+    public void shouldReturnNullValueWhenAggregationWithHeadersIsNull() {
+        when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key", 
null));
+        final KeyValue<String, String> result = facade.next();
+        assertThat(result.key, is("key"));
+        assertThat(result.value, is(nullValue()));
+    }
+
+    @Test
+    public void shouldReturnNullWhenInnerNextReturnsNull() {
+        when(mockedInnerIterator.next()).thenReturn(null);
+        assertThat(facade.next(), is(nullValue()));
+    }
+
+    @Test
+    public void shouldCloseInnerIterator() {
+        facade.close();
+        verify(mockedInnerIterator).close();
+    }
+}

Reply via email to