This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8bda3061394 KAFKA-20277: Add session store with headers support for
IQv2 (#21670)
8bda3061394 is described below
commit 8bda30613947bdd0c788f7273a103349e1643b75
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Mar 11 16:11:22 2026 -0400
KAFKA-20277: Add session store with headers support for IQv2 (#21670)
This PR does NOT implement header-aware IQv2 queries for the headers
store. Instead, it makes the session store with headers queryable via
IQv2 through the regular session state store query path. As a result,
query results do not contain the headers, even though the headers are
preserved in the headers state store.
Part of KIP-1271.
Reviewers: Matthias Sax <[email protected]>
---
.../internals/AbstractReadWriteDecorator.java | 13 +++
.../internals/MeteredSessionStoreWithHeaders.java | 79 +++++++++++++++
.../internals/RocksDBSessionStoreWithHeaders.java | 26 ++++-
.../RocksDBTimeOrderedSessionStoreWithHeaders.java | 26 ++++-
.../MeteredSessionStoreWithHeadersTest.java | 73 ++++++++++++++
.../RocksDBSessionStoreWithHeadersTest.java | 103 ++++++++++++++++++--
...ksDBTimeOrderedSessionStoreWithHeadersTest.java | 106 ++++++++++++++++++---
.../internals/SessionStoreIteratorFacadeTest.java | 91 ++++++++++++++++++
8 files changed, 489 insertions(+), 28 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 3b3d57298fe..99a53656fa2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -22,9 +22,11 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -78,6 +80,8 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
return new
TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>)
store);
} else if (store instanceof WindowStore) {
return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>)
store);
+ } else if (store instanceof SessionStoreWithHeaders) {
+ return new
SessionStoreWithHeadersReadWriteDecorator<>((SessionStoreWithHeaders<?, ?>)
store);
} else if (store instanceof SessionStore) {
return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>)
store);
} else {
@@ -284,6 +288,15 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
}
}
+ static class SessionStoreWithHeadersReadWriteDecorator<K, AGG>
+ extends SessionStoreReadWriteDecorator<K, AggregationWithHeaders<AGG>>
+ implements SessionStoreWithHeaders<K, AGG> {
+
+ SessionStoreWithHeadersReadWriteDecorator(final
SessionStoreWithHeaders<K, AGG> inner) {
+ super(inner);
+ }
+ }
+
static class SessionStoreReadWriteDecorator<K, AGG>
extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
implements SessionStore<K, AGG> {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index d0ba6fd8802..444d29834bb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -23,7 +23,15 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.SessionStoreWithHeaders;
@@ -59,4 +67,75 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
}
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ if (query instanceof WindowRangeQuery) {
+ final WindowRangeQuery<K, AGG> windowRangeQuery =
(WindowRangeQuery<K, AGG>) query;
+ if (windowRangeQuery.getKey().isPresent()) {
+ result = runRangeQuery(query, positionBound, config);
+ } else {
+ result = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " SessionStores only support
WindowRangeQuery.withKey."
+ + " Contact the store maintainer if you need support"
+ + " for a new query type."
+ );
+ }
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " with serdes "
+ + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ }
+ } else {
+ result = wrapped().query(query, positionBound, config);
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds()
- start) + "ns");
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final WindowRangeQuery<K, AGG> typedQuery = (WindowRangeQuery<K, AGG>)
query;
+ final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+ WindowRangeQuery.withKey(
+ Bytes.wrap(serdes.rawKey(typedQuery.getKey().get(), new
RecordHeaders()))
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult
=
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final MeteredWindowedKeyValueIterator<K, AGG> typedResult =
+ new MeteredWindowedKeyValueIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
+ byteArray -> {
+ final AggregationWithHeaders<AGG> awh =
+
serdes.valueDeserializer().deserialize(serdes.topic(), byteArray);
+ return awh == null ? null : awh.aggregation();
+ },
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ return (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+ } else {
+ return (QueryResult<R>) rawResult;
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
index 75d0ed17eff..77fb98fabff 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
@@ -25,8 +26,9 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
/**
* RocksDB-backed session store with support for record headers.
* <p>
- * This store extends {@link RocksDBSessionStore} and overrides
- * {@code query()} to disable IQv2 for header-aware stores.
+ * This store extends {@link RocksDBSessionStore} and returns
+ * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * as IQv2 query handling is done at the metered layer.
* <p>
* The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
*
@@ -39,7 +41,23 @@ class RocksDBSessionStoreWithHeaders extends
RocksDBSessionStore implements Head
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
- throw new UnsupportedOperationException("Querying stores with headers
is not supported");
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+ final Position position = getPosition();
+
+ synchronized (position) {
+ result = QueryResult.forUnknownQueryType(query, this);
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ );
+ }
+ result.setPosition(position.copy());
+ }
+ return result;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
index 2d5ab757804..aa513138d9f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
@@ -25,8 +26,9 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
/**
* RocksDB-backed time-ordered session store with support for record headers.
* <p>
- * This store extends {@link RocksDBTimeOrderedSessionStore} and overrides
- * {@code query()} to disable IQv2 for header-aware stores.
+ * This store extends {@link RocksDBTimeOrderedSessionStore} and returns
+ * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * as IQv2 query handling is done at the metered layer.
* <p>
* The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
*
@@ -39,7 +41,23 @@ class RocksDBTimeOrderedSessionStoreWithHeaders extends
RocksDBTimeOrderedSessio
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
- throw new UnsupportedOperationException("Querying stores with headers
is not supported");
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+ final Position position = getPosition();
+
+ synchronized (position) {
+ result = QueryResult.forUnknownQueryType(query, this);
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ );
+ }
+ result.setPosition(position.copy());
+ }
+ return result;
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
index aaab4310164..236f704e946 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
@@ -37,6 +37,12 @@ import
org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -733,4 +739,71 @@ public class MeteredSessionStoreWithHeadersTest {
// Expected
}
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldHandleWindowRangeQueryWithKeyAndUnwrapHeaders() {
+ setUp();
+ init();
+
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ final AggregationWithHeaders<String> valueAndHeaders =
AggregationWithHeaders.make(VALUE, headers);
+
+ final AggregationWithHeadersSerializer<String> serializer = new
AggregationWithHeadersSerializer<>(Serdes.String().serializer());
+ final byte[] serializedValue = serializer.serialize(CHANGELOG_TOPIC,
valueAndHeaders);
+
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult
=
+ QueryResult.forResult(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
serializedValue)).iterator()));
+
+ when(innerStore.query(any(), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn((QueryResult) rawResult);
+
+ final WindowRangeQuery<String, String> query =
WindowRangeQuery.withKey(KEY);
+ final QueryResult<KeyValueIterator<Windowed<String>, String>> result =
+ store.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertTrue(result.isSuccess());
+ final KeyValueIterator<Windowed<String>, String> iterator =
result.getResult();
+ assertTrue(iterator.hasNext());
+ final KeyValue<Windowed<String>, String> next = iterator.next();
+ assertEquals(VALUE, next.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+ }
+
+ @Test
+ public void shouldFailWindowRangeQueryWithoutKey() {
+ setUp();
+ init();
+
+ final WindowRangeQuery<String, String> query =
WindowRangeQuery.withWindowStartRange(
+ java.time.Instant.ofEpochMilli(0L),
+ java.time.Instant.ofEpochMilli(0L)
+ );
+ final QueryResult<KeyValueIterator<Windowed<String>, String>> result =
+ store.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertTrue(result.isFailure());
+ assertEquals(FailureReason.UNKNOWN_QUERY_TYPE,
result.getFailureReason());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldDelegateUnknownQueryToWrappedStore() {
+ setUp();
+ init();
+
+ final QueryResult<Void> expectedResult = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE, "unknown");
+ when(innerStore.query(any(), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn((QueryResult) expectedResult);
+
+ final Query<Void> unknownQuery = new Query<Void>() { };
+ final QueryResult<Void> result =
+ store.query(unknownQuery, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertTrue(result.isFailure());
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
index 2f9c522f368..9acbdfef326 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
@@ -16,18 +16,107 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.File;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RocksDBSessionStoreWithHeadersTest {
+ private static final String STORE_NAME = "test-session-store";
+ private static final long RETENTION_PERIOD = 60_000L;
+ private static final long SEGMENT_INTERVAL = 30_000L;
+
+ private RocksDBSessionStoreWithHeaders sessionStore;
+ private InternalMockProcessorContext<String, String> context;
+ private File baseDir;
+
+ @BeforeEach
+ public void setUp() {
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ baseDir = TestUtils.tempDirectory();
+ context = new InternalMockProcessorContext<>(
+ baseDir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+
+ final SegmentedBytesStore segmentedBytesStore = new
RocksDBSegmentedBytesStore(
+ STORE_NAME,
+ "test-metrics-scope",
+ RETENTION_PERIOD,
+ SEGMENT_INTERVAL,
+ new SessionKeySchema()
+ );
+
+ sessionStore = new RocksDBSessionStoreWithHeaders(segmentedBytesStore);
+ sessionStore.init(context, sessionStore);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (sessionStore != null) {
+ sessionStore.close();
+ }
+ }
+
@Test
- public void shouldThrowUnsupportedOperationOnQuery() {
- final RocksDBSessionStoreWithHeaders store = new
RocksDBSessionStoreWithHeaders(
- new RocksDBSegmentedBytesStore("test", "scope", 10_000L,
- 60_000L, new SessionKeySchema()));
- assertThrows(UnsupportedOperationException.class,
- () -> store.query(null, null, null));
+ public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+ final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+ new Bytes("test-key".getBytes())
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ sessionStore.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertFalse(result.isSuccess());
+ assertEquals(FailureReason.UNKNOWN_QUERY_TYPE,
result.getFailureReason());
+ assertNotNull(result.getPosition());
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoWhenRequested() {
+ final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+ new Bytes("test-key".getBytes())
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ sessionStore.query(query, PositionBound.unbounded(), new
QueryConfig(true));
+
+ assertFalse(result.getExecutionInfo().isEmpty());
+ assertTrue(result.getExecutionInfo().get(0).contains("Handled in"));
+ assertTrue(result.getExecutionInfo().get(0).contains(
+ RocksDBSessionStoreWithHeaders.class.getName()));
+ }
+
+ @Test
+ public void shouldNotCollectExecutionInfoWhenNotRequested() {
+ final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+ new Bytes("test-key".getBytes())
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ sessionStore.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertTrue(result.getExecutionInfo().isEmpty());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
index 761058193f1..3f9c77bf30a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
@@ -16,27 +16,107 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.File;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RocksDBTimeOrderedSessionStoreWithHeadersTest {
+ private static final String STORE_NAME = "test-session-store";
+ private static final long RETENTION_PERIOD = 60_000L;
+ private static final long SEGMENT_INTERVAL = 30_000L;
+
+ private RocksDBTimeOrderedSessionStoreWithHeaders sessionStore;
+ private InternalMockProcessorContext<String, String> context;
+ private File baseDir;
+
+ @BeforeEach
+ public void setUp() {
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ baseDir = TestUtils.tempDirectory();
+ context = new InternalMockProcessorContext<>(
+ baseDir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+
+ sessionStore = new RocksDBTimeOrderedSessionStoreWithHeaders(
+ new RocksDBTimeOrderedSessionSegmentedBytesStore(
+ STORE_NAME,
+ "test-metrics-scope",
+ RETENTION_PERIOD,
+ SEGMENT_INTERVAL,
+ true
+ )
+ );
+ sessionStore.init(context, sessionStore);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (sessionStore != null) {
+ sessionStore.close();
+ }
+ }
+
@Test
- public void shouldThrowUnsupportedOperationOnQueryWithIndex() {
- final RocksDBTimeOrderedSessionStoreWithHeaders store = new
RocksDBTimeOrderedSessionStoreWithHeaders(
- new RocksDBTimeOrderedSessionSegmentedBytesStore("test", "scope",
- 10_000L, 60_000L, true));
- assertThrows(UnsupportedOperationException.class,
- () -> store.query(null, null, null));
+ public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+ final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+ new Bytes("test-key".getBytes())
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ sessionStore.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertFalse(result.isSuccess());
+ assertEquals(FailureReason.UNKNOWN_QUERY_TYPE,
result.getFailureReason());
+ assertNotNull(result.getPosition());
}
@Test
- public void shouldThrowUnsupportedOperationOnQueryWithOutIndex() {
- final RocksDBTimeOrderedSessionStoreWithHeaders store = new
RocksDBTimeOrderedSessionStoreWithHeaders(
- new RocksDBTimeOrderedSessionSegmentedBytesStore("test", "scope",
- 10_000L, 60_000L, false));
- assertThrows(UnsupportedOperationException.class,
- () -> store.query(null, null, null));
+ public void shouldCollectExecutionInfoWhenRequested() {
+ final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+ new Bytes("test-key".getBytes())
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ sessionStore.query(query, PositionBound.unbounded(), new
QueryConfig(true));
+
+ assertFalse(result.getExecutionInfo().isEmpty());
+ assertTrue(result.getExecutionInfo().get(0).contains("Handled in"));
+ assertTrue(result.getExecutionInfo().get(0).contains(
+ RocksDBTimeOrderedSessionStoreWithHeaders.class.getName()));
+ }
+
+ @Test
+ public void shouldNotCollectExecutionInfoWhenNotRequested() {
+ final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withKey(
+ new Bytes("test-key".getBytes())
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ sessionStore.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ assertTrue(result.getExecutionInfo().isEmpty());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
new file mode 100644
index 00000000000..c1f35083051
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class SessionStoreIteratorFacadeTest {
+ @Mock
+ private KeyValueIterator<String, AggregationWithHeaders<String>>
mockedInnerIterator;
+
+ private SessionStoreIteratorFacade<String, String> facade;
+
+ @BeforeEach
+ public void setup() {
+ facade = new SessionStoreIteratorFacade<>(mockedInnerIterator);
+ }
+
+ @Test
+ public void shouldForwardHasNext() {
+ when(mockedInnerIterator.hasNext()).thenReturn(true);
+ assertTrue(facade.hasNext());
+ }
+
+ @Test
+ public void shouldForwardPeekNextKey() {
+ when(mockedInnerIterator.peekNextKey()).thenReturn("key");
+ assertThat(facade.peekNextKey(), is("key"));
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairOnNext() {
+ final AggregationWithHeaders<String> aggregation =
+ AggregationWithHeaders.make("value", new RecordHeaders());
+ when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key",
aggregation));
+ assertThat(facade.next(), is(KeyValue.pair("key", "value")));
+ }
+
+ @Test
+ public void shouldReturnNullValueWhenAggregationWithHeadersIsNull() {
+ when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key",
null));
+ final KeyValue<String, String> result = facade.next();
+ assertThat(result.key, is("key"));
+ assertThat(result.value, is(nullValue()));
+ }
+
+ @Test
+ public void shouldReturnNullWhenInnerNextReturnsNull() {
+ when(mockedInnerIterator.next()).thenReturn(null);
+ assertThat(facade.next(), is(nullValue()));
+ }
+
+ @Test
+ public void shouldCloseInnerIterator() {
+ facade.close();
+ verify(mockedInnerIterator).close();
+ }
+}