This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ed35ab26b7 KAFKA-20134: Implement TimestampedWindowStoreWithHeaders
(5/N) (#21497)
7ed35ab26b7 is described below
commit 7ed35ab26b773f29f4c45210bf088f6065e1bfeb
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Mar 3 07:39:44 2026 +0000
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (5/N) (#21497)
This PR adds required classes or modifies the existing ones to build the
`TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../TimestampedWindowStoreWithHeadersTest.java | 551 +++++++++++++++++++++
.../internals/AbstractReadWriteDecorator.java | 12 +
.../org/apache/kafka/streams/state/Stores.java | 50 ++
.../MeteredTimestampedWindowStoreWithHeaders.java | 2 +-
.../TimestampedToHeadersIteratorAdapter.java | 3 +
.../TimestampedToHeadersWindowStoreAdapter.java | 234 +++++++++
.../TimestampedWindowStoreWithHeadersBuilder.java | 229 +++++++++
...mestampedWindowStoreWithHeadersBuilderTest.java | 205 ++++++++
.../apache/kafka/streams/TopologyTestDriver.java | 34 ++
9 files changed, 1319 insertions(+), 1 deletion(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
new file mode 100644
index 00000000000..1ae592e8901
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class TimestampedWindowStoreWithHeadersTest {
+
+ private static final String STORE_NAME = "headers-window-store";
+ private static final long WINDOW_SIZE_MS = 100L;
+ private static final long RETENTION_MS = 1000L;
+
+ private String inputStream;
+ private String outputStream;
+ private long baseTimestamp;
+
+ private KafkaStreams kafkaStreams;
+
+ private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ private static final Headers HEADERS1 = new RecordHeaders()
+ .add("source", "test".getBytes())
+ .add("version", "1.0".getBytes());
+
+ private static final Headers HEADERS2 = new RecordHeaders()
+ .add("source", "test".getBytes())
+ .add("version", "2.0".getBytes());
+
+ private static final Headers EMPTY_HEADERS = new RecordHeaders();
+
+ public TestInfo testInfo;
+
+ @BeforeAll
+ public static void before() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ public static void after() {
+ CLUSTER.stop();
+ }
+
+ @BeforeEach
+ public void beforeTest(final TestInfo testInfo) throws
InterruptedException {
+ this.testInfo = testInfo;
+ final String uniqueTestName = safeUniqueTestName(testInfo);
+ inputStream = "input-stream-" + uniqueTestName;
+ outputStream = "output-stream-" + uniqueTestName;
+ CLUSTER.createTopic(inputStream);
+ CLUSTER.createTopic(outputStream);
+
+ baseTimestamp = CLUSTER.time.milliseconds();
+ }
+
+ @AfterEach
+ public void afterTest() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ kafkaStreams.cleanUp();
+ }
+ }
+
+ @Test
+ public void shouldPutFetchAndDelete() throws Exception {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce source data with headers
+ int numRecordsProduced = 0;
+
+ // Window 1: [baseTimestamp, baseTimestamp + WINDOW_SIZE_MS)
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp, HEADERS1,
+ KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
+
+ // Window 1: updates in same window
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 50, HEADERS2,
+ KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3,
"c50"));
+
+ // Window 2: [baseTimestamp + WINDOW_SIZE_MS, baseTimestamp + 2 *
WINDOW_SIZE_MS)
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + WINDOW_SIZE_MS,
+ EMPTY_HEADERS,
+ KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"),
KeyValue.pair(3, null));
+
+ final List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ numRecordsProduced);
+
+ receivedRecords.forEach(receivedRecord -> assertEquals(0,
receivedRecord.value));
+ }
+
+ @Test
+ public void shouldSetChangelogTopicProperties() throws Exception {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ produceDataToTopicWithHeaders(inputStream, baseTimestamp, new
RecordHeaders(), KeyValue.pair(0, "foo"));
+
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ 1);
+
+ // verify changelog topic properties
+ final String changelogTopic =
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME +
"-changelog";
+ final Properties changelogTopicConfig =
CLUSTER.getLogConfig(changelogTopic);
+ assertEquals("compact",
changelogTopicConfig.getProperty("cleanup.policy"));
+ }
+
+ @Test
+ public void shouldRestore() throws Exception {
+ StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ int initialRecordsProduced = 0;
+
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp, HEADERS1,
+ KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
+
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 50, HEADERS2,
+ KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3,
"c50"));
+
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + WINDOW_SIZE_MS, EMPTY_HEADERS,
+ KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"),
KeyValue.pair(3, "c100"));
+
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced);
+
+ // wipe out state store to trigger restore process on restart
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+
+ // restart app - use processor WITHOUT validation of initial data,
just write to store
+ streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce additional records to verify restored store works correctly
+ final Headers finalHeaders = new RecordHeaders().add("final",
"true".getBytes());
+ final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS,
finalHeaders,
+ KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"),
KeyValue.pair(3, "c200"));
+
+ final List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced + additionalRecordsProduced);
+
+ receivedRecords.forEach(receivedRecord -> assertEquals(0,
receivedRecord.value));
+ }
+
+ @Test
+ public void shouldManualUpgradeFromTimestampedToHeaders() throws Exception
{
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(TimestampedWindowStoreContentCheckerProcessor::new,
STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
+ }
+
+ private void shouldManualUpgradeFromTimestampedToHeaders(final Topology
originalTopology) throws Exception {
+ // build original timestamped (legacy) topology and start app
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(originalTopology, props);
+ kafkaStreams.start();
+
+ // produce source data to legacy timestamped store (without headers)
+ int initialRecordsProduced = 0;
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp,
+ KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + 50,
+ KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3,
"c50"));
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + WINDOW_SIZE_MS,
+ KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"),
KeyValue.pair(3, null));
+
+ List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced);
+
+ receivedRecords.forEach(receivedRecord -> assertEquals(0,
receivedRecord.value));
+
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+
+ // restart app with headers-aware store to test upgrade path
+ // The store should migrate legacy timestamped data (without headers)
+ // and add empty headers to existing data
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce additional records with headers to verify upgraded store
works
+ final Headers upgradedHeaders = new RecordHeaders().add("upgraded",
"true".getBytes());
+ final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS,
upgradedHeaders,
+ KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"),
KeyValue.pair(3, "c200"));
+
+ receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced + additionalRecordsProduced);
+
+ receivedRecords.forEach(receivedRecord -> assertEquals(0,
receivedRecord.value));
+ }
+
+ private Properties props() {
+ final String safeTestName = safeUniqueTestName(testInfo);
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ return streamsConfiguration;
+ }
+
+ /**
+ * @return number of records produced
+ */
+ @SuppressWarnings("varargs")
+ @SafeVarargs
+ private final int produceDataToTopic(final String topic,
+ final long timestamp,
+ final KeyValue<Integer, String>...
keyValues) {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ topic,
+ Arrays.asList(keyValues),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class),
+ timestamp);
+ return keyValues.length;
+ }
+
+ /**
+ * Produce records with headers.
+ *
+ * @return number of records produced
+ */
+ @SuppressWarnings("varargs")
+ @SafeVarargs
+ private int produceDataToTopicWithHeaders(final String topic,
+ final long timestamp,
+ final Headers headers,
+ final KeyValue<Integer,
String>... keyValues) {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ topic,
+ Arrays.asList(keyValues),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+ return keyValues.length;
+ }
+
+ /**
+ * Processor for validating expected contents of a timestamped window
store with headers, and forwards
+ * the number of failed checks downstream for consumption.
+ */
+ private static class
TimestampedWindowStoreWithHeadersContentCheckerProcessor implements
Processor<Integer, String, Integer, Integer> {
+
+ private ProcessorContext<Integer, Integer> context;
+ private TimestampedWindowStoreWithHeaders<Integer, String> store;
+
+ // whether the processor should write records to the store as they
arrive.
+ private final boolean writeToStore;
+ // in-memory copy of seen data, to validate for testing purposes.
+ // Maps key -> windowStartTime -> ValueTimestampHeaders
+ private final Map<Integer, Map<Long,
Optional<ValueTimestampHeaders<String>>>> data;
+
+ TimestampedWindowStoreWithHeadersContentCheckerProcessor(final boolean
writeToStore) {
+ this.writeToStore = writeToStore;
+ this.data = new HashMap<>();
+ }
+
+ @Override
+ public void init(final ProcessorContext<Integer, Integer> context) {
+ this.context = context;
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<Integer, String> record) {
+ final long windowStartTime = record.timestamp() -
(record.timestamp() % WINDOW_SIZE_MS);
+
+ if (writeToStore) {
+ final ValueTimestampHeaders<String> valueTimestampHeaders =
+ ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers());
+ store.put(record.key(), valueTimestampHeaders,
windowStartTime);
+
+ data.computeIfAbsent(record.key(), k -> new HashMap<>());
+ data.get(record.key()).put(windowStartTime,
Optional.ofNullable(valueTimestampHeaders));
+ }
+
+ //
+ final int failedChecks = checkStoreContents();
+ context.forward(record.withValue(failedChecks));
+ }
+
+ /**
+ * Check expected contents of store, and signal completion by writing
number of failures to downstream
+ * @return number of failed checks
+ */
+ private int checkStoreContents() {
+ int failedChecks = 0;
+ for (final Map.Entry<Integer, Map<Long,
Optional<ValueTimestampHeaders<String>>>> keyEntry : data.entrySet()) {
+ final Integer key = keyEntry.getKey();
+
+ for (final Map.Entry<Long,
Optional<ValueTimestampHeaders<String>>> windowEntry :
keyEntry.getValue().entrySet()) {
+ final Long windowStartTime = windowEntry.getKey();
+ final ValueTimestampHeaders<String>
expectedValueTimestampHeaders =
+ windowEntry.getValue().orElse(null);
+
+ // validate fetch from store
+ try (final
WindowStoreIterator<ValueTimestampHeaders<String>> iterator =
+ store.fetch(key, windowStartTime,
windowStartTime)) {
+ final ValueTimestampHeaders<String>
actualValueTimestampHeaders =
+ iterator.hasNext() ? iterator.next().value : null;
+ if (!Objects.equals(actualValueTimestampHeaders,
expectedValueTimestampHeaders)) {
+ failedChecks++;
+ }
+ }
+ }
+ }
+ return failedChecks;
+ }
+ }
+
+ /**
+ * Processor for validating expected contents of a timestamped window
store (without headers).
+ * Used for testing the upgrade path from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ */
+ private static class TimestampedWindowStoreContentCheckerProcessor
implements Processor<Integer, String, Integer, Integer> {
+
+ private ProcessorContext<Integer, Integer> context;
+ private TimestampedWindowStore<Integer, String> store;
+
+ // in-memory copy of seen data, to validate for testing purposes.
+ // Maps key -> windowStartTime -> ValueAndTimestamp
+ private final Map<Integer, Map<Long,
Optional<ValueAndTimestamp<String>>>> data;
+
+ TimestampedWindowStoreContentCheckerProcessor() {
+ this.data = new HashMap<>();
+ }
+
+ @Override
+ public void init(final ProcessorContext<Integer, Integer> context) {
+ this.context = context;
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<Integer, String> record) {
+ final long windowStartTime = record.timestamp() -
(record.timestamp() % WINDOW_SIZE_MS);
+
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(record.value(), record.timestamp());
+ store.put(record.key(), valueAndTimestamp, windowStartTime);
+
+ data.computeIfAbsent(record.key(), k -> new HashMap<>());
+ data.get(record.key()).put(windowStartTime,
Optional.ofNullable(valueAndTimestamp));
+
+ final int failedChecks = checkStoreContents();
+ context.forward(record.withValue(failedChecks));
+ }
+
+ /**
+ * Check expected contents of store, and signal completion by writing
+ * @return number of failed checks
+ */
+ private int checkStoreContents() {
+ int failedChecks = 0;
+ for (final Map.Entry<Integer, Map<Long,
Optional<ValueAndTimestamp<String>>>> keyEntry : data.entrySet()) {
+ final Integer key = keyEntry.getKey();
+
+ for (final Map.Entry<Long,
Optional<ValueAndTimestamp<String>>> windowEntry :
keyEntry.getValue().entrySet()) {
+ final Long windowStartTime = windowEntry.getKey();
+ final ValueAndTimestamp<String> expectedValueAndTimestamp
= windowEntry.getValue().orElse(null);
+
+ // validate fetch from store
+ try (final WindowStoreIterator<ValueAndTimestamp<String>>
iterator =
+ store.fetch(key, windowStartTime,
windowStartTime)) {
+ final ValueAndTimestamp<String>
actualValueAndTimestamp =
+ iterator.hasNext() ? iterator.next().value : null;
+ if (!Objects.equals(actualValueAndTimestamp,
expectedValueAndTimestamp)) {
+ failedChecks++;
+ }
+ }
+ }
+ }
+ return failedChecks;
+ }
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 8748843f034..3b3d57298fe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
@@ -71,6 +72,8 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
return new
VersionedKeyValueStoreReadWriteDecorator<>((VersionedKeyValueStore<?, ?>)
store);
} else if (store instanceof KeyValueStore) {
return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>)
store);
+ } else if (store instanceof TimestampedWindowStoreWithHeaders) {
+ return new
TimestampedWindowStoreWithHeadersReadWriteDecorator<>((TimestampedWindowStoreWithHeaders<?,
?>) store);
} else if (store instanceof TimestampedWindowStore) {
return new
TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>)
store);
} else if (store instanceof WindowStore) {
@@ -272,6 +275,15 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
}
}
+ static class TimestampedWindowStoreWithHeadersReadWriteDecorator<K, V>
+ extends WindowStoreReadWriteDecorator<K, ValueTimestampHeaders<V>>
+ implements TimestampedWindowStoreWithHeaders<K, V> {
+
+ TimestampedWindowStoreWithHeadersReadWriteDecorator(final
TimestampedWindowStoreWithHeaders<K, V> inner) {
+ super(inner);
+ }
+ }
+
static class SessionStoreReadWriteDecorator<K, AGG>
extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
implements SessionStore<K, AGG> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index b1e8234f3c4..1ad4adc6e91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -32,6 +32,7 @@ import
org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import
org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder;
import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -332,6 +333,37 @@ public final class Stores {
return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates, true);
}
+ /**
+ * Creates a persistent {@link WindowBytesStoreSupplier} that preserves
timestamps and headers.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store
(cannot be negative)
+ * @param windowSize size of the windows (cannot be negative)
+ * @param retainDuplicates whether or not to retain duplicates
+ * @return an instance of {@link WindowBytesStoreSupplier}
+ * @throws IllegalArgumentException if {@code retentionPeriod} is smaller
than {@code windowSize}
+ */
+ public static WindowBytesStoreSupplier
persistentTimestampedWindowStoreWithHeaders(final String name,
+
final Duration retentionPeriod,
+
final Duration windowSize,
+
final boolean retainDuplicates) throws IllegalArgumentException {
+ Objects.requireNonNull(name, "name cannot be null");
+ final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+ final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
+ final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize,
"windowSize");
+ final long windowSizeMs = validateMillisecondDuration(windowSize,
wsMsgPrefix);
+
+ final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+ return new RocksDbWindowBytesStoreSupplier(
+ name,
+ retentionMs,
+ defaultSegmentInterval,
+ windowSizeMs,
+ retainDuplicates,
+
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
+ }
+
private static WindowBytesStoreSupplier persistentWindowStore(final String
name,
final
Duration retentionPeriod,
final
Duration windowSize,
@@ -598,6 +630,24 @@ public final class Stores {
return new TimestampedWindowStoreBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link
TimestampedWindowStoreWithHeaders}.
+ *
+ * @param supplier a {@link WindowBytesStoreSupplier} (cannot be
{@code null})
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of {@link StoreBuilder} that can build a {@link
TimestampedWindowStoreWithHeaders}
+ */
+ public static <K, V> StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>>
timestampedWindowStoreWithHeadersBuilder(
+ final WindowBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
+ return new TimestampedWindowStoreWithHeadersBuilder<>(supplier,
keySerde, valueSerde, Time.SYSTEM);
+ }
+
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link
SessionStore}.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index c8c6d187124..e982365da17 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -66,7 +66,7 @@ class MeteredTimestampedWindowStoreWithHeaders<K, V>
@Override
public void put(final K key, final ValueTimestampHeaders<V> value, final
long windowStartTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
- final Headers headers = value.headers() == null ? new RecordHeaders()
: value.headers();
+ final Headers headers = value == null || value.headers() == null ? new
RecordHeaders() : value.headers();
try {
maybeMeasureLatency(
() -> wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers), windowStartTimestamp),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
index 6a0a23e1c36..569fe10fe25 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
@@ -54,6 +54,9 @@ class TimestampedToHeadersIteratorAdapter<K> implements
KeyValueIterator<K, byte
@Override
public KeyValue<K, byte[]> next() {
final KeyValue<K, byte[]> timestampedKeyValue = innerIterator.next();
+ if (timestampedKeyValue == null) {
+ return null;
+ }
return KeyValue.pair(timestampedKeyValue.key,
convertToHeaderFormat(timestampedKeyValue.value));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
new file mode 100644
index 00000000000..f53cbc68285
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * Adapter for backward compatibility between {@link
TimestampedWindowStoreWithHeaders}
+ * and {@link TimestampedWindowStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedWindowStore} (without
headers) when building
+ * a {@code TimestampedWindowStoreWithHeaders}, this adapter translates
between the timestamped
+ * {@code byte[]} format and the timestamped-with-headers {@code byte[]}
format.
+ * <p>
+ * Format conversion:
+ * <ul>
+ * <li>Write: {@code [headers][timestamp][value]} → {@code
[timestamp][value]} (strip headers)</li>
+ * <li>Read: {@code [timestamp][value]} → {@code
[headers][timestamp][value]} (add empty headers)</li>
+ * </ul>
+ */
+public class TimestampedToHeadersWindowStoreAdapter implements
WindowStore<Bytes, byte[]> {
+ private final WindowStore<Bytes, byte[]> store;
+
+ public TimestampedToHeadersWindowStoreAdapter(final WindowStore<Bytes,
byte[]> store) {
+ if (!store.persistent()) {
+ throw new IllegalArgumentException("Provided store must be a
persistent store, but it is not.");
+ }
+ if (!(store instanceof TimestampedBytesStore)) {
+ throw new IllegalArgumentException("Provided store must be a
timestamped store, but it is not.");
+ }
+ this.store = store;
+ }
+
+ /**
+ * Extract raw timestamped value (timestamp + value) from serialized
ValueTimestampHeaders.
+ * This strips the headers portion but keeps timestamp and value intact.
+ *
+ * Format conversion:
+ * Input: [headersSize(varint)][headers][timestamp(8)][value]
+ * Output: [timestamp(8)][value]
+ */
+ // TODO: should be extract to util class, tracked by KAFKA-20205
+ static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ // Skip headers, keep timestamp + value
+ buffer.position(buffer.position() + headersSize);
+
+ final byte[] result = new byte[buffer.remaining()];
+ buffer.get(result);
+ return result;
+ }
+
+ @Override
+ public void put(final Bytes key, final byte[]
valueWithTimestampAndHeaders, final long windowStartTimestamp) {
+ store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders),
windowStartTimestamp);
+ }
+
+ @Override
+ public byte[] fetch(final Bytes key, final long timestamp) {
+ return convertToHeaderFormat(store.fetch(key, timestamp));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key, final long
timeFrom, final long timeTo) {
+ return new
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.fetch(key, timeFrom,
timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.fetch(key, timeFrom,
timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final
long timeFrom, final long timeTo) {
+ return new
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key,
timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
TimestampedWindowToHeadersWindowStoreIteratorAdapter(store.backwardFetch(key,
timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom, final Bytes keyTo,
+ final long
timeFrom, final long timeTo) {
+ return new TimestampedToHeadersIteratorAdapter<>(store.fetch(keyFrom,
keyTo, timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom, final Bytes keyTo,
+ final Instant
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new TimestampedToHeadersIteratorAdapter<>(store.fetch(keyFrom,
keyTo, timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes
keyFrom, final Bytes keyTo,
+ final long
timeFrom, final long timeTo) {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo,
timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes
keyFrom, final Bytes keyTo,
+ final
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.backwardFetch(keyFrom, keyTo,
timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long
timeFrom, final long timeTo) {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant
timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final
long timeFrom, final long timeTo) {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final
Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ return new TimestampedToHeadersIteratorAdapter<>(store.all());
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+ return new TimestampedToHeadersIteratorAdapter<>(store.backwardAll());
+ }
+
+ @Override
+ public String name() {
+ return store.name();
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ store.init(context, root);
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ store.commit(changelogOffsets);
+ }
+
+ @Override
+ public void close() {
+ store.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return true;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return store.isOpen();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped window stores with headers yet.");
+ }
+
+ @Override
+ public Position getPosition() {
+ return store.getPosition();
+ }
+
+ /**
+ * Iterator adapter for WindowStoreIterator that converts timestamp-only
values
+ * to timestamp-with-headers format by adding empty headers.
+ */
+ private static class TimestampedWindowToHeadersWindowStoreIteratorAdapter
+ extends TimestampedToHeadersIteratorAdapter<Long>
+ implements WindowStoreIterator<byte[]> {
+
+ TimestampedWindowToHeadersWindowStoreIteratorAdapter(final
KeyValueIterator<Long, byte[]> innerIterator) {
+ super(innerIterator);
+ }
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
new file mode 100644
index 00000000000..9fc363bb40e
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Store builder for {@link TimestampedWindowStoreWithHeaders}.
+ * <p>
+ * This builder creates header-aware timestamped window stores that preserve
record headers
+ * alongside values and timestamps. It wraps the underlying bytes store with
the necessary
+ * layers (logging, caching, metering) to provide a fully-functional store.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class TimestampedWindowStoreWithHeadersBuilder<K, V>
+ extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>,
TimestampedWindowStoreWithHeaders<K, V>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TimestampedWindowStoreWithHeadersBuilder.class);
+
+ private final WindowBytesStoreSupplier storeSupplier;
+
+ public TimestampedWindowStoreWithHeadersBuilder(final
WindowBytesStoreSupplier storeSupplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
+ super(storeSupplier.name(), keySerde, valueSerde == null ? null : new
ValueTimestampHeadersSerde<>(valueSerde), time);
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's
metricsScope can't be null");
+ this.storeSupplier = storeSupplier;
+ }
+
+ @Override
+ public TimestampedWindowStoreWithHeaders<K, V> build() {
+ WindowStore<Bytes, byte[]> store = storeSupplier.get();
+
+ if (!(store instanceof HeadersBytesStore)) {
+ if (store.persistent()) {
+ store = new TimestampedToHeadersWindowStoreAdapter(store);
+ } else {
+ store = new
InMemoryTimestampedWindowStoreWithHeadersMarker(store);
+ }
+ }
+
+ if (storeSupplier.retainDuplicates() && enableCaching) {
+ LOG.warn("Disabling caching for {} since store was configured to
retain duplicates", storeSupplier.name());
+ enableCaching = false;
+ }
+
+ return new MeteredTimestampedWindowStoreWithHeaders<>(
+ maybeWrapCaching(maybeWrapLogging(store)),
+ storeSupplier.windowSize(),
+ storeSupplier.metricsScope(),
+ time,
+ keySerde,
+ valueSerde);
+ }
+
+ private WindowStore<Bytes, byte[]> maybeWrapCaching(final
WindowStore<Bytes, byte[]> inner) {
+ if (!enableCaching) {
+ return inner;
+ }
+
+ final boolean isTimeOrdered = isTimeOrderedStore(inner);
+ if (isTimeOrdered) {
+ return new TimeOrderedCachingWindowStore(
+ inner,
+ storeSupplier.windowSize(),
+ storeSupplier.segmentIntervalMs());
+ }
+
+ return new CachingWindowStore(
+ inner,
+ storeSupplier.windowSize(),
+ storeSupplier.segmentIntervalMs());
+ }
+
+ private boolean isTimeOrderedStore(final StateStore stateStore) {
+ if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
+ return true;
+ }
+ if (stateStore instanceof WrappedStateStore) {
+ return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>)
stateStore).wrapped());
+ }
+ return false;
+ }
+
+ private WindowStore<Bytes, byte[]> maybeWrapLogging(final
WindowStore<Bytes, byte[]> inner) {
+ if (!enableLogging) {
+ return inner;
+ }
+ return new ChangeLoggingTimestampedWindowBytesStoreWithHeaders(inner,
storeSupplier.retainDuplicates());
+ }
+
+ public long retentionPeriod() {
+ return storeSupplier.retentionPeriod();
+ }
+
+ /**
+ * Marker wrapper for in-memory window stores that support both timestamps
and headers.
+ * <p>
+ * This wrapper indicates that the underlying store understands the
value-with-headers format.
+ * The actual in-memory store doesn't need to change since it operates on
raw bytes.
+ */
+ private static final class InMemoryTimestampedWindowStoreWithHeadersMarker
+ extends WrappedStateStore<WindowStore<Bytes, byte[]>, Bytes, byte[]>
+ implements WindowStore<Bytes, byte[]>, TimestampedBytesStore,
HeadersBytesStore {
+
+ private InMemoryTimestampedWindowStoreWithHeadersMarker(final
WindowStore<Bytes, byte[]> wrapped) {
+ super(wrapped);
+ if (wrapped.persistent()) {
+ throw new IllegalArgumentException("Provided store must not be
a persistent store, but it is.");
+ }
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] value,
+ final long windowStartTimestamp) {
+ wrapped().put(key, value, windowStartTimestamp);
+ }
+
+ @Override
+ public byte[] fetch(final Bytes key,
+ final long time) {
+ return wrapped().fetch(key, time);
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> fetch(final Bytes key,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetch(key, timeFrom, timeTo);
+ }
+
+ @Override
+ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped().backwardFetch(key, timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes
keyFrom,
+ final Bytes
keyTo,
+ final long
timeFrom,
+ final long
timeTo) {
+ return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final
Bytes keyFrom,
+ final
Bytes keyTo,
+ final
long timeFrom,
+ final
long timeTo) {
+ return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long
timeFrom,
+ final long
timeTo) {
+ return wrapped().fetchAll(timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFetchAll(final long timeFrom,
+
final long timeTo) {
+ return wrapped().backwardFetchAll(timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+ return wrapped().all();
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+ return wrapped().backwardAll();
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped window stores with headers yet.");
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
new file mode 100644
index 00000000000..304a84c6976
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class TimestampedWindowStoreWithHeadersBuilderTest {
+ private static final String STORE_NAME = "name";
+ private static final String METRICS_SCOPE = "metricsScope";
+
+ @Mock
+ private WindowBytesStoreSupplier supplier;
+ @Mock
+ private RocksDBTimestampedWindowStoreWithHeaders
timestampedStoreWithHeaders;
+
+ private TimestampedWindowStoreWithHeadersBuilder<String, String> builder;
+
+ public void setUp() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+ }
+
+ @Test
+ public void shouldHaveMeteredStoreAsOuterStore() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.build();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreByDefault() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.build();
+ final StateStore next = ((WrappedStateStore) store).wrapped();
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
next);
+ }
+
+ @Test
+ public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.withLoggingDisabled().build();
+ final StateStore next = ((WrappedStateStore) store).wrapped();
+ assertSame(timestampedStoreWithHeaders, next);
+ }
+
+ @Test
+ public void shouldHaveCachingStoreWhenEnabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.withCachingEnabled().build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ assertInstanceOf(CachingWindowStore.class, wrapped);
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store = builder
+ .withLoggingEnabled(Collections.emptyMap())
+ .build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
wrapped);
+ assertSame(timestampedStoreWithHeaders, ((WrappedStateStore)
wrapped).wrapped());
+ }
+
+ @Test
+ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store = builder
+ .withLoggingEnabled(Collections.emptyMap())
+ .withCachingEnabled()
+ .build();
+ final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore changeLogging = (WrappedStateStore)
caching.wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ assertInstanceOf(CachingWindowStore.class, caching);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
changeLogging);
+ assertSame(timestampedStoreWithHeaders, changeLogging.wrapped());
+ }
+
+ @Test
+ public void shouldNotWrapHeadersByteStore() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedWindowStoreWithHeaders(
+ new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+ "name",
+ "metric-scope",
+ 10L,
+ 5L,
+ new WindowKeySchema()),
+ false,
+ 1L));
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ final TimestampedWindowStoreWithHeaders<String, String> store = builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertInstanceOf(RocksDBTimestampedWindowStoreWithHeaders.class,
((WrappedStateStore) store).wrapped());
+ }
+
+ @Test
+ public void shouldWrapTimestampedStoreAsHeadersStore() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.get()).thenReturn(new RocksDBTimestampedWindowStore(
+ new RocksDBTimestampedSegmentedBytesStore(
+ "name",
+ "metric-scope",
+ 10L,
+ 5L,
+ new WindowKeySchema()),
+ false,
+ 1L));
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ final TimestampedWindowStoreWithHeaders<String, String> store = builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertInstanceOf(TimestampedToHeadersWindowStoreAdapter.class,
((WrappedStateStore) store).wrapped());
+ }
+
+ @Test
+ public void shouldDisableCachingWithRetainDuplicates() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.retainDuplicates()).thenReturn(true);
+ when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ final TimestampedWindowStoreWithHeaders<String, String> store = builder
+ .withCachingEnabled()
+ .withLoggingDisabled()
+ .build();
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ // Caching should be automatically disabled when retainDuplicates is
true
+ assertSame(timestampedStoreWithHeaders, wrapped);
+ }
+
+ @Test
+ public void shouldThrowNullPointerIfInnerIsNull() {
+ assertThrows(NullPointerException.class, () -> new
TimestampedWindowStoreWithHeadersBuilder<>(null, Serdes.String(),
Serdes.String(), new MockTime()));
+ }
+}
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 6ebf325bda0..b15f1f2ab27 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -84,6 +84,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
@@ -879,6 +880,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
public Map<String, StateStore> getAllStateStores() {
@@ -911,6 +913,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
public StateStore getStateStore(final String name) throws
IllegalArgumentException {
@@ -992,6 +995,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("unchecked")
@@ -1020,6 +1024,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("unchecked")
@@ -1044,6 +1049,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("unchecked")
@@ -1068,6 +1074,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("unchecked")
@@ -1097,6 +1104,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("unchecked")
@@ -1133,6 +1141,31 @@ public class TopologyTestDriver implements Closeable {
return store instanceof TimestampedWindowStore ?
(TimestampedWindowStore<K, V>) store : null;
}
+ /**
+ * Get the {@link TimestampedWindowStoreWithHeaders} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the
test case instructs the topology to
+ * {@link TestInputTopic#pipeInput(TestRecord) process an input message},
and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the window store, or {@code null} if no {@link
TimestampedWindowStoreWithHeaders} has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
+ * @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> WindowStore<K, ValueTimestampHeaders<V>>
getTimestampedWindowStoreWithHeaders(final String name) {
+ final StateStore store = getStateStore(name, false);
+ return store instanceof TimestampedWindowStoreWithHeaders ?
(TimestampedWindowStoreWithHeaders<K, V>) store : null;
+ }
+
/**
* Get the {@link SessionStore} with the given name.
* The store can be a "regular" or global store.
@@ -1150,6 +1183,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> SessionStore<K, V> getSessionStore(final String name) {