This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8cad0403741 KAFKA-20261: Upgrade path for plain window store to
headers store (#21710)
8cad0403741 is described below
commit 8cad0403741e7a0f0f19648deb437eb62d303098
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Mar 13 05:48:59 2026 +0000
KAFKA-20261: Upgrade path for plain window store to headers store (#21710)
Extends headers-aware window store migration to support upgrading from
plain (non-timestamped) stores.
- Plain-to-Headers Migration Support
- Implemented query() method in the adapter class
Testing:
- 3 upgrade integration tests
- 2 downgrade integration tests
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 344 +++++++++++++++++++++
.../MeteredTimestampedWindowStoreWithHeaders.java | 19 +-
.../PlainToHeadersWindowStoreAdapter.java | 238 ++++++++++++++
.../PlainToHeadersWindowStoreIteratorAdapter.java | 35 +++
.../TimestampedWindowStoreWithHeadersBuilder.java | 6 +-
.../PlainToHeadersWindowStoreAdapterTest.java | 303 ++++++++++++++++++
6 files changed, 940 insertions(+), 5 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 145720c4374..ea6d141ad50 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -628,6 +629,123 @@ public class HeadersStoreUpgradeIntegrationTest {
}
}
+ @Test
+ public void
shouldMigrateInMemoryPlainWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentPlainWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+ shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ private void
shouldMigratePlainWindowStoreToTimestampedWindowStoreWithHeaders(final boolean
persistentStore) throws Exception {
+ // Run with old plain WindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.windowStoreBuilder(
+ persistentStore
+ ? Stores.persistentWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime +
100);
+ processPlainWindowedKeyValueAndVerify("key2", "value2", baseTime +
200);
+ processPlainWindowedKeyValueAndVerify("key3", "value3", baseTime +
300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with TimestampedWindowStoreWithHeaders
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1",
baseTime + 100, persistentStore ? -1L : baseTime + 100);
+ verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2",
baseTime + 200, persistentStore ? -1L : baseTime + 200);
+ verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key3", "value3",
baseTime + 300, persistentStore ? -1L : baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyPlainWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processPlainWindowedKeyValueAndVerify("key1", "value1", baseTime +
100);
+ processPlainWindowedKeyValueAndVerify("key2", "value2", baseTime +
200);
+ processPlainWindowedKeyValueAndVerify("key3", "value3", baseTime +
300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key1", "value1",
baseTime + 100, -1L);
+ verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key2", "value2",
baseTime + 200, -1L);
+ verifyPlainWindowValueWithEmptyHeadersAndTimestamp("key3", "value3",
baseTime + 300, -1L);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+ headers.add("version", "2.0".getBytes());
+
+ // In proxy mode with plain store, headers and timestamps are not
preserved
+ final RecordHeaders expectedHeaders = new RecordHeaders();
+
+ processPlainWindowedKeyValueWithHeadersAndVerify("key3",
"value3-updated", baseTime + 350, headers, expectedHeaders);
+ processPlainWindowedKeyValueWithHeadersAndVerify("key4", "value4",
baseTime + 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
@Test
public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
@@ -748,6 +866,136 @@ public class HeadersStoreUpgradeIntegrationTest {
kafkaStreams.close();
}
+ private void processPlainWindowedKeyValueAndVerify(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ List.of(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, String> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final String result = store.fetch(key, windowStart);
+
+ return result != null && result.equals(value);
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify plain window value in time.");
+ }
+
+ private void verifyPlainWindowValueWithEmptyHeadersAndTimestamp(final
String key,
+ final
String value,
+ final long
windowTimestamp,
+ final long
expectedTimestamp) throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = windowTimestamp - (windowTimestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(expectedTimestamp, result.timestamp(), "Timestamp
should be " + expectedTimestamp + " for plain store migration");
+
+ // Verify headers exist but are empty (migrated from plain
store without headers or timestamps)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify plain window value with empty headers
and timestamp in time.");
+ }
+
+ private void processPlainWindowedKeyValueWithHeadersAndVerify(final String
key,
+ final String
value,
+ final long
timestamp,
+ final
Headers headers,
+ final
Headers expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ List.of(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ // For plain window stores, timestamp is always -1 since it's
not preserved
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == -1L
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify plain windowed value with headers in
time.");
+ }
+
private void processWindowedKeyValueAndVerifyTimestamped(final String key,
final String
value,
final long
timestamp) throws Exception {
@@ -878,6 +1126,24 @@ public class HeadersStoreUpgradeIntegrationTest {
}, 60_000L, "Could not verify legacy value with empty headers in
time.");
}
+ /**
+ * Processor for plain WindowStore (without timestamps or headers).
+ */
+ private static class PlainWindowedProcessor implements Processor<String,
String, Void, Void> {
+ private WindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), record.value(), windowStart);
+ }
+ }
+
/**
* Processor for TimestampedWindowStore (without headers).
*/
@@ -1065,6 +1331,54 @@ public class HeadersStoreUpgradeIntegrationTest {
}
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToPlainWindowStore()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateWindowStoreWithHeaders(props,
List.of(KeyValue.pair("key1", 100L)));
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("headers-aware") &&
+ cause.getMessage().contains("Downgrade")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException
about downgrade not being supported, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException to be
thrown when attempting to downgrade from headers-aware to plain window store");
+ }
+ }
+
@Test
public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
final Properties props = props();
@@ -1114,6 +1428,36 @@ public class HeadersStoreUpgradeIntegrationTest {
}
}
+ @Test
+ public void
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersToPlainWindowStoreAfterCleanup()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateWindowStoreWithHeaders(props,
asList(KeyValue.pair("key1", 100L), KeyValue.pair("key2", 200L)));
+
+ kafkaStreams.cleanUp();
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(PlainWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long newTime = CLUSTER.time.milliseconds();
+ processPlainWindowedKeyValueAndVerify("key3", "value3", newTime + 300);
+ processPlainWindowedKeyValueAndVerify("key4", "value4", newTime + 400);
+
+ kafkaStreams.close();
+ }
+
@Test
public void
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup()
throws Exception {
final Properties props = props();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 85a2dfe2e77..2131bb4ec96 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -412,12 +412,23 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
private boolean isUnderlyingStoreTimestamped() {
Object store = wrapped();
do {
- if (store instanceof TimestampedBytesStore
- || store instanceof
TimestampedToHeadersWindowStoreAdapter) {
+ // Check adapters first before attempting to unwrap
+ if (store instanceof TimestampedToHeadersWindowStoreAdapter) {
return true;
}
- store = ((WrappedStateStore<?, ?, ?>) store).wrapped();
- } while ((store instanceof WrappedStateStore));
+ if (store instanceof PlainToHeadersWindowStoreAdapter) {
+ return false; // Plain store doesn't preserve timestamps
+ }
+ if (store instanceof TimestampedBytesStore) {
+ return true;
+ }
+ // Only unwrap if it's a WrappedStateStore
+ if (store instanceof WrappedStateStore) {
+ store = ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ } else {
+ break;
+ }
+ } while (true);
return store instanceof TimestampedBytesStore;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
new file mode 100644
index 00000000000..88be98af91b
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+
+/**
+ * Adapter for backward compatibility between {@link
TimestampedWindowStoreWithHeaders}
+ * and plain {@link WindowStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code WindowStore} (without
timestamp or headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates
between the plain
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]}
format.
+ * <p>
+ * Format conversion:
+ * <ul>
+ * <li>Write: {@code [headers][timestamp][value]} → {@code [value]} (strip
headers and timestamp)</li>
+ * <li>Read: {@code [value]} → {@code [headers][timestamp][value]} (add
empty headers and timestamp=-1)</li>
+ * </ul>
+ */
+public class PlainToHeadersWindowStoreAdapter implements WindowStore<Bytes,
byte[]> {
+ private final WindowStore<Bytes, byte[]> store;
+
+ public PlainToHeadersWindowStoreAdapter(final WindowStore<Bytes, byte[]>
store) {
+ if (!store.persistent()) {
+ throw new IllegalArgumentException("Provided store must be a
persistent store, but it is not.");
+ }
+ if (store instanceof TimestampedBytesStore) {
+ throw new IllegalArgumentException("Provided store must be a plain
(non-timestamped) window store, but it is timestamped.");
+ }
+ this.store = store;
+ }
+
+ @Override
+ public void put(final Bytes key, final byte[]
valueWithTimestampAndHeaders, final long windowStartTimestamp) {
+ store.put(key, rawPlainValue(valueWithTimestampAndHeaders),
windowStartTimestamp);
+ }
+
+ @Override
+ public byte[] fetch(final Bytes key, final long timestamp) {
+ return convertFromPlainToHeaderFormat(store.fetch(key, timestamp));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key, final long
timeFrom, final long timeTo) {
+ return new PlainToHeadersWindowStoreIteratorAdapter(store.fetch(key,
timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new PlainToHeadersWindowStoreIteratorAdapter(store.fetch(key,
timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final
long timeFrom, final long timeTo) {
+ return new
PlainToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key, timeFrom,
timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
PlainToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key, timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom, final Bytes keyTo,
+ final long
timeFrom, final long timeTo) {
+ return new PlainToHeadersIteratorAdapter<>(store.fetch(keyFrom, keyTo,
timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom, final Bytes keyTo,
+ final Instant
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new PlainToHeadersIteratorAdapter<>(store.fetch(keyFrom, keyTo,
timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes
keyFrom, final Bytes keyTo,
+ final long
timeFrom, final long timeTo) {
+ return new
PlainToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo, timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes
keyFrom, final Bytes keyTo,
+ final
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
PlainToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo, timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long
timeFrom, final long timeTo) {
+ return new PlainToHeadersIteratorAdapter<>(store.fetchAll(timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new PlainToHeadersIteratorAdapter<>(store.fetchAll(timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final
long timeFrom, final long timeTo) {
+ return new
PlainToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
PlainToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ return new PlainToHeadersIteratorAdapter<>(store.all());
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+ return new PlainToHeadersIteratorAdapter<>(store.backwardAll());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+
+ // Handle WindowKeyQuery: wrap iterator to convert from plain to
headers format
+ if (query instanceof WindowKeyQuery) {
+ final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
(WindowKeyQuery<Bytes, byte[]>) query;
+ final QueryResult<WindowStoreIterator<byte[]>> rawResult =
store.query(windowKeyQuery, positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final WindowStoreIterator<byte[]> wrappedIterator =
+ new
PlainToHeadersWindowStoreIteratorAdapter(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
wrappedIterator);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else if (query instanceof WindowRangeQuery) {
+ // Handle WindowRangeQuery: wrap iterator to convert values
+ final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
(WindowRangeQuery<Bytes, byte[]>) query;
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>
rawResult =
+ store.query(windowRangeQuery, positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Windowed<Bytes>, byte[]>
wrappedIterator =
+ new PlainToHeadersIteratorAdapter<>(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
wrappedIterator);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+ // For other query types, delegate to the underlying store
+ result = store.query(query, positionBound, config);
+ }
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (System.nanoTime() -
start) + "ns"
+ );
+ }
+
+ return result;
+ }
+
+ @Override
+ public Position getPosition() {
+ return store.getPosition();
+ }
+
+ @Override
+ public String name() {
+ return store.name();
+ }
+
+ @Override
+ public void init(final StateStoreContext context,
+ final StateStore root) {
+ store.init(context, root);
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ store.commit(changelogOffsets);
+ }
+
+ @Override
+ public void close() {
+ store.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return true;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return store.isOpen();
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreIteratorAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreIteratorAdapter.java
new file mode 100644
index 00000000000..7227d9fa78a
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreIteratorAdapter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * Iterator adapter for {@link WindowStoreIterator} that converts plain values
+ * to timestamp-with-headers format by adding empty headers and timestamp=-1.
+ * <p>
+ * This extends {@link PlainToHeadersIteratorAdapter} to also implement the
+ * {@link WindowStoreIterator} interface marker.
+ */
+class PlainToHeadersWindowStoreIteratorAdapter
+ extends PlainToHeadersIteratorAdapter<Long>
+ implements WindowStoreIterator<byte[]> {
+
+ PlainToHeadersWindowStoreIteratorAdapter(final WindowStoreIterator<byte[]>
innerIterator) {
+ super(innerIterator);
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index c5957ca6748..3714643d43b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -68,7 +68,11 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
if (!(store instanceof HeadersBytesStore)) {
if (store.persistent()) {
- store = new TimestampedToHeadersWindowStoreAdapter(store);
+ if (store instanceof TimestampedBytesStore) {
+ store = new TimestampedToHeadersWindowStoreAdapter(store);
+ } else {
+ store = new PlainToHeadersWindowStoreAdapter(store);
+ }
} else {
store = new
InMemoryTimestampedWindowStoreWithHeadersMarker(store);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapterTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapterTest.java
new file mode 100644
index 00000000000..6c3400c344e
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapterTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PlainToHeadersWindowStoreAdapterTest {
+
+ private static final long WINDOW_SIZE = 10_000L; // window size; wrapped in ofMillis() where a Duration is required
+ private static final long RETENTION_PERIOD = 60_000L; // retention period; wrapped in ofMillis() where a Duration is required
+ private static final long SEGMENT_INTERVAL = 30_000L; // segment interval handed to RocksDBSegmentedBytesStore; presumably milliseconds - TODO confirm
+
+ private PlainToHeadersWindowStoreAdapter adapter; // adapter under test, created and initialized in setUp()
+ private RocksDBWindowStore underlyingStore; // store instance that setUp() wraps in the adapter
+ private InternalMockProcessorContext<String, String> context; // mock context used to init the stores in these tests
+ private File baseDir; // state directory obtained from TestUtils.tempDirectory()
+
+    /**
+     * Builds a mock processor context rooted at a fresh temporary directory, creates a
+     * {@link RocksDBWindowStore} on top of a segmented bytes store, wraps it in the
+     * adapter under test, and initializes the adapter against the context.
+     */
+    @BeforeEach
+    public void setUp() {
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        baseDir = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            baseDir,
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(props)
+        );
+
+        final SegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
+            "iqv2-test-store",
+            "test-metrics-scope",
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new WindowKeySchema()
+        );
+
+        underlyingStore = new RocksDBWindowStore(
+            segmentedBytesStore,
+            false,
+            WINDOW_SIZE
+        );
+
+        // The adapter acts as its own root store when it is initialized.
+        adapter = new PlainToHeadersWindowStoreAdapter(underlyingStore);
+        adapter.init(context, adapter);
+    }
+
+    /**
+     * Closes the adapter created by {@code setUp()}, if there is one.
+     */
+    @AfterEach
+    public void tearDown() {
+        // Nothing to release when the adapter was never assigned.
+        if (adapter == null) {
+            return;
+        }
+        adapter.close();
+    }
+
+    /**
+     * Verifies that a typed {@link WindowKeyQuery} issued against a headers-aware store
+     * built on a plain persistent window store succeeds, and that the two values written
+     * for the key come back under their window start timestamps.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldHandleWindowKeyQuerySuccessfully() {
+        final TimestampedWindowStoreWithHeaders<String, String> store =
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                    Stores.persistentWindowStore(
+                        "typed-adapter-test",
+                        ofMillis(RETENTION_PERIOD),
+                        ofMillis(WINDOW_SIZE),
+                        false),
+                    Serdes.String(),
+                    Serdes.String())
+                .withLoggingDisabled()
+                .build();
+
+        store.init(context, store);
+
+        try {
+            final Headers headers1 = new RecordHeaders();
+            headers1.add("key", "header1".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("value1", 1000L, headers1), 1000L);
+
+            final Headers headers2 = new RecordHeaders();
+            headers2.add("key", "header2".getBytes());
+            store.put("test-key", ValueTimestampHeaders.make("value2", 5000L, headers2), 5000L);
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(PlainToHeadersWindowStoreAdapter.class, wrapped,
+                "Expected PlainToHeadersWindowStoreAdapter for plain window store");
+
+            // Query at typed level - For plain stores, WindowKeyQuery returns plain V (String), not ValueTimestampHeaders
+            final WindowKeyQuery<String, ValueTimestampHeaders<String>> query =
+                WindowKeyQuery.withKeyAndWindowStartRange(
+                    "test-key",
+                    Instant.ofEpochMilli(0),
+                    Instant.ofEpochMilli(10000L)
+                );
+            final QueryResult<WindowStoreIterator<String>> result =
+                (QueryResult<WindowStoreIterator<String>>) (QueryResult<?>)
+                    store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+            assertTrue(result.isSuccess(), "Expected query to succeed");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+            assertNotNull(result.getResult(), "Expected non-null result");
+
+            // try-with-resources closes the iterator even when an assertion below fails,
+            // so the store.close() in the finally block never runs with it still open.
+            try (final WindowStoreIterator<String> iterator = result.getResult()) {
+                assertTrue(iterator.hasNext(), "Expected at least one result");
+
+                KeyValue<Long, String> kv = iterator.next();
+                assertEquals(1000L, kv.key, "Expected first window timestamp");
+                assertEquals("value1", kv.value, "WindowKeyQuery should return the plain value");
+
+                assertTrue(iterator.hasNext(), "Expected second result");
+                kv = iterator.next();
+                assertEquals(5000L, kv.key, "Expected second window timestamp");
+                assertEquals("value2", kv.value, "WindowKeyQuery should return the plain value");
+
+                assertFalse(iterator.hasNext(), "Expected no more results");
+            }
+        } finally {
+            store.close();
+        }
+    }
+
+    /**
+     * Verifies that a typed {@link WindowRangeQuery} issued against a headers-aware store
+     * built on a plain persistent window store succeeds, and that the three records written
+     * come back with their keys, window start timestamps, and plain values.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldHandleWindowRangeQuerySuccessfully() {
+        final TimestampedWindowStoreWithHeaders<String, String> store =
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                    Stores.persistentWindowStore(
+                        "typed-range-adapter-test",
+                        ofMillis(RETENTION_PERIOD),
+                        ofMillis(WINDOW_SIZE),
+                        false),
+                    Serdes.String(),
+                    Serdes.String())
+                .withLoggingDisabled()
+                .build();
+
+        store.init(context, store);
+
+        try {
+            final Headers headers1 = new RecordHeaders();
+            headers1.add("source", "key1".getBytes());
+            store.put("key1", ValueTimestampHeaders.make("value1", 1000L, headers1), 1000L);
+
+            final Headers headers2 = new RecordHeaders();
+            headers2.add("source", "key2".getBytes());
+            store.put("key2", ValueTimestampHeaders.make("value2", 5000L, headers2), 5000L);
+
+            final Headers headers3 = new RecordHeaders();
+            headers3.add("source", "key3".getBytes());
+            store.put("key3", ValueTimestampHeaders.make("value3", 3000L, headers3), 3000L);
+
+            final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+            assertInstanceOf(PlainToHeadersWindowStoreAdapter.class, wrapped,
+                "Expected PlainToHeadersWindowStoreAdapter for plain window store");
+
+            // Query at typed level - For plain stores, WindowRangeQuery returns plain V (String), not ValueTimestampHeaders
+            final WindowRangeQuery<String, ValueTimestampHeaders<String>> query =
+                WindowRangeQuery.withWindowStartRange(
+                    Instant.ofEpochMilli(0),
+                    Instant.ofEpochMilli(10000L)
+                );
+            final QueryResult<KeyValueIterator<Windowed<String>, String>> result =
+                (QueryResult<KeyValueIterator<Windowed<String>, String>>) (QueryResult<?>)
+                    store.query(query, PositionBound.unbounded(), new QueryConfig(false));
+
+            assertTrue(result.isSuccess(), "Expected query to succeed");
+            assertNotNull(result.getResult(), "Expected result iterator to be present");
+            assertNotNull(result.getPosition(), "Expected position to be set");
+
+            // try-with-resources closes the iterator even when an assertion below fails,
+            // so the store.close() in the finally block never runs with it still open.
+            try (final KeyValueIterator<Windowed<String>, String> iterator = result.getResult()) {
+                assertTrue(iterator.hasNext(), "Expected at least one result");
+                KeyValue<Windowed<String>, String> kv = iterator.next();
+                assertEquals("key1", kv.key.key(), "Expected first key");
+                assertEquals(1000L, kv.key.window().start(), "Expected first window start");
+                assertEquals("value1", kv.value, "WindowRangeQuery should return the plain value");
+
+                assertTrue(iterator.hasNext(), "Expected second result");
+                kv = iterator.next();
+                assertEquals("key2", kv.key.key(), "Expected second key");
+                assertEquals(5000L, kv.key.window().start(), "Expected second window start");
+                assertEquals("value2", kv.value, "WindowRangeQuery should return the plain value");
+
+                assertTrue(iterator.hasNext(), "Expected third result");
+                kv = iterator.next();
+                assertEquals("key3", kv.key.key(), "Expected third key");
+                assertEquals(3000L, kv.key.window().start(), "Expected third window start");
+                assertEquals("value3", kv.value, "WindowRangeQuery should return the plain value");
+
+                assertFalse(iterator.hasNext(), "Expected no more results");
+            }
+        } finally {
+            store.close();
+        }
+    }
+
+ @Test
+ public void shouldCollectExecutionInfoForWindowKeyQueryWhenRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable execution
info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
adapter.query(query, positionBound, config);
+
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution
info to be collected");
+ boolean foundAdapterInfo = false;
+ for (final String info : result.getExecutionInfo()) {
+ if (info.contains("Handled in") &&
info.contains(PlainToHeadersWindowStoreAdapter.class.getName())) {
+ foundAdapterInfo = true;
+ break;
+ }
+ }
+ assertTrue(foundAdapterInfo, "Expected execution info to mention the
adapter class");
+ }
+
+    /**
+     * Verifies that, when execution-info collection is enabled in the {@link QueryConfig},
+     * a {@link WindowRangeQuery} sent directly to the adapter produces an execution-info
+     * entry containing "Handled in" and the adapter's class name.
+     */
+    @Test
+    public void shouldCollectExecutionInfoForWindowRangeQueryWhenRequested() {
+        final WindowRangeQuery<Bytes, byte[]> query = WindowRangeQuery.withWindowStartRange(
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(true); // Enable execution info
+
+        final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+            adapter.query(query, positionBound, config);
+
+        try {
+            assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution info to be collected");
+
+            final String adapterClassName = PlainToHeadersWindowStoreAdapter.class.getName();
+            final boolean foundAdapterInfo = result.getExecutionInfo().stream()
+                .anyMatch(info -> info.contains("Handled in") && info.contains(adapterClassName));
+            assertTrue(foundAdapterInfo, "Expected execution info to mention the adapter class");
+        } finally {
+            // A successful query carries an iterator; close it so it is not left open
+            // when tearDown() closes the adapter.
+            if (result.isSuccess() && result.getResult() != null) {
+                result.getResult().close();
+            }
+        }
+    }
+
+    /**
+     * Verifies that, when execution-info collection is disabled in the {@link QueryConfig},
+     * no execution-info entry of the query result mentions the adapter's class name.
+     */
+    @Test
+    public void shouldNotCollectExecutionInfoWhenNotRequested() {
+        final WindowKeyQuery<Bytes, byte[]> query = WindowKeyQuery.withKeyAndWindowStartRange(
+            new Bytes("test-key".getBytes()),
+            Instant.ofEpochMilli(0),
+            Instant.ofEpochMilli(Long.MAX_VALUE)
+        );
+        final PositionBound positionBound = PositionBound.unbounded();
+        final QueryConfig config = new QueryConfig(false); // Disable execution info
+
+        final QueryResult<WindowStoreIterator<byte[]>> result = adapter.query(query, positionBound, config);
+
+        try {
+            final String adapterClassName = PlainToHeadersWindowStoreAdapter.class.getName();
+            final boolean foundAdapterInfo = result.getExecutionInfo().stream()
+                .anyMatch(info -> info.contains(adapterClassName));
+            assertFalse(foundAdapterInfo, "Expected no execution info from adapter when not requested");
+        } finally {
+            // A successful query carries an iterator; close it so it is not left open
+            // when tearDown() closes the adapter.
+            if (result.isSuccess() && result.getResult() != null) {
+                result.getResult().close();
+            }
+        }
+    }
+}