This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8cad0403741 KAFKA-20261: Upgrade path for plain window store to 
headers store (#21710)
8cad0403741 is described below

commit 8cad0403741e7a0f0f19648deb437eb62d303098
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Mar 13 05:48:59 2026 +0000

    KAFKA-20261: Upgrade path for plain window store to headers store (#21710)
    
    Extends headers-aware window store migration to support upgrading from
    plain (non-timestamped) stores.
    - Plain-to-Headers Migration Support
    - Implemented query() method in the adapter class
    
    Testing:
    - 3 upgrade intergration tests
    - 2 downgrade integration tests
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 344 +++++++++++++++++++++
 .../MeteredTimestampedWindowStoreWithHeaders.java  |  19 +-
 .../PlainToHeadersWindowStoreAdapter.java          | 238 ++++++++++++++
 .../PlainToHeadersWindowStoreIteratorAdapter.java  |  35 +++
 .../TimestampedWindowStoreWithHeadersBuilder.java  |   6 +-
 .../PlainToHeadersWindowStoreAdapterTest.java      | 303 ++++++++++++++++++
 6 files changed, 940 insertions(+), 5 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 145720c4374..ea6d141ad50 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -628,6 +629,123 @@ public class HeadersStoreUpgradeIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldMigrateInMemoryPlainWindowStoreToTimestampedWindowStoreWithHeaders() 
throws Exception {
+        
shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(false);
+    }
+
+    @Test
+    public void 
shouldMigratePersistentPlainWindowStoreToTimestampedWindowStoreWithHeaders() 
throws Exception {
+        shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(true);
+    }
+
+    private void 
shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(final boolean 
persistentStore) throws Exception {
+        // Run with old plain WindowStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.windowStoreBuilder(
+                    persistentStore
+                        ? Stores.persistentWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+                        : Stores.inMemoryWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime + 
100);
+        processPlainWindowedKeyValueAndVerify("key2", "value2", baseTime + 
200);
+        processPlainWindowedKeyValueAndVerify("key3", "value3", baseTime + 
300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Restart with TimestampedWindowStoreWithHeaders
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    persistentStore
+                        ? 
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+                        : Stores.inMemoryWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1", 
baseTime + 100, persistentStore ? -1L : baseTime + 100);
+        verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2", 
baseTime + 200, persistentStore ? -1L : baseTime + 200);
+        verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key3", "value3", 
baseTime + 300, persistentStore ? -1L : baseTime + 300);
+
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "migration-test".getBytes());
+        headers.add("version", "1.0".getBytes());
+
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", 
baseTime + 350, headers, headers);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void 
shouldProxyPlainWindowStoreToTimestampedWindowStoreWithHeaders() throws 
Exception {
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.windowStoreBuilder(
+                    Stores.persistentWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime + 
100);
+        processPlainWindowedKeyValueAndVerify("key2", "value2", baseTime + 
200);
+        processPlainWindowedKeyValueAndVerify("key3", "value3", baseTime + 
300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Restart with headers-aware builder but non-headers supplier 
(proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    Stores.persistentWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),  // 
non-headers supplier!
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1", 
baseTime + 100, -1L);
+        verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2", 
baseTime + 200, -1L);
+        verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key3", "value3", 
baseTime + 300, -1L);
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+        headers.add("version", "2.0".getBytes());
+
+        // In proxy mode with plain store, headers and timestamps are not 
preserved
+        final RecordHeaders expectedHeaders = new RecordHeaders();
+
+        processPlainWindowedKeyValueWithHeadersAndVerify("key3", 
"value3-updated", baseTime + 350, headers, expectedHeaders);
+        processPlainWindowedKeyValueWithHeadersAndVerify("key4", "value4", 
baseTime + 400, headers, expectedHeaders);
+
+        kafkaStreams.close();
+    }
+
     @Test
     public void 
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
 throws Exception {
         
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
@@ -748,6 +866,136 @@ public class HeadersStoreUpgradeIntegrationTest {
         kafkaStreams.close();
     }
 
+    private void processPlainWindowedKeyValueAndVerify(final String key,
+                                                       final String value,
+                                                       final long timestamp) 
throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            List.of(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, String> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+                final String result = store.fetch(key, windowStart);
+
+                return result != null && result.equals(value);
+            } catch (final Exception e) {
+                return false;
+            }
+        }, 60_000L, "Could not verify plain window value in time.");
+    }
+
+    private void verifyPlainWindowValueWithEmptyHeadersAndTimestamp(final 
String key,
+                                                                    final 
String value,
+                                                                    final long 
windowTimestamp,
+                                                                    final long 
expectedTimestamp) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = windowTimestamp - (windowTimestamp % 
WINDOW_SIZE_MS);
+
+                final List<KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all()) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.key.window().start() == windowStart) {
+                            results.add(kv);
+                        }
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> result = 
results.get(0).value;
+                assertNotNull(result, "Result should not be null");
+                assertEquals(value, result.value(), "Value should match");
+                assertEquals(expectedTimestamp, result.timestamp(), "Timestamp 
should be " + expectedTimestamp + " for plain store migration");
+
+                // Verify headers exist but are empty (migrated from plain 
store without headers or timestamps)
+                assertNotNull(result.headers(), "Headers should not be null 
for migrated data");
+                assertEquals(0, result.headers().toArray().length, "Headers 
should be empty for migrated data");
+
+                return true;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify plain window value with empty headers 
and timestamp in time.");
+    }
+
+    private void processPlainWindowedKeyValueWithHeadersAndVerify(final String 
key,
+                                                                  final String 
value,
+                                                                  final long 
timestamp,
+                                                                  final 
Headers headers,
+                                                                  final 
Headers expectedHeaders) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            List.of(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+
+                final List<KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all()) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.key.window().start() == windowStart) {
+                            results.add(kv);
+                        }
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> result = 
results.get(0).value;
+                // For plain window stores, timestamp is always -1 since it's 
not preserved
+                return result != null
+                    && result.value().equals(value)
+                    && result.timestamp() == -1L
+                    && result.headers().equals(expectedHeaders);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify plain windowed value with headers in 
time.");
+    }
+
     private void processWindowedKeyValueAndVerifyTimestamped(final String key,
                                                              final String 
value,
                                                              final long 
timestamp) throws Exception {
@@ -878,6 +1126,24 @@ public class HeadersStoreUpgradeIntegrationTest {
         }, 60_000L, "Could not verify legacy value with empty headers in 
time.");
     }
 
+    /**
+     * Processor for plain WindowStore (without timestamps or headers).
+     */
+    private static class PlainWindowedProcessor implements Processor<String, 
String, Void, Void> {
+        private WindowStore<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long windowStart = record.timestamp() - (record.timestamp() 
% WINDOW_SIZE_MS);
+            store.put(record.key(), record.value(), windowStart);
+        }
+    }
+
     /**
      * Processor for TimestampedWindowStore (without headers).
      */
@@ -1065,6 +1331,54 @@ public class HeadersStoreUpgradeIntegrationTest {
     }
 
 
+    @Test
+    public void 
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToPlainWindowStore() 
throws Exception {
+        final Properties props = props();
+        setupAndPopulateWindowStoreWithHeaders(props, 
List.of(KeyValue.pair("key1", 100L)));
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.windowStoreBuilder(
+                    Stores.persistentWindowStore(WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        boolean exceptionThrown = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof ProcessorStateException &&
+                    cause.getMessage() != null &&
+                    cause.getMessage().contains("headers-aware") &&
+                    cause.getMessage().contains("Downgrade")) {
+                    exceptionThrown = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException 
about downgrade not being supported, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be 
thrown when attempting to downgrade from headers-aware to plain window store");
+        }
+    }
+
     @Test
     public void 
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
 throws Exception {
         final Properties props = props();
@@ -1114,6 +1428,36 @@ public class HeadersStoreUpgradeIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersToPlainWindowStoreAfterCleanup()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateWindowStoreWithHeaders(props, 
asList(KeyValue.pair("key1", 100L), KeyValue.pair("key2", 200L)));
+
+        kafkaStreams.cleanUp();
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.windowStoreBuilder(
+                    Stores.persistentWindowStore(WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long newTime = CLUSTER.time.milliseconds();
+        processPlainWindowedKeyValueAndVerify("key3", "value3", newTime + 300);
+        processPlainWindowedKeyValueAndVerify("key4", "value4", newTime + 400);
+
+        kafkaStreams.close();
+    }
+
     @Test
     public void 
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup() 
throws Exception {
         final Properties props = props();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 85a2dfe2e77..2131bb4ec96 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -412,12 +412,23 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, 
V>
     private boolean isUnderlyingStoreTimestamped() {
         Object store = wrapped();
         do {
-            if (store instanceof TimestampedBytesStore
-                    || store instanceof 
TimestampedToHeadersWindowStoreAdapter) {
+            // Check adapters first before attempting to unwrap
+            if (store instanceof TimestampedToHeadersWindowStoreAdapter) {
                 return true;
             }
-            store = ((WrappedStateStore<?, ?, ?>) store).wrapped();
-        } while ((store instanceof WrappedStateStore));
+            if (store instanceof PlainToHeadersWindowStoreAdapter) {
+                return false; // Plain store doesn't preserve timestamps
+            }
+            if (store instanceof TimestampedBytesStore) {
+                return true;
+            }
+            // Only unwrap if it's a WrappedStateStore
+            if (store instanceof WrappedStateStore) {
+                store = ((WrappedStateStore<?, ?, ?>) store).wrapped();
+            } else {
+                break;
+            }
+        } while (true);
         return store instanceof TimestampedBytesStore;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
new file mode 100644
index 00000000000..88be98af91b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+
+/**
+ * Adapter for backward compatibility between {@link 
TimestampedWindowStoreWithHeaders}
+ * and plain {@link WindowStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code WindowStore} (without 
timestamp or headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates 
between the plain
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]} 
format.
+ * <p>
+ * Format conversion:
+ * <ul>
+ *   <li>Write: {@code [headers][timestamp][value]} → {@code [value]} (strip 
headers and timestamp)</li>
+ *   <li>Read: {@code [value]} → {@code [headers][timestamp][value]} (add 
empty headers and timestamp=-1)</li>
+ * </ul>
+ */
+public class PlainToHeadersWindowStoreAdapter implements WindowStore<Bytes, 
byte[]> {
+    private final WindowStore<Bytes, byte[]> store;
+
+    public PlainToHeadersWindowStoreAdapter(final WindowStore<Bytes, byte[]> 
store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        if (store instanceof TimestampedBytesStore) {
+            throw new IllegalArgumentException("Provided store must be a plain 
(non-timestamped) window store, but it is timestamped.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] 
valueWithTimestampAndHeaders, final long windowStartTimestamp) {
+        store.put(key, rawPlainValue(valueWithTimestampAndHeaders), 
windowStartTimestamp);
+    }
+
+    @Override
+    public byte[] fetch(final Bytes key, final long timestamp) {
+        return convertFromPlainToHeaderFormat(store.fetch(key, timestamp));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long 
timeFrom, final long timeTo) {
+        return new PlainToHeadersWindowStoreIteratorAdapter(store.fetch(key, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new PlainToHeadersWindowStoreIteratorAdapter(store.fetch(key, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
long timeFrom, final long timeTo) {
+        return new 
PlainToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final 
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
PlainToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                           final long 
timeFrom, final long timeTo) {
+        return new PlainToHeadersIteratorAdapter<>(store.fetch(keyFrom, keyTo, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                           final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new PlainToHeadersIteratorAdapter<>(store.fetch(keyFrom, keyTo, 
timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                                   final long 
timeFrom, final long timeTo) {
+        return new 
PlainToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo,
+                                                                   final 
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
PlainToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo, timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom, final long timeTo) {
+        return new PlainToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant 
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new PlainToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, 
timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final 
long timeFrom, final long timeTo) {
+        return new 
PlainToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final 
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+        return new 
PlainToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+        return new PlainToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+        return new PlainToHeadersIteratorAdapter<>(store.backwardAll());
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+
+        // Handle WindowKeyQuery: wrap iterator to convert from plain to 
headers format
+        if (query instanceof WindowKeyQuery) {
+            final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = 
(WindowKeyQuery<Bytes, byte[]>) query;
+            final QueryResult<WindowStoreIterator<byte[]>> rawResult = 
store.query(windowKeyQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final WindowStoreIterator<byte[]> wrappedIterator =
+                    new 
PlainToHeadersWindowStoreIteratorAdapter(rawResult.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
wrappedIterator);
+            } else {
+                result = (QueryResult<R>) rawResult;
+            }
+        } else if (query instanceof WindowRangeQuery) {
+            // Handle WindowRangeQuery: wrap iterator to convert values
+            final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = 
(WindowRangeQuery<Bytes, byte[]>) query;
+            final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> 
rawResult =
+                store.query(windowRangeQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final KeyValueIterator<Windowed<Bytes>, byte[]> 
wrappedIterator =
+                    new PlainToHeadersIteratorAdapter<>(rawResult.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
wrappedIterator);
+            } else {
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+            // For other query types, delegate to the underlying store
+            result = store.query(query, positionBound, config);
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns"
+            );
+        }
+
+        return result;
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        store.init(context, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreIteratorAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreIteratorAdapter.java
new file mode 100644
index 00000000000..7227d9fa78a
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreIteratorAdapter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * Iterator adapter for {@link WindowStoreIterator} that converts plain values
+ * to timestamp-with-headers format by adding empty headers and timestamp=-1.
+ * <p>
+ * This extends {@link PlainToHeadersIteratorAdapter} to also implement the
+ * {@link WindowStoreIterator} interface marker.
+ */
+class PlainToHeadersWindowStoreIteratorAdapter
+    extends PlainToHeadersIteratorAdapter<Long>
+    implements WindowStoreIterator<byte[]> {
+
+    PlainToHeadersWindowStoreIteratorAdapter(final WindowStoreIterator<byte[]> 
innerIterator) {
+        super(innerIterator);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index c5957ca6748..3714643d43b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -68,7 +68,11 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
 
         if (!(store instanceof HeadersBytesStore)) {
             if (store.persistent()) {
-                store = new TimestampedToHeadersWindowStoreAdapter(store);
+                if (store instanceof TimestampedBytesStore) {
+                    store = new TimestampedToHeadersWindowStoreAdapter(store);
+                } else {
+                    store = new PlainToHeadersWindowStoreAdapter(store);
+                }
             } else {
                 store = new 
InMemoryTimestampedWindowStoreWithHeadersMarker(store);
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapterTest.java
new file mode 100644
index 00000000000..6c3400c344e
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapterTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PlainToHeadersWindowStoreAdapterTest {
+
+    private static final long WINDOW_SIZE = 10_000L;
+    private static final long RETENTION_PERIOD = 60_000L;
+    private static final long SEGMENT_INTERVAL = 30_000L;
+
+    private PlainToHeadersWindowStoreAdapter adapter;
+    private RocksDBWindowStore underlyingStore;
+    private InternalMockProcessorContext<String, String> context;
+    private File baseDir;
+
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(props)
+        );
+
+        final SegmentedBytesStore segmentedBytesStore = new 
RocksDBSegmentedBytesStore(
+            "iqv2-test-store",
+            "test-metrics-scope",
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new WindowKeySchema()
+        );
+
+        underlyingStore = new RocksDBWindowStore(
+            segmentedBytesStore,
+            false,
+            WINDOW_SIZE
+        );
+
+        adapter = new PlainToHeadersWindowStoreAdapter(underlyingStore);
+        adapter.init(context, adapter);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (adapter != null) {
+            adapter.close();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldHandleWindowKeyQuerySuccessfully() {
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.persistentWindowStore(
+                    "typed-adapter-test",
+                    ofMillis(RETENTION_PERIOD),
+                    ofMillis(WINDOW_SIZE),
+                    false),
+                Serdes.String(),
+                Serdes.String())
+            .withLoggingDisabled()
+            .build();
+
+        store.init(context, store);
+
+        try {
+            final Headers headers1 = new RecordHeaders();
+            headers1.add("key", "header1".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("value1", 1000L, 
headers1), 1000L);
+
+            final Headers headers2 = new RecordHeaders();
+            headers2.add("key", "header2".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("value2", 5000L, 
headers2), 5000L);
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(PlainToHeadersWindowStoreAdapter.class, wrapped,
+                "Expected PlainToHeadersWindowStoreAdapter for plain window 
store");
+
+            // Query at typed level - For plain stores, WindowKeyQuery returns 
plain V (String), not ValueTimestampHeaders
+            final WindowKeyQuery<String, ValueTimestampHeaders<String>> query 
= WindowKeyQuery.withKeyAndWindowStartRange(
+                "test-key",
+                Instant.ofEpochMilli(0),
+                Instant.ofEpochMilli(10000L)
+            );
+            final QueryResult<WindowStoreIterator<String>> result =
+                (QueryResult<WindowStoreIterator<String>>) (QueryResult<?>) 
store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+            assertTrue(result.isSuccess(), "Expected query to succeed");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+            assertNotNull(result.getResult(), "Expected non-null result");
+
+            final WindowStoreIterator<String> iterator = result.getResult();
+            assertTrue(iterator.hasNext(), "Expected at least one result");
+
+            KeyValue<Long, String> kv = iterator.next();
+            assertEquals(1000L, kv.key, "Expected first window timestamp");
+            assertEquals("value1", kv.value, "WindowKeyQuery should return the 
plain value");
+
+            assertTrue(iterator.hasNext(), "Expected second result");
+            kv = iterator.next();
+            assertEquals(5000L, kv.key, "Expected second window timestamp");
+            assertEquals("value2", kv.value, "WindowKeyQuery should return the 
plain value");
+
+            assertFalse(iterator.hasNext(), "Expected no more results");
+            iterator.close();
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldHandleWindowRangeQuerySuccessfully() {
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.persistentWindowStore(
+                    "typed-range-adapter-test",
+                    ofMillis(RETENTION_PERIOD),
+                    ofMillis(WINDOW_SIZE),
+                    false),
+                Serdes.String(),
+                Serdes.String())
+            .withLoggingDisabled()
+            .build();
+
+        store.init(context, store);
+
+        try {
+            final Headers headers1 = new RecordHeaders();
+            headers1.add("source", "key1".getBytes());
+            store.put("key1", ValueTimestampHeaders.make("value1", 1000L, 
headers1), 1000L);
+
+            final Headers headers2 = new RecordHeaders();
+            headers2.add("source", "key2".getBytes());
+            store.put("key2", ValueTimestampHeaders.make("value2", 5000L, 
headers2), 5000L);
+
+            final Headers headers3 = new RecordHeaders();
+            headers3.add("source", "key3".getBytes());
+            store.put("key3", ValueTimestampHeaders.make("value3", 3000L, 
headers3), 3000L);
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(PlainToHeadersWindowStoreAdapter.class, wrapped,
+                "Expected PlainToHeadersWindowStoreAdapter for plain window 
store");
+
+            // Query at typed level - For plain stores, WindowRangeQuery 
returns plain V (String), not ValueTimestampHeaders
+            final WindowRangeQuery<String, ValueTimestampHeaders<String>> 
query = WindowRangeQuery.withWindowStartRange(
+                Instant.ofEpochMilli(0),
+                Instant.ofEpochMilli(10000L)
+            );
+            final QueryResult<KeyValueIterator<Windowed<String>, String>> 
result =
+                (QueryResult<KeyValueIterator<Windowed<String>, String>>) 
(QueryResult<?>) store.query(query, PositionBound.unbounded(), new 
QueryConfig(false));
+
+            assertTrue(result.isSuccess(), "Expected query to succeed");
+            assertNotNull(result.getResult(), "Expected result iterator to be 
present");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+
+            final KeyValueIterator<Windowed<String>, String> iterator = 
result.getResult();
+
+            assertTrue(iterator.hasNext(), "Expected at least one result");
+            KeyValue<Windowed<String>, String> kv = iterator.next();
+            assertEquals("key1", kv.key.key(), "Expected first key");
+            assertEquals(1000L, kv.key.window().start(), "Expected first 
window start");
+            assertEquals("value1", kv.value, "WindowRangeQuery should return 
the plain value");
+
+            assertTrue(iterator.hasNext(), "Expected second result");
+            kv = iterator.next();
+            assertEquals("key2", kv.key.key(), "Expected second key");
+            assertEquals(5000L, kv.key.window().start(), "Expected second 
window start");
+            assertEquals("value2", kv.value, "WindowRangeQuery should return 
the plain value");
+
+            assertTrue(iterator.hasNext(), "Expected third result");
+            kv = iterator.next();
+            assertEquals("key3", kv.key.key(), "Expected third key");
+            assertEquals(3000L, kv.key.window().start(), "Expected third 
window start");
+            assertEquals("value3", kv.value, "WindowRangeQuery should return 
the plain value");
+
+            assertFalse(iterator.hasNext(), "Expected no more results");
+            iterator.close();
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoForWindowKeyQueryWhenRequested() {
+        final WindowKeyQuery<Bytes, byte[]> query = 
WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(true); // Enable execution 
info
+
+        final QueryResult<WindowStoreIterator<byte[]>> result = 
adapter.query(query, positionBound, config);
+
+        assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution 
info to be collected");
+        boolean foundAdapterInfo = false;
+        for (final String info : result.getExecutionInfo()) {
+            if (info.contains("Handled in") && 
info.contains(PlainToHeadersWindowStoreAdapter.class.getName())) {
+                foundAdapterInfo = true;
+                break;
+            }
+        }
+        assertTrue(foundAdapterInfo, "Expected execution info to mention the 
adapter class");
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoForWindowRangeQueryWhenRequested() {
+        final WindowRangeQuery<Bytes, byte[]> query = 
WindowRangeQuery.withWindowStartRange(
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(true); // Enable execution 
info
+
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            adapter.query(query, positionBound, config);
+
+        assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution 
info to be collected");
+        boolean foundAdapterInfo = false;
+        for (final String info : result.getExecutionInfo()) {
+            if (info.contains("Handled in") && 
info.contains(PlainToHeadersWindowStoreAdapter.class.getName())) {
+                foundAdapterInfo = true;
+                break;
+            }
+        }
+        assertTrue(foundAdapterInfo, "Expected execution info to mention the 
adapter class");
+    }
+
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        final WindowKeyQuery<Bytes, byte[]> query = 
WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(false); // Disable 
execution info
+
+        final QueryResult<WindowStoreIterator<byte[]>> result = 
adapter.query(query, positionBound, config);
+
+        boolean foundAdapterInfo = false;
+        for (final String info : result.getExecutionInfo()) {
+            if 
(info.contains(PlainToHeadersWindowStoreAdapter.class.getName())) {
+                foundAdapterInfo = true;
+                break;
+            }
+        }
+        assertFalse(foundAdapterInfo, "Expected no execution info from adapter 
when not requested");
+    }
+}

Reply via email to