This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4752b4b83a KAFKA-20185: Verify existing IQv2 methods for 
MeteredTimestampedKeyValueStoreWithHeaders (#21643)
a4752b4b83a is described below

commit a4752b4b83a098c4a3bc19e681964c94e68c3bbf
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 9 18:08:45 2026 +0100

    KAFKA-20185: Verify existing IQv2 methods for 
MeteredTimestampedKeyValueStoreWithHeaders (#21643)
    
    This PR is NOT about implementing IQv2s for header-store, but provides
    IQv2s for headers store through the timestamped state stores. So the
    results do not contain the headers even though the headers are preserved
    in the headers state store.
    
    Part of KIP-1271.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../state/internals/CachingKeyValueStore.java      |   8 -
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 289 ++++++++++-
 .../RocksDBTimestampedStoreWithHeaders.java        |  28 +-
 ...TimestampedKeyValueStoreBuilderWithHeaders.java |  12 -
 .../TimestampedToHeadersStoreAdapter.java          |  43 +-
 .../RocksDBTimestampedStoreWithHeadersTest.java    |  84 +++-
 ...stampedKeyValueStoreBuilderWithHeadersTest.java | 556 +++++++++++++++++++--
 7 files changed, 917 insertions(+), 103 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 0e31ef87fa7..4f78d49d060 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -129,11 +129,6 @@ public class CachingKeyValueStore
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-
-        if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
-            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
-        }
-
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
         final QueryResult<R> result;
 
@@ -177,9 +172,6 @@ public class CachingKeyValueStore
                                            final Position mergedPosition,
                                            final PositionBound positionBound,
                                            final QueryConfig config) {
-        if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
-            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
-        }
         QueryResult<R> result = null;
         final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) 
query;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index df36c04b2d5..12bb9501700 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -18,22 +18,35 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
-import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.query.TimestampedRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
+import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 
 
@@ -59,6 +72,27 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         super(inner, metricScope, time, keySerde, valueSerde);
     }
 
+    @SuppressWarnings("rawtypes")
+    private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
+        mkMap(
+            mkEntry(
+                KeyQuery.class,
+                (query, positionBound, config, store) -> runKeyQuery(query, 
positionBound, config)
+            ),
+            mkEntry(
+                TimestampedKeyQuery.class,
+                (query, positionBound, config, store) -> 
runTimestampedKeyQuery(query, positionBound, config)
+            ),
+            mkEntry(
+                RangeQuery.class,
+                (query, positionBound, config, store) -> runRangeQuery(query, 
positionBound, config)
+            ),
+            mkEntry(
+                TimestampedRangeQuery.class,
+                (query, positionBound, config, store) -> 
runTimestampedRangeQuery(query, positionBound, config)
+            )
+        );
+
     @SuppressWarnings("unchecked")
     @Override
     protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final 
Serde<ValueTimestampHeaders<V>> valueSerde,
@@ -86,7 +120,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
 
     @Override
     public ValueTimestampHeaders<V> putIfAbsent(final K key,
-                         final ValueTimestampHeaders<V> value) {
+                                                final ValueTimestampHeaders<V> 
value) {
         Objects.requireNonNull(key, "key cannot be null");
         final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
         final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
@@ -98,16 +132,259 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
         return currentValue;
     }
 
+    /**
+     * Executes a query against this store.
+     *
+     * <p>Note: Query results do NOT include headers, even though headers are
+     * preserved in the underlying store. This behavior provides compatibility
+     * with existing IQv2 APIs that operate on timestamped stores.
+     *
+     * @param query the query to execute
+     * @param positionBound the position bound
+     * @param config the query configuration
+     * @return the query result
+     */
+
+    @SuppressWarnings("unchecked")
     @Override
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                config,
+                this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+            }
+        }
+        return result;
     }
 
-    @Override
-    public Position getPosition() {
-        throw new UnsupportedOperationException("Position is not supported by 
timestamped key-value stores with headers yet.");
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), new 
RecordHeaders()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
+            final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key(), new 
RecordHeaders()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
+            // Convert ValueTimestampHeaders to ValueAndTimestamp for the 
result
+            final ValueAndTimestamp<V> valueAndTimestamp = 
valueTimestampHeaders == null
+                    ? null
+                    : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final ResultOrder order = typedQuery.resultOrder();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.getLowerBound().orElse(null), new 
RecordHeaders()),
+                keyBytes(typedQuery.getUpperBound().orElse(null), new 
RecordHeaders())
+        );
+        if (order.equals(ResultOrder.DESCENDING)) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            rawRangeQuery = rawRangeQuery.withAscendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+            wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+                    iterator,
+                    getSensor,
+                    StoreQueryUtils.deserializeValue(serdes, wrapped()),
+                    true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                        rawResult,
+                        resultIterator
+                );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final ResultOrder order = typedQuery.resultOrder();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.lowerBound().orElse(null), new 
RecordHeaders()),
+                keyBytes(typedQuery.upperBound().orElse(null), new 
RecordHeaders())
+        );
+        if (order.equals(ResultOrder.DESCENDING)) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            rawRangeQuery = rawRangeQuery.withAscendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
+                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+                            iterator,
+                            getSensor,
+                            StoreQueryUtils.deserializeValue(serdes, 
wrapped()),
+                            false
+            );
+            final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredTimestampedKeyValueStoreWithHeadersIterator 
implements KeyValueIterator<K, V>, MeteredIterator {
+
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final long startTimestampMs;
+        private final Function<byte[], ValueTimestampHeaders<V>> 
valueTimestampHeadersDeserializer;
+
+        private final boolean returnPlainValue;
+
+        private MeteredTimestampedKeyValueStoreWithHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                                   final 
Sensor sensor,
+                                                                   final 
Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer,
+                                                                   final 
boolean returnPlainValue) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.valueTimestampHeadersDeserializer = 
valueTimestampHeadersDeserializer;
+            this.startNs = time.nanoseconds();
+            this.startTimestampMs = time.milliseconds();
+            this.returnPlainValue = returnPlainValue;
+            openIterators.add(this);
+        }
+
+        @Override
+        public long startTimestamp() {
+            return startTimestampMs;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
valueTimestampHeadersDeserializer.apply(keyValue.value);
+            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
+            if (returnPlainValue) {
+                final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
+                return KeyValue.pair(
+                    serdes.keyFrom(keyValue.key.get(), headers),
+                    plainValue
+                );
+            } else {
+                // Return as ValueAndTimestamp
+                final ValueAndTimestamp<V> valueAndTimestamp = 
valueTimestampHeaders == null
+                    ? null
+                    : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp());
+                return KeyValue.pair(
+                    serdes.keyFrom(keyValue.key.get(), headers),
+                    (V) valueAndTimestamp
+                );
+            }
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                final long duration = time.nanoseconds() - startNs;
+                sensor.record(duration);
+                iteratorDurationSensor.record(duration);
+                openIterators.remove(this);
+            }
+        }
+
+        @Override
+        public K peekNextKey() {
+            return serdes.keyFrom(iter.peekNextKey().get(), new 
RecordHeaders());
+        }
     }
 
     protected Bytes keyBytes(final K key, final Headers headers) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index f451a202168..b776e34591c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -90,6 +90,27 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
         }
     }
 
+    @SuppressWarnings("SynchronizeOnNonFinalField")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+
+        synchronized (position) {
+            result = QueryResult.forUnknownQueryType(query, this);
+
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + this.getClass() + " in " + 
(System.nanoTime() - start) + "ns"
+                );
+            }
+            result.setPosition(position.copy());
+        }
+        return result;
+    }
+
     private void openInUpgradeMode(final DBOptions dbOptions,
                                    final ColumnFamilyOptions 
columnFamilyOptions) {
         final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
@@ -152,11 +173,4 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
         }
     }
 
-    @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
-    }
-
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index be9ecb38bc9..8e7c43a9ee8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -22,10 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.query.PositionBound;
-import org.apache.kafka.streams.query.Query;
-import org.apache.kafka.streams.query.QueryConfig;
-import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -169,14 +165,6 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K, 
V>
             return wrapped().approximateNumEntries();
         }
 
-        @Override
-        public <R> QueryResult<R> query(final Query<R> query,
-                                        final PositionBound positionBound,
-                                        final QueryConfig config) {
-
-            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported by timestamped key-value stores with headers yet.");
-        }
-
         @Override
         public Position getPosition() {
             return wrapped().getPosition();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
index cbc3bd1608b..7a5faf68342 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
@@ -24,11 +24,14 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -152,7 +155,45 @@ public class TimestampedToHeadersStoreAdapter implements 
KeyValueStore<Bytes, by
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+
+        // Handle KeyQuery: convert byte[] result from timestamped to headers 
format
+        if (query instanceof KeyQuery) {
+            final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) 
query;
+            final QueryResult<byte[]> rawResult = store.query(keyQuery, 
positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final byte[] convertedValue = 
convertToHeaderFormat(rawResult.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
convertedValue);
+            } else {
+                result = (QueryResult<R>) rawResult;
+            }
+        } else if (query instanceof RangeQuery) {
+            // Handle RangeQuery: wrap iterator to convert values
+            final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, 
byte[]>) query;
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                    store.query(rangeQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final KeyValueIterator<Bytes, byte[]> convertedIterator =
+                        new 
TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
convertedIterator);
+            } else {
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+            // For other query types, delegate to the underlying store
+            result = store.query(query, positionBound, config);
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns"
+            );
+        }
+
+        return result;
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index ab863d4e5fc..14c13b25d9a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -22,9 +22,11 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import org.junit.jupiter.api.Test;
@@ -46,6 +48,7 @@ import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -638,20 +641,6 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         }
     }
 
-    @Test
-    public void shouldThrowUnsupportedOperationExceptionOnQuery() {
-        rocksDBStore.init(context, rocksDBStore);
-
-        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test".getBytes()));
-
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                () -> rocksDBStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
-        );
-
-        assertTrue(exception.getMessage().contains("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet."));
-    }
-
     private byte[] wrapTimestampedValue(final byte[] value) {
         // Format: [timestamp(8 bytes)][value]
         // Use the numeric value as timestamp
@@ -671,4 +660,71 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         System.arraycopy(value, 0, result, 8, value.length);
         return result;
     }
+
+    @Test
+    public void shouldReturnUnknownQueryTypeForQuery() {
+        // Initialize the store
+        rocksDBStore.init(context, rocksDBStore);
+
+        // Create a query
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(false);
+
+        // Execute query
+        final QueryResult<byte[]> result = rocksDBStore.query(query, 
positionBound, config);
+
+        // Verify result indicates unknown query type
+        assertFalse(result.isSuccess(), "Expected query to fail with unknown 
query type");
+        assertEquals(
+            FailureReason.UNKNOWN_QUERY_TYPE,
+            result.getFailureReason(),
+            "Expected UNKNOWN_QUERY_TYPE failure reason"
+        );
+
+        // Verify position is set
+        assertNotNull(result.getPosition(), "Expected position to be set");
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoWhenRequested() {
+        // Initialize the store
+        rocksDBStore.init(context, rocksDBStore);
+
+        // Create a query with execution info collection enabled
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(true); // Enable execution 
info
+
+        // Execute query
+        final QueryResult<byte[]> result = rocksDBStore.query(query, 
positionBound, config);
+
+        // Verify execution info was collected
+        assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution 
info to be collected");
+        assertTrue(
+            result.getExecutionInfo().get(0).contains("Handled in"),
+            "Expected execution info to contain handling information"
+        );
+        assertTrue(
+            
result.getExecutionInfo().get(0).contains(RocksDBTimestampedStoreWithHeaders.class.getName()),
+            "Expected execution info to mention the class name"
+        );
+    }
+
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        // Initialize the store
+        rocksDBStore.init(context, rocksDBStore);
+
+        // Create a query with execution info collection disabled
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(false); // Disable 
execution info
+
+        // Execute query
+        final QueryResult<byte[]> result = rocksDBStore.query(query, 
positionBound, config);
+
+        // Verify no execution info was collected
+        assertTrue(result.getExecutionInfo().isEmpty(), "Expected no execution 
info to be collected");
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index a49e85bbb1a..8330c746e31 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -17,15 +17,30 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -34,10 +49,14 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.io.File;
 import java.util.Collections;
+import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.when;
@@ -192,9 +211,30 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
     }
 
     @Test
-    public void shouldThrowUsingIQv2ForInMemoryStores() {
+    public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new RocksDBStore("test-store", 
"metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final IllegalArgumentException exception = assertThrows(
+                IllegalArgumentException.class,
+                () -> 
builder.withLoggingDisabled().withCachingDisabled().build()
+        );
+
+        assertTrue(exception.getMessage().contains("Provided store must be a 
timestamped store"));
+    }
+
+    @Test
+    public void shouldHandleKeyQueryOnInMemoryStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("in-memory");
         when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
 
         builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
@@ -209,25 +249,101 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 .withCachingDisabled()
                 .build();
 
-        final KeyQuery<String, ValueTimestampHeaders<String>> query =
-                KeyQuery.withKey("test-key");
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            // Put data into the store
+            final Headers headers = new RecordHeaders();
+            headers.add("key1", "value1".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("test-value", 
12345L, headers));
+
+            // Verify wrapper type for InMemoryKeyValueStore
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(HeadersBytesStore.class, wrapped,
+                    "Expected wrapper to implement HeadersBytesStore for 
InMemoryKeyValueStore");
+
+            // Query at typed level - KeyQuery should return just the value
+            final KeyQuery<String, String> query = 
KeyQuery.withKey("test-key");
+            final QueryResult<String> result = store.query(query, 
PositionBound.unbounded(), new QueryConfig(false));
+
+            // Verify IQv2 query result
+            assertTrue(result.isSuccess(), "Expected query to succeed on 
InMemoryKeyValueStore");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+            assertInstanceOf(String.class, result.getResult());
+            assertEquals("test-value", result.getResult(), "KeyQuery should 
return just the value");
+        } finally {
+            store.close();
+        }
+    }
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+    @Test
+    public void shouldHandleTimestampedKeyQueryOnInMemoryStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("in-memory");
+        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
 
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                () -> wrapped.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
         );
 
-        assertTrue(exception.getMessage().contains(
-            "Queries (IQv2) are not supported by timestamped key-value stores 
with headers yet."));
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            // Put data into the store
+            final Headers headers = new RecordHeaders();
+            headers.add("key1", "value1".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("test-value", 
12345L, headers));
+
+            // Verify wrapper type for InMemoryKeyValueStore
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(HeadersBytesStore.class, wrapped,
+                    "Expected wrapper to implement HeadersBytesStore for 
InMemoryKeyValueStore");
+
+            // Query at typed level - TimestampedKeyQuery should return value 
+ timestamp
+            final TimestampedKeyQuery<String, String> query = 
TimestampedKeyQuery.withKey("test-key");
+            final QueryResult<ValueAndTimestamp<String>> result = 
store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+            // Verify IQv2 query result
+            assertTrue(result.isSuccess(), "Expected query to succeed on 
InMemoryKeyValueStore");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+            assertNotNull(result.getResult(), "Expected non-null result");
+            assertInstanceOf(ValueAndTimestamp.class, result.getResult());
+            assertEquals("test-value", result.getResult().value(), 
"TimestampedKeyQuery should return the value");
+            assertEquals(12345L, result.getResult().timestamp(), 
"TimestampedKeyQuery should return the timestamp");
+        } finally {
+            store.close();
+        }
     }
 
     @Test
-    public void shouldThrowWhenUsingIQv2InHeadersStore() {
+    public void shouldReturnPositionFromHeadersStore() {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
 
         builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
                 supplier,
@@ -241,25 +357,76 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 .withCachingDisabled()
                 .build();
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped);
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            final Position position = wrapped.getPosition();
+
+            // Verify: Position is returned (should be non-null)
+            assertNotNull(position, "Expected non-null position");
+            assertTrue(position.isEmpty(), "Expected position to be empty 
initially");
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void shouldReturnPositionFromAdaptedTimestampedStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
 
-        final KeyQuery<String, ValueTimestampHeaders<String>> query =
-                KeyQuery.withKey("test-key");
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
 
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                () -> wrapped.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
         );
+        store.init(context, store);
 
-        assertTrue(exception.getMessage().contains("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet."));
+        try {
+            // Verify adapter is used
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped);
+
+            // Get position from adapter (should delegate to underlying store)
+            final Position position = wrapped.getPosition();
+
+            assertNotNull(position, "Expected non-null position from adapter");
+            assertTrue(position.isEmpty(), "Expected position to be empty 
initially");
+        } finally {
+            store.close();
+        }
     }
 
     @Test
-    public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
+    public void shouldReturnPositionFromInMemoryStore() {
         when(supplier.name()).thenReturn("test-store");
-        when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new RocksDBStore("test-store", 
"metrics-scope"));
+        when(supplier.metricsScope()).thenReturn("in-memory");
+        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
 
         builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
                 supplier,
@@ -268,16 +435,38 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 new MockTime()
         );
 
-        final IllegalArgumentException exception = assertThrows(
-                IllegalArgumentException.class,
-                () -> 
builder.withLoggingDisabled().withCachingDisabled().build()
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
         );
+        store.init(context, store);
 
-        assertTrue(exception.getMessage().contains("Provided store must be a 
timestamped store"));
+        try {
+            // Verify marker wrapper is used
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(HeadersBytesStore.class, wrapped);
+
+            // Get position from marker (should delegate to 
InMemoryKeyValueStore)
+            final Position position = wrapped.getPosition();
+
+            assertNotNull(position, "Expected non-null position from in-memory 
store");
+            assertTrue(position.isEmpty(), "Expected position to be empty 
initially");
+        } finally {
+            store.close();
+        }
     }
 
     @Test
-    public void shouldThrowUsingIQv2ForNativeHeadersStore() {
+    public void shouldMaintainPositionAcrossOperationsOnHeadersStore() {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
         when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
@@ -294,25 +483,92 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 .withCachingDisabled()
                 .build();
 
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(RocksDBTimestampedStoreWithHeaders.class, wrapped);
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            
+            // Get initial position
+            final Position initialPosition = wrapped.getPosition();
+            assertNotNull(initialPosition, "Expected non-null initial 
position");
+
+            // Put some data
+            store.put("key1", ValueTimestampHeaders.make("value1", 100L, new 
RecordHeaders()));
+            store.put("key2", ValueTimestampHeaders.make("value2", 200L, new 
RecordHeaders()));
+
+            // Get position after puts
+            final Position afterPutPosition = wrapped.getPosition();
+            assertNotNull(afterPutPosition, "Expected non-null position after 
puts");
+            
+            // Position object should be the same instance (stores maintain a 
single position)
+            // The position content might be updated internally by the context
+        } finally {
+            store.close();
+        }
+    }
 
-        final KeyQuery<String, ValueTimestampHeaders<String>> query =
-                KeyQuery.withKey("test-key");
+    @Test
+    public void shouldReturnUnknownQueryTypeForKeyQueryOnHeadersStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
 
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                () -> wrapped.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
         );
 
-        assertTrue(exception.getMessage().contains("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet."));
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
+            final PositionBound positionBound = PositionBound.unbounded();
+            final QueryConfig config = new QueryConfig(false);
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            final QueryResult<byte[]> result = wrapped.query(query, 
positionBound, config);
+
+            // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE
+            assertFalse(result.isSuccess(), "Expected query to fail with 
unknown query type");
+            assertEquals(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    result.getFailureReason(),
+                    "Expected UNKNOWN_QUERY_TYPE failure reason"
+            );
+            assertNotNull(result.getPosition(), "Expected position to be set");
+        } finally {
+            store.close();
+        }
     }
 
     @Test
-    public void shouldThrowOnGetPositionForInMemoryStores() {
+    public void shouldReturnUnknownQueryTypeForRangeQueryOnHeadersStore() {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
 
         builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
                 supplier,
@@ -326,16 +582,93 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 .withCachingDisabled()
                 .build();
 
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                store::getPosition
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+                    new Bytes("a".getBytes()),
+                    new Bytes("z".getBytes())
+            );
+            final PositionBound positionBound = PositionBound.unbounded();
+            final QueryConfig config = new QueryConfig(false);
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> result = 
wrapped.query(query, positionBound, config);
+
+            // Verify: Headers store currently returns UNKNOWN_QUERY_TYPE
+            assertFalse(result.isSuccess(), "Expected query to fail with 
unknown query type");
+            assertEquals(
+                    FailureReason.UNKNOWN_QUERY_TYPE,
+                    result.getFailureReason(),
+                    "Expected UNKNOWN_QUERY_TYPE failure reason"
+            );
+            assertNotNull(result.getPosition(), "Expected position to be set");
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoForQueryOnHeadersStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
         );
 
-        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
+            final PositionBound positionBound = PositionBound.unbounded();
+            final QueryConfig config = new QueryConfig(true); // Enable 
execution info
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            final QueryResult<byte[]> result = wrapped.query(query, 
positionBound, config);
+
+            // Verify: Execution info was collected
+            assertFalse(result.getExecutionInfo().isEmpty(), "Expected 
execution info to be collected");
+            assertTrue(
+                    result.getExecutionInfo().get(0).contains("Handled in"),
+                    "Expected execution info to contain handling information"
+            );
+            assertTrue(
+                    
result.getExecutionInfo().get(0).contains(RocksDBTimestampedStoreWithHeaders.class.getName()),
+                    "Expected execution info to mention the class name"
+            );
+        } finally {
+            store.close();
+        }
     }
 
     @Test
-    public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
+    public void shouldHandleKeyQueryOnAdaptedTimestampedStore() {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
         when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
@@ -352,19 +685,48 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 .withCachingDisabled()
                 .build();
 
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                store::getPosition
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
         );
-
-        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+        store.init(context, store);
+
+        try {
+            // Put data into the store (headers will be discarded when adapted 
to timestamped store)
+            final Headers headers = new RecordHeaders();
+            headers.add("adapter", "test".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("adapter-value", 
55555L, headers));
+
+            // Verify adapter is used for legacy timestamped store
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped,
+                    "Expected TimestampedToHeadersStoreAdapter for legacy 
timestamped store");
+
+            // Query at typed level - KeyQuery should return just the value
+            final KeyQuery<String, String> query = 
KeyQuery.withKey("test-key");
+            final QueryResult<String> result = store.query(query, 
PositionBound.unbounded(), new QueryConfig(false));
+
+            // Verify IQv2 query result
+            // Adapter delegates to RocksDBTimestampedStore which supports 
IQv2 through RocksDBStore
+            assertTrue(result.isSuccess(),
+                    "Expected query to succeed since RocksDBTimestampedStore 
supports IQv2");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+            assertInstanceOf(String.class, result.getResult());
+            assertEquals("adapter-value", result.getResult(), "KeyQuery should 
return just the value");
+        } finally {
+            store.close();
+        }
     }
 
     @Test
-    public void shouldThrowOnGetPositionForNativeHeadersStore() {
+    public void shouldHandleTimestampedKeyQueryOnAdaptedTimestampedStore() {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
 
         builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
                 supplier,
@@ -378,12 +740,96 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
                 .withCachingDisabled()
                 .build();
 
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-                store::getPosition
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
         );
-
-        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+        store.init(context, store);
+
+        try {
+            // Put data into the store (headers will be discarded when adapted 
to timestamped store)
+            final Headers headers = new RecordHeaders();
+            headers.add("adapter", "test".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("adapter-value", 
55555L, headers));
+
+            // Verify adapter is used for legacy timestamped store
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped,
+                    "Expected TimestampedToHeadersStoreAdapter for legacy 
timestamped store");
+
+            // Query at typed level - TimestampedKeyQuery should return value 
+ timestamp
+            final TimestampedKeyQuery<String, String> query = 
TimestampedKeyQuery.withKey("test-key");
+            final QueryResult<ValueAndTimestamp<String>> result = 
store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+            // Verify IQv2 query result
+            // Adapter delegates to RocksDBTimestampedStore which supports 
IQv2 through RocksDBStore
+            assertTrue(result.isSuccess(),
+                    "Expected query to succeed since RocksDBTimestampedStore 
supports IQv2");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+            assertNotNull(result.getResult(), "Expected non-null result");
+            assertInstanceOf(ValueAndTimestamp.class, result.getResult());
+            assertEquals("adapter-value", result.getResult().value(), 
"TimestampedKeyQuery should return the value");
+            assertEquals(55555L, result.getResult().timestamp(), 
"TimestampedKeyQuery should return the timestamp");
+        } finally {
+            store.close();
+        }
     }
 
-}
\ No newline at end of file
+    @Test
+    public void shouldCollectExecutionInfoForQueryOnAdaptedTimestampedStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final File dir = TestUtils.tempDirectory();
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+                dir,
+                Serdes.String(),
+                Serdes.String(),
+                new StreamsConfig(props)
+        );
+        store.init(context, store);
+
+        try {
+            final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
+            final PositionBound positionBound = PositionBound.unbounded();
+            final QueryConfig config = new QueryConfig(true); // Enable 
execution info
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            final QueryResult<byte[]> result = wrapped.query(query, 
positionBound, config);
+
+            // Verify: Execution info includes both adapter and underlying 
store
+            assertFalse(result.getExecutionInfo().isEmpty(), "Expected 
execution info to be collected");
+
+            final String executionInfo = String.join("\n", 
result.getExecutionInfo());
+            assertTrue(
+                    executionInfo.contains("Handled in"),
+                    "Expected execution info to contain handling information"
+            );
+            // Should mention the adapter class
+            assertTrue(
+                    
executionInfo.contains(TimestampedToHeadersStoreAdapter.class.getName()),
+                    "Expected execution info to mention the adapter class"
+            );
+        } finally {
+            store.close();
+        }
+    }
+}

Reply via email to