This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4752b4b83a KAFKA-20185: Verify existing IQv2 methods for
MeteredTimestampedKeyValueStoreWithHeaders (#21643)
a4752b4b83a is described below
commit a4752b4b83a098c4a3bc19e681964c94e68c3bbf
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 9 18:08:45 2026 +0100
KAFKA-20185: Verify existing IQv2 methods for
MeteredTimestampedKeyValueStoreWithHeaders (#21643)
This PR is NOT about implementing IQv2s for header-store, but provides
IQv2s for headers store through the timestamped state stores. So the
results do not contain the headers even though the headers are preserved
in the headers state store.
Part of KIP-1271.
Reviewers: Matthias J. Sax <[email protected]>
---
.../state/internals/CachingKeyValueStore.java | 8 -
...MeteredTimestampedKeyValueStoreWithHeaders.java | 289 ++++++++++-
.../RocksDBTimestampedStoreWithHeaders.java | 28 +-
...TimestampedKeyValueStoreBuilderWithHeaders.java | 12 -
.../TimestampedToHeadersStoreAdapter.java | 43 +-
.../RocksDBTimestampedStoreWithHeadersTest.java | 84 +++-
...stampedKeyValueStoreBuilderWithHeadersTest.java | 556 +++++++++++++++++++--
7 files changed, 917 insertions(+), 103 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 0e31ef87fa7..4f78d49d060 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -129,11 +129,6 @@ public class CachingKeyValueStore
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
-
- if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
- }
-
final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
final QueryResult<R> result;
@@ -177,9 +172,6 @@ public class CachingKeyValueStore
final Position mergedPosition,
final PositionBound positionBound,
final QueryConfig config) {
- if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
- }
QueryResult<R> result = null;
final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>)
query;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index df36c04b2d5..12bb9501700 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -18,22 +18,35 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
-import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.query.TimestampedRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import java.util.Map;
import java.util.Objects;
+import java.util.function.Function;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -59,6 +72,27 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
super(inner, metricScope, time, keySerde, valueSerde);
}
+ @SuppressWarnings("rawtypes")
+ private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
+ mkMap(
+ mkEntry(
+ KeyQuery.class,
+ (query, positionBound, config, store) -> runKeyQuery(query,
positionBound, config)
+ ),
+ mkEntry(
+ TimestampedKeyQuery.class,
+ (query, positionBound, config, store) ->
runTimestampedKeyQuery(query, positionBound, config)
+ ),
+ mkEntry(
+ RangeQuery.class,
+ (query, positionBound, config, store) -> runRangeQuery(query,
positionBound, config)
+ ),
+ mkEntry(
+ TimestampedRangeQuery.class,
+ (query, positionBound, config, store) ->
runTimestampedRangeQuery(query, positionBound, config)
+ )
+ );
+
@SuppressWarnings("unchecked")
@Override
protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final
Serde<ValueTimestampHeaders<V>> valueSerde,
@@ -86,7 +120,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public ValueTimestampHeaders<V> putIfAbsent(final K key,
- final ValueTimestampHeaders<V> value) {
+ final ValueTimestampHeaders<V>
value) {
Objects.requireNonNull(key, "key cannot be null");
final Headers headers = value != null ? value.headers() : new
RecordHeaders();
final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
@@ -98,16 +132,259 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
return currentValue;
}
+ /**
+ * Executes a query against this store.
+ *
+ * <p>Note: Query results do NOT include headers, even though headers are
+ * preserved in the underlying store. This behavior provides compatibility
+ * with existing IQv2 APIs that operate on timestamped stores.
+ *
+ * @param query the query to execute
+ * @param positionBound the position bound
+ * @param config the query configuration
+ * @return the query result
+ */
+
+ @SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+
+ final long start = time.nanoseconds();
+ final QueryResult<R> result;
+
+ final StoreQueryUtils.QueryHandler handler =
queryHandlers.get(query.getClass());
+ if (handler == null) {
+ result = wrapped().query(query, positionBound, config);
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (time.nanoseconds()
- start) + "ns");
+ }
+ } else {
+ result = (QueryResult<R>) handler.apply(
+ query,
+ positionBound,
+ config,
+ this
+ );
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo("Handled in " + getClass() + " with
serdes "
+ + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ }
+ }
+ return result;
}
- @Override
- public Position getPosition() {
- throw new UnsupportedOperationException("Position is not supported by
timestamped key-value stores with headers yet.");
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final QueryResult<R> result;
+ final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+ final KeyQuery<Bytes, byte[]> rawKeyQuery =
+ KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), new
RecordHeaders()));
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final Function<byte[], ValueTimestampHeaders<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializer.apply(rawResult.getResult());
+ final V plainValue = valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
+ final QueryResult<V> typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
plainValue);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no
result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+ final PositionBound
positionBound,
+ final QueryConfig
config) {
+ final QueryResult<R> result;
+ final TimestampedKeyQuery<K, V> typedKeyQuery =
(TimestampedKeyQuery<K, V>) query;
+ final KeyQuery<Bytes, byte[]> rawKeyQuery =
+ KeyQuery.withKey(keyBytes(typedKeyQuery.key(), new
RecordHeaders()));
+ final QueryResult<byte[]> rawResult =
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final Function<byte[], ValueTimestampHeaders<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializer.apply(rawResult.getResult());
+ // Convert ValueTimestampHeaders to ValueAndTimestamp for the
result
+ final ValueAndTimestamp<V> valueAndTimestamp =
valueTimestampHeaders == null
+ ? null
+ : ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp());
+ final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
valueAndTimestamp);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no
result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+
+ final QueryResult<R> result;
+ final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+ RangeQuery<Bytes, byte[]> rawRangeQuery;
+ final ResultOrder order = typedQuery.resultOrder();
+ rawRangeQuery = RangeQuery.withRange(
+ keyBytes(typedQuery.getLowerBound().orElse(null), new
RecordHeaders()),
+ keyBytes(typedQuery.getUpperBound().orElse(null), new
RecordHeaders())
+ );
+ if (order.equals(ResultOrder.DESCENDING)) {
+ rawRangeQuery = rawRangeQuery.withDescendingKeys();
+ }
+ if (order.equals(ResultOrder.ASCENDING)) {
+ rawRangeQuery = rawRangeQuery.withAscendingKeys();
+ }
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+ wrapped().query(rawRangeQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
+ final KeyValueIterator<K, V> resultIterator = new
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ iterator,
+ getSensor,
+ StoreQueryUtils.deserializeValue(serdes, wrapped()),
+ true
+ );
+ final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no
result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+ final PositionBound
positionBound,
+ final QueryConfig
config) {
+
+ final QueryResult<R> result;
+ final TimestampedRangeQuery<K, V> typedQuery =
(TimestampedRangeQuery<K, V>) query;
+ RangeQuery<Bytes, byte[]> rawRangeQuery;
+ final ResultOrder order = typedQuery.resultOrder();
+ rawRangeQuery = RangeQuery.withRange(
+ keyBytes(typedQuery.lowerBound().orElse(null), new
RecordHeaders()),
+ keyBytes(typedQuery.upperBound().orElse(null), new
RecordHeaders())
+ );
+ if (order.equals(ResultOrder.DESCENDING)) {
+ rawRangeQuery = rawRangeQuery.withDescendingKeys();
+ }
+ if (order.equals(ResultOrder.ASCENDING)) {
+ rawRangeQuery = rawRangeQuery.withAscendingKeys();
+ }
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+ wrapped().query(rawRangeQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
+ final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ iterator,
+ getSensor,
+ StoreQueryUtils.deserializeValue(serdes,
wrapped()),
+ false
+ );
+ final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>>
typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no
result set.
+ result = (QueryResult<R>) rawResult;
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private class MeteredTimestampedKeyValueStoreWithHeadersIterator
implements KeyValueIterator<K, V>, MeteredIterator {
+
+ private final KeyValueIterator<Bytes, byte[]> iter;
+ private final Sensor sensor;
+ private final long startNs;
+ private final long startTimestampMs;
+ private final Function<byte[], ValueTimestampHeaders<V>>
valueTimestampHeadersDeserializer;
+
+ private final boolean returnPlainValue;
+
+ private MeteredTimestampedKeyValueStoreWithHeadersIterator(final
KeyValueIterator<Bytes, byte[]> iter,
+ final
Sensor sensor,
+ final
Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer,
+ final
boolean returnPlainValue) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.valueTimestampHeadersDeserializer =
valueTimestampHeadersDeserializer;
+ this.startNs = time.nanoseconds();
+ this.startTimestampMs = time.milliseconds();
+ this.returnPlainValue = returnPlainValue;
+ openIterators.add(this);
+ }
+
+ @Override
+ public long startTimestamp() {
+ return startTimestampMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, V> next() {
+ final KeyValue<Bytes, byte[]> keyValue = iter.next();
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
valueTimestampHeadersDeserializer.apply(keyValue.value);
+ final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
+ if (returnPlainValue) {
+ final V plainValue = valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
+ return KeyValue.pair(
+ serdes.keyFrom(keyValue.key.get(), headers),
+ plainValue
+ );
+ } else {
+ // Return as ValueAndTimestamp
+ final ValueAndTimestamp<V> valueAndTimestamp =
valueTimestampHeaders == null
+ ? null
+ : ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp());
+ return KeyValue.pair(
+ serdes.keyFrom(keyValue.key.get(), headers),
+ (V) valueAndTimestamp
+ );
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ final long duration = time.nanoseconds() - startNs;
+ sensor.record(duration);
+ iteratorDurationSensor.record(duration);
+ openIterators.remove(this);
+ }
+ }
+
+ @Override
+ public K peekNextKey() {
+ return serdes.keyFrom(iter.peekNextKey().get(), new
RecordHeaders());
+ }
}
protected Bytes keyBytes(final K key, final Headers headers) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index f451a202168..b776e34591c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -90,6 +90,27 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
}
}
+ @SuppressWarnings("SynchronizeOnNonFinalField")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+
+ synchronized (position) {
+ result = QueryResult.forUnknownQueryType(query, this);
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ );
+ }
+ result.setPosition(position.copy());
+ }
+ return result;
+ }
+
private void openInUpgradeMode(final DBOptions dbOptions,
final ColumnFamilyOptions
columnFamilyOptions) {
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
@@ -152,11 +173,4 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
}
}
- @Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
- }
-
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index be9ecb38bc9..8e7c43a9ee8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -22,10 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.query.PositionBound;
-import org.apache.kafka.streams.query.Query;
-import org.apache.kafka.streams.query.QueryConfig;
-import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -169,14 +165,6 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K,
V>
return wrapped().approximateNumEntries();
}
- @Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
-
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported by timestamped key-value stores with headers yet.");
- }
-
@Override
public Position getPosition() {
return wrapped().getPosition();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
index cbc3bd1608b..7a5faf68342 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
@@ -24,11 +24,14 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -152,7 +155,45 @@ public class TimestampedToHeadersStoreAdapter implements
KeyValueStore<Bytes, by
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+
+ // Handle KeyQuery: convert byte[] result from timestamped to headers
format
+ if (query instanceof KeyQuery) {
+ final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>)
query;
+ final QueryResult<byte[]> rawResult = store.query(keyQuery,
positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final byte[] convertedValue =
convertToHeaderFormat(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
convertedValue);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else if (query instanceof RangeQuery) {
+ // Handle RangeQuery: wrap iterator to convert values
+ final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes,
byte[]>) query;
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+ store.query(rangeQuery, positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Bytes, byte[]> convertedIterator =
+ new
TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
convertedIterator);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+ // For other query types, delegate to the underlying store
+ result = store.query(query, positionBound, config);
+ }
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (System.nanoTime() -
start) + "ns"
+ );
+ }
+
+ return result;
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index ab863d4e5fc..14c13b25d9a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -22,9 +22,11 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.junit.jupiter.api.Test;
@@ -46,6 +48,7 @@ import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -638,20 +641,6 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
}
- @Test
- public void shouldThrowUnsupportedOperationExceptionOnQuery() {
- rocksDBStore.init(context, rocksDBStore);
-
- final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test".getBytes()));
-
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- () -> rocksDBStore.query(query, PositionBound.unbounded(), new
QueryConfig(false))
- );
-
- assertTrue(exception.getMessage().contains("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet."));
- }
-
private byte[] wrapTimestampedValue(final byte[] value) {
// Format: [timestamp(8 bytes)][value]
// Use the numeric value as timestamp
@@ -671,4 +660,71 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
System.arraycopy(value, 0, result, 8, value.length);
return result;
}
+
+ @Test
+ public void shouldReturnUnknownQueryTypeForQuery() {
+ // Initialize the store
+ rocksDBStore.init(context, rocksDBStore);
+
+ // Create a query
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ // Execute query
+ final QueryResult<byte[]> result = rocksDBStore.query(query,
positionBound, config);
+
+ // Verify result indicates unknown query type
+ assertFalse(result.isSuccess(), "Expected query to fail with unknown
query type");
+ assertEquals(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ result.getFailureReason(),
+ "Expected UNKNOWN_QUERY_TYPE failure reason"
+ );
+
+ // Verify position is set
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoWhenRequested() {
+ // Initialize the store
+ rocksDBStore.init(context, rocksDBStore);
+
+ // Create a query with execution info collection enabled
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable execution
info
+
+ // Execute query
+ final QueryResult<byte[]> result = rocksDBStore.query(query,
positionBound, config);
+
+ // Verify execution info was collected
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution
info to be collected");
+ assertTrue(
+ result.getExecutionInfo().get(0).contains("Handled in"),
+ "Expected execution info to contain handling information"
+ );
+ assertTrue(
+
result.getExecutionInfo().get(0).contains(RocksDBTimestampedStoreWithHeaders.class.getName()),
+ "Expected execution info to mention the class name"
+ );
+ }
+
+ @Test
+ public void shouldNotCollectExecutionInfoWhenNotRequested() {
+ // Initialize the store
+ rocksDBStore.init(context, rocksDBStore);
+
+ // Create a query with execution info collection disabled
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false); // Disable
execution info
+
+ // Execute query
+ final QueryResult<byte[]> result = rocksDBStore.query(query,
positionBound, config);
+
+ // Verify no execution info was collected
+ assertTrue(result.getExecutionInfo().isEmpty(), "Expected no execution
info to be collected");
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index a49e85bbb1a..8330c746e31 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -17,15 +17,30 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,10 +49,14 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.io.File;
import java.util.Collections;
+import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@@ -192,9 +211,30 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
}
@Test
- public void shouldThrowUsingIQv2ForInMemoryStores() {
+ public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
when(supplier.name()).thenReturn("test-store");
when(supplier.metricsScope()).thenReturn("metricScope");
+ when(supplier.get()).thenReturn(new RocksDBStore("test-store",
"metrics-scope"));
+
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ );
+
+ final IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () ->
builder.withLoggingDisabled().withCachingDisabled().build()
+ );
+
+ assertTrue(exception.getMessage().contains("Provided store must be a
timestamped store"));
+ }
+
+ @Test
+ public void shouldHandleKeyQueryOnInMemoryStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("in-memory");
when(supplier.get()).thenReturn(new
InMemoryKeyValueStore("test-store"));
builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
@@ -209,25 +249,101 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
.withCachingDisabled()
.build();
- final KeyQuery<String, ValueTimestampHeaders<String>> query =
- KeyQuery.withKey("test-key");
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ // Put data into the store
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ store.put("test-key", ValueTimestampHeaders.make("test-value",
12345L, headers));
+
+ // Verify wrapper type for InMemoryKeyValueStore
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(HeadersBytesStore.class, wrapped,
+ "Expected wrapper to implement HeadersBytesStore for
InMemoryKeyValueStore");
+
+ // Query at typed level - KeyQuery should return just the value
+ final KeyQuery<String, String> query =
KeyQuery.withKey("test-key");
+ final QueryResult<String> result = store.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ // Verify IQv2 query result
+ assertTrue(result.isSuccess(), "Expected query to succeed on
InMemoryKeyValueStore");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ assertInstanceOf(String.class, result.getResult());
+ assertEquals("test-value", result.getResult(), "KeyQuery should
return just the value");
+ } finally {
+ store.close();
+ }
+ }
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ @Test
+ public void shouldHandleTimestampedKeyQueryOnInMemoryStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("in-memory");
+ when(supplier.get()).thenReturn(new
InMemoryKeyValueStore("test-store"));
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- () -> wrapped.query(query, PositionBound.unbounded(), new
QueryConfig(false))
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
);
- assertTrue(exception.getMessage().contains(
- "Queries (IQv2) are not supported by timestamped key-value stores
with headers yet."));
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ // Put data into the store
+ final Headers headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ store.put("test-key", ValueTimestampHeaders.make("test-value",
12345L, headers));
+
+ // Verify wrapper type for InMemoryKeyValueStore
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(HeadersBytesStore.class, wrapped,
+ "Expected wrapper to implement HeadersBytesStore for
InMemoryKeyValueStore");
+
+ // Query at typed level - TimestampedKeyQuery should return value
+ timestamp
+ final TimestampedKeyQuery<String, String> query =
TimestampedKeyQuery.withKey("test-key");
+ final QueryResult<ValueAndTimestamp<String>> result =
store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+ // Verify IQv2 query result
+ assertTrue(result.isSuccess(), "Expected query to succeed on
InMemoryKeyValueStore");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ assertNotNull(result.getResult(), "Expected non-null result");
+ assertInstanceOf(ValueAndTimestamp.class, result.getResult());
+ assertEquals("test-value", result.getResult().value(),
"TimestampedKeyQuery should return the value");
+ assertEquals(12345L, result.getResult().timestamp(),
"TimestampedKeyQuery should return the timestamp");
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowWhenUsingIQv2InHeadersStore() {
+ public void shouldReturnPositionFromHeadersStore() {
when(supplier.name()).thenReturn("test-store");
when(supplier.metricsScope()).thenReturn("metricScope");
- when(supplier.get()).thenReturn(new
RocksDBTimestampedStore("test-store", "metrics-scope"));
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
supplier,
@@ -241,25 +357,76 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
.withCachingDisabled()
.build();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped);
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final Position position = wrapped.getPosition();
+
+ // Verify: Position is returned (should be non-null)
+ assertNotNull(position, "Expected non-null position");
+ assertTrue(position.isEmpty(), "Expected position to be empty
initially");
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void shouldReturnPositionFromAdaptedTimestampedStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("metricScope");
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStore("test-store", "metrics-scope"));
+
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ );
- final KeyQuery<String, ValueTimestampHeaders<String>> query =
- KeyQuery.withKey("test-key");
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- () -> wrapped.query(query, PositionBound.unbounded(), new
QueryConfig(false))
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
);
+ store.init(context, store);
- assertTrue(exception.getMessage().contains("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet."));
+ try {
+ // Verify adapter is used
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped);
+
+ // Get position from adapter (should delegate to underlying store)
+ final Position position = wrapped.getPosition();
+
+ assertNotNull(position, "Expected non-null position from adapter");
+ assertTrue(position.isEmpty(), "Expected position to be empty
initially");
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
+ public void shouldReturnPositionFromInMemoryStore() {
when(supplier.name()).thenReturn("test-store");
- when(supplier.metricsScope()).thenReturn("metricScope");
- when(supplier.get()).thenReturn(new RocksDBStore("test-store",
"metrics-scope"));
+ when(supplier.metricsScope()).thenReturn("in-memory");
+ when(supplier.get()).thenReturn(new
InMemoryKeyValueStore("test-store"));
builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
supplier,
@@ -268,16 +435,38 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
new MockTime()
);
- final IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () ->
builder.withLoggingDisabled().withCachingDisabled().build()
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
);
+ store.init(context, store);
- assertTrue(exception.getMessage().contains("Provided store must be a
timestamped store"));
+ try {
+ // Verify marker wrapper is used
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(HeadersBytesStore.class, wrapped);
+
+ // Get position from marker (should delegate to
InMemoryKeyValueStore)
+ final Position position = wrapped.getPosition();
+
+ assertNotNull(position, "Expected non-null position from in-memory
store");
+ assertTrue(position.isEmpty(), "Expected position to be empty
initially");
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowUsingIQv2ForNativeHeadersStore() {
+ public void shouldMaintainPositionAcrossOperationsOnHeadersStore() {
when(supplier.name()).thenReturn("test-store");
when(supplier.metricsScope()).thenReturn("metricScope");
when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
@@ -294,25 +483,92 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
.withCachingDisabled()
.build();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(RocksDBTimestampedStoreWithHeaders.class, wrapped);
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+
+ // Get initial position
+ final Position initialPosition = wrapped.getPosition();
+ assertNotNull(initialPosition, "Expected non-null initial
position");
+
+ // Put some data
+ store.put("key1", ValueTimestampHeaders.make("value1", 100L, new
RecordHeaders()));
+ store.put("key2", ValueTimestampHeaders.make("value2", 200L, new
RecordHeaders()));
+
+ // Get position after puts
+ final Position afterPutPosition = wrapped.getPosition();
+ assertNotNull(afterPutPosition, "Expected non-null position after
puts");
+
+ // Position object should be the same instance (stores maintain a
single position)
+ // The position content might be updated internally by the context
+ } finally {
+ store.close();
+ }
+ }
- final KeyQuery<String, ValueTimestampHeaders<String>> query =
- KeyQuery.withKey("test-key");
+ @Test
+ public void shouldReturnUnknownQueryTypeForKeyQueryOnHeadersStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("metricScope");
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- () -> wrapped.query(query, PositionBound.unbounded(), new
QueryConfig(false))
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
);
- assertTrue(exception.getMessage().contains("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet."));
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final QueryResult<byte[]> result = wrapped.query(query,
positionBound, config);
+
+ // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE
+ assertFalse(result.isSuccess(), "Expected query to fail with
unknown query type");
+ assertEquals(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ result.getFailureReason(),
+ "Expected UNKNOWN_QUERY_TYPE failure reason"
+ );
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowOnGetPositionForInMemoryStores() {
+ public void shouldReturnUnknownQueryTypeForRangeQueryOnHeadersStore() {
when(supplier.name()).thenReturn("test-store");
when(supplier.metricsScope()).thenReturn("metricScope");
- when(supplier.get()).thenReturn(new
InMemoryKeyValueStore("test-store"));
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
supplier,
@@ -326,16 +582,93 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
.withCachingDisabled()
.build();
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- store::getPosition
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+ new Bytes("a".getBytes()),
+ new Bytes("z".getBytes())
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> result =
wrapped.query(query, positionBound, config);
+
+ // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE
+ assertFalse(result.isSuccess(), "Expected query to fail with
unknown query type");
+ assertEquals(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ result.getFailureReason(),
+ "Expected UNKNOWN_QUERY_TYPE failure reason"
+ );
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForQueryOnHeadersStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("metricScope");
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
);
- assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable
execution info
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final QueryResult<byte[]> result = wrapped.query(query,
positionBound, config);
+
+ // Verify: Execution info was collected
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected
execution info to be collected");
+ assertTrue(
+ result.getExecutionInfo().get(0).contains("Handled in"),
+ "Expected execution info to contain handling information"
+ );
+ assertTrue(
+
result.getExecutionInfo().get(0).contains(RocksDBTimestampedStoreWithHeaders.class.getName()),
+ "Expected execution info to mention the class name"
+ );
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
+ public void shouldHandleKeyQueryOnAdaptedTimestampedStore() {
when(supplier.name()).thenReturn("test-store");
when(supplier.metricsScope()).thenReturn("metricScope");
when(supplier.get()).thenReturn(new
RocksDBTimestampedStore("test-store", "metrics-scope"));
@@ -352,19 +685,48 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
.withCachingDisabled()
.build();
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- store::getPosition
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
);
-
- assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
+ store.init(context, store);
+
+ try {
+ // Put data into the store (headers will be discarded when adapted
to timestamped store)
+ final Headers headers = new RecordHeaders();
+ headers.add("adapter", "test".getBytes());
+ store.put("test-key", ValueTimestampHeaders.make("adapter-value",
55555L, headers));
+
+ // Verify adapter is used for legacy timestamped store
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped,
+ "Expected TimestampedToHeadersStoreAdapter for legacy
timestamped store");
+
+ // Query at typed level - KeyQuery should return just the value
+ final KeyQuery<String, String> query =
KeyQuery.withKey("test-key");
+ final QueryResult<String> result = store.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ // Verify IQv2 query result
+ // Adapter delegates to RocksDBTimestampedStore which supports
IQv2 through RocksDBStore
+ assertTrue(result.isSuccess(),
+ "Expected query to succeed since RocksDBTimestampedStore
supports IQv2");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ assertInstanceOf(String.class, result.getResult());
+ assertEquals("adapter-value", result.getResult(), "KeyQuery should
return just the value");
+ } finally {
+ store.close();
+ }
}
@Test
- public void shouldThrowOnGetPositionForNativeHeadersStore() {
+ public void shouldHandleTimestampedKeyQueryOnAdaptedTimestampedStore() {
when(supplier.name()).thenReturn("test-store");
when(supplier.metricsScope()).thenReturn("metricScope");
- when(supplier.get()).thenReturn(new
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStore("test-store", "metrics-scope"));
builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
supplier,
@@ -378,12 +740,96 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
.withCachingDisabled()
.build();
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- store::getPosition
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
);
-
- assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
+ store.init(context, store);
+
+ try {
+ // Put data into the store (headers will be discarded when adapted
to timestamped store)
+ final Headers headers = new RecordHeaders();
+ headers.add("adapter", "test".getBytes());
+ store.put("test-key", ValueTimestampHeaders.make("adapter-value",
55555L, headers));
+
+ // Verify adapter is used for legacy timestamped store
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped,
+ "Expected TimestampedToHeadersStoreAdapter for legacy
timestamped store");
+
+ // Query at typed level - TimestampedKeyQuery should return value
+ timestamp
+ final TimestampedKeyQuery<String, String> query =
TimestampedKeyQuery.withKey("test-key");
+ final QueryResult<ValueAndTimestamp<String>> result =
store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+ // Verify IQv2 query result
+ // Adapter delegates to RocksDBTimestampedStore which supports
IQv2 through RocksDBStore
+ assertTrue(result.isSuccess(),
+ "Expected query to succeed since RocksDBTimestampedStore
supports IQv2");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ assertNotNull(result.getResult(), "Expected non-null result");
+ assertInstanceOf(ValueAndTimestamp.class, result.getResult());
+ assertEquals("adapter-value", result.getResult().value(),
"TimestampedKeyQuery should return the value");
+ assertEquals(55555L, result.getResult().timestamp(),
"TimestampedKeyQuery should return the timestamp");
+ } finally {
+ store.close();
+ }
}
-}
\ No newline at end of file
+ @Test
+ public void shouldCollectExecutionInfoForQueryOnAdaptedTimestampedStore() {
+ when(supplier.name()).thenReturn("test-store");
+ when(supplier.metricsScope()).thenReturn("metricScope");
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedStore("test-store", "metrics-scope"));
+
+ builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ );
+
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+
+ final File dir = TestUtils.tempDirectory();
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ final InternalMockProcessorContext<String, String> context = new
InternalMockProcessorContext<>(
+ dir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+ store.init(context, store);
+
+ try {
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test-key".getBytes()));
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable
execution info
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final QueryResult<byte[]> result = wrapped.query(query,
positionBound, config);
+
+ // Verify: Execution info includes both adapter and underlying
store
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected
execution info to be collected");
+
+ final String executionInfo = String.join("\n",
result.getExecutionInfo());
+ assertTrue(
+ executionInfo.contains("Handled in"),
+ "Expected execution info to contain handling information"
+ );
+ // Should mention the adapter class
+ assertTrue(
+
executionInfo.contains(TimestampedToHeadersStoreAdapter.class.getName()),
+ "Expected execution info to mention the adapter class"
+ );
+ } finally {
+ store.close();
+ }
+ }
+}