This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 148a2707265 KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders 
(5/N)  (#21455)
148a2707265 is described below

commit 148a2707265e618d2a9bc1009081cf570983672f
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 25 03:02:32 2026 +0100

    KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (5/N)  (#21455)
    
    This PR adds required classes or modifies the existing ones to build the
    `TimestampedKeyValueStoreWithHeaders` introduced in KIP-1271.
    
    Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../TimestampedKeyValueStoreWithHeadersTest.java   | 557 +++++++++++++++++++++
 .../internals/AbstractReadWriteDecorator.java      |  15 +-
 .../org/apache/kafka/streams/state/Stores.java     |  49 +-
 .../state/internals/CachingKeyValueStore.java      |  18 +-
 .../state/internals/KeyValueStoreBuilder.java      |   2 +-
 .../state/internals/ListValueStoreBuilder.java     |   2 +-
 ...MeteredTimestampedKeyValueStoreWithHeaders.java |   4 +-
 .../RocksDBKeyValueBytesStoreSupplier.java         |  19 +-
 .../RocksDBTimestampedStoreWithHeaders.java        |  11 +
 .../internals/TimestampedKeyValueStoreBuilder.java |   2 +-
 ...imestampedKeyValueStoreBuilderWithHeaders.java} |  77 +--
 .../TimestampedToHeadersIteratorAdapter.java       |  59 +++
 .../TimestampedToHeadersStoreAdapter.java          | 200 ++++++++
 .../CachingInMemoryKeyValueStoreTest.java          |   6 +-
 .../RocksDBTimestampedStoreWithHeadersTest.java    |  17 +
 ...stampedKeyValueStoreBuilderWithHeadersTest.java | 422 ++++++++++++++++
 .../apache/kafka/streams/TopologyTestDriver.java   |  34 ++
 17 files changed, 1442 insertions(+), 52 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
new file mode 100644
index 00000000000..767615bd105
--- /dev/null
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class TimestampedKeyValueStoreWithHeadersTest {
+
+    private static final String STORE_NAME = "headers-store";
+
+    private String inputStream;
+    private String outputStream;
+    private long baseTimestamp;
+
+    private KafkaStreams kafkaStreams;
+
+    private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    private static final Headers HEADERS1 = new RecordHeaders()
+        .add("source", "test".getBytes())
+        .add("version", "1.0".getBytes());
+
+    private static final Headers HEADERS2 = new RecordHeaders()
+        .add("source", "test".getBytes())
+        .add("version", "2.0".getBytes());
+
+    private static final Headers EMPTY_HEADERS = new RecordHeaders();
+
+    public TestInfo testInfo;
+
+    @BeforeAll
+    public static void before() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterAll
+    public static void after() {
+        CLUSTER.stop();
+    }
+
+    @BeforeEach
+    public void beforeTest(final TestInfo testInfo) throws 
InterruptedException {
+        this.testInfo = testInfo;
+        final String uniqueTestName = safeUniqueTestName(testInfo);
+        inputStream = "input-stream-" + uniqueTestName;
+        outputStream = "output-stream-" + uniqueTestName;
+        CLUSTER.createTopic(inputStream);
+        CLUSTER.createTopic(outputStream);
+
+        baseTimestamp = CLUSTER.time.milliseconds();
+
+    }
+
+    @AfterEach
+    public void afterTest() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void shouldPutGetAndDelete() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(() -> new 
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data with headers
+        int numRecordsProduced = 0;
+
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp, HEADERS1,
+            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, 
null));
+
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + 5, HEADERS2,
+            KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, 
"c5"));
+
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + 2,
+            EMPTY_HEADERS,
+            KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, 
null));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            numRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) {
+            // verify zero failed checks for each record
+            assertEquals(receivedRecord.value, 0);
+        }
+    }
+
+    @Test
+    public void shouldSetChangelogTopicProperties() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(() -> new 
TimestampedStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce record (and wait for result) to create changelog
+        produceDataToTopicWithHeaders(inputStream, baseTimestamp, new 
RecordHeaders(), KeyValue.pair(0, "foo"));
+
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            1);
+
+        // verify changelog topic properties
+        final String changelogTopic = 
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + 
"-changelog";
+        final Properties changelogTopicConfig = 
CLUSTER.getLogConfig(changelogTopic);
+        assertEquals(changelogTopicConfig.getProperty("cleanup.policy"), 
"compact");
+    }
+
+    @Test
+    public void shouldRestore() throws Exception {
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(() -> new 
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce source data with headers
+        final Map<Integer, Optional<ValueTimestampHeaders<String>>> 
expectedData = new HashMap<>();
+        int initialRecordsProduced = 0;
+
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp, HEADERS1,
+            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, 
null));
+        expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a0", 
baseTimestamp, HEADERS1)));
+        expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b0", 
baseTimestamp, HEADERS1)));
+        expectedData.put(3, Optional.empty());  // null value
+
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + 5, HEADERS2,
+            KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, 
"c5"));
+        expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a5", 
baseTimestamp + 5, HEADERS2)));
+        expectedData.put(2, Optional.empty());  // null value
+        expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c5", 
baseTimestamp + 5, HEADERS2)));
+
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + 10, EMPTY_HEADERS,
+            KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, 
"c10"));
+        expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a10", 
baseTimestamp + 10, EMPTY_HEADERS)));
+        expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b10", 
baseTimestamp + 10, EMPTY_HEADERS)));
+        expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c10", 
baseTimestamp + 10, EMPTY_HEADERS)));
+
+        // wait for output
+        IntegrationTestUtils.waitUntilMinRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app - use processor WITHOUT validation of initial data, 
just write to store
+        streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(() -> new 
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records to verify restored store works correctly
+        final Headers finalHeaders = new RecordHeaders().add("final", 
"true".getBytes());
+        final int additionalRecordsProduced = 
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 12, finalHeaders,
+            KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, 
"c12"));
+
+        // wait for output and verify
+        final List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) {
+            // verify zero failed checks for each record
+            assertEquals(receivedRecord.value, 0);
+        }
+    }
+
+    @Test
+    public void shouldManualUpgradeFromTimestampedToHeaders() throws Exception 
{
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(TimestampedStoreContentCheckerProcessor::new, STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
+    }
+
+    private void shouldManualUpgradeFromTimestampedToHeaders(final Topology 
originalTopology) throws Exception {
+        // build original timestamped (legacy) topology and start app
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(originalTopology, props);
+        kafkaStreams.start();
+
+        // produce source data to legacy timestamped store (without headers)
+        int initialRecordsProduced = 0;
+        initialRecordsProduced += produceDataToTopic(inputStream, 
baseTimestamp,
+            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, 
null));
+        initialRecordsProduced += produceDataToTopic(inputStream, 
baseTimestamp + 5,
+            KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, 
"c5"));
+        initialRecordsProduced += produceDataToTopic(inputStream, 
baseTimestamp + 2,
+            KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, 
null));
+
+        // wait for output and verify
+        List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) {
+            // verify zero failed checks for each record
+            assertEquals(receivedRecord.value, 0);
+        }
+
+        // wipe out state store to trigger restore process on restart
+        kafkaStreams.close();
+        kafkaStreams.cleanUp();
+
+        // restart app with headers-aware store to test upgrade path
+        // The store should migrate legacy timestamped data (without headers)
+        // and add empty headers to existing data
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder
+            .addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.Integer(),
+                    Serdes.String()
+                )
+            )
+            .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+            .process(() -> new 
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+        kafkaStreams.start();
+
+        // produce additional records with headers to verify upgraded store 
works
+        final Headers upgradedHeaders = new RecordHeaders().add("upgraded", 
"true".getBytes());
+        final int additionalRecordsProduced = 
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 12, upgradedHeaders,
+            KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, 
"c12"));
+
+        // wait for output and verify
+        receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                IntegerDeserializer.class),
+            outputStream,
+            initialRecordsProduced + additionalRecordsProduced);
+
+        for (final KeyValue<Integer, Integer> receivedRecord : 
receivedRecords) {
+            // verify zero failed checks for each record
+            assertEquals(receivedRecord.value, 0);
+        }
+    }
+
+    private Properties props() {
+        final String safeTestName = safeUniqueTestName(testInfo);
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        return streamsConfiguration;
+    }
+
+    /**
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopic(final String topic,
+                                         final long timestamp,
+                                         final KeyValue<Integer, String>... 
keyValues) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            topic,
+            Arrays.asList(keyValues),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            timestamp);
+        return keyValues.length;
+    }
+
+
+
+    /**
+     * Produce records with headers.
+     *
+     * @return number of records produced
+     */
+    @SuppressWarnings("varargs")
+    @SafeVarargs
+    private final int produceDataToTopicWithHeaders(final String topic,
+                                                     final long timestamp,
+                                                     final Headers headers,
+                                                     final KeyValue<Integer, 
String>... keyValues) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            topic,
+            Arrays.asList(keyValues),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+        return keyValues.length;
+    }
+
+    /**
+     * Processor for validating expected contents of a timestamped store with 
headers, and forwards
+     * the number of failed checks downstream for consumption.
+     */
+    private static class TimestampedStoreWithHeadersContentCheckerProcessor 
implements Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private TimestampedKeyValueStoreWithHeaders<Integer, String> store;
+
+        // whether the processor should write records to the store as they 
arrive.
+        private final boolean writeToStore;
+        // in-memory copy of seen data, to validate for testing purposes.
+        private final Map<Integer, Optional<ValueTimestampHeaders<String>>> 
data;
+
+        TimestampedStoreWithHeadersContentCheckerProcessor(final boolean 
writeToStore) {
+            this.writeToStore = writeToStore;
+            this.data = new HashMap<>();
+        }
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            if (writeToStore) {
+                final ValueTimestampHeaders<String> valueTimestampHeaders =
+                    ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers());
+                store.put(record.key(), valueTimestampHeaders);
+                data.put(record.key(), 
Optional.ofNullable(valueTimestampHeaders));
+            }
+
+            // check expected contents of store, and signal completion by 
writing number of failures to downstream
+            final int failedChecks = checkStoreContents();
+            context.forward(record.withValue(failedChecks));
+        }
+
+        /**
+         * @return number of failed checks
+         */
+        private int checkStoreContents() {
+            int failedChecks = 0;
+            for (final Map.Entry<Integer, 
Optional<ValueTimestampHeaders<String>>> keyWithValueTimestampHeaders : 
data.entrySet()) {
+                final Integer key = keyWithValueTimestampHeaders.getKey();
+                final ValueTimestampHeaders<String> 
expectedValueTimestampHeaders =
+                    keyWithValueTimestampHeaders.getValue().orElse(null);
+
+                // validate get from store
+                final ValueTimestampHeaders<String> 
actualValueTimestampHeaders = store.get(key);
+                if (!Objects.equals(actualValueTimestampHeaders, 
expectedValueTimestampHeaders)) {
+                    failedChecks++;
+                }
+            }
+            return failedChecks;
+        }
+    }
+
+    /**
+     * Processor for validating expected contents of a timestamped store 
(without headers).
+     * Used for testing the upgrade path from TimestampedKeyValueStore to 
TimestampedKeyValueStoreWithHeaders.
+     */
+    private static class TimestampedStoreContentCheckerProcessor implements 
Processor<Integer, String, Integer, Integer> {
+
+        private ProcessorContext<Integer, Integer> context;
+        private TimestampedKeyValueStore<Integer, String> store;
+
+        // in-memory copy of seen data, to validate for testing purposes.
+        private final Map<Integer, Optional<ValueAndTimestamp<String>>> data;
+
+        TimestampedStoreContentCheckerProcessor() {
+            this.data = new HashMap<>();
+        }
+
+        @Override
+        public void init(final ProcessorContext<Integer, Integer> context) {
+            this.context = context;
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<Integer, String> record) {
+            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(record.value(), record.timestamp());
+            store.put(record.key(), valueAndTimestamp);
+            data.put(record.key(), Optional.ofNullable(valueAndTimestamp));
+
+            // check expected contents of store, and signal completion by 
writing
+            // number of failures to downstream
+            final int failedChecks = checkStoreContents();
+            context.forward(record.withValue(failedChecks));
+        }
+
+        /**
+         * @return number of failed checks
+         */
+        private int checkStoreContents() {
+            int failedChecks = 0;
+            for (final Map.Entry<Integer, Optional<ValueAndTimestamp<String>>> 
keyWithValueAndTimestamp : data.entrySet()) {
+                final Integer key = keyWithValueAndTimestamp.getKey();
+                final ValueAndTimestamp<String> valueAndTimestamp = 
keyWithValueAndTimestamp.getValue().orElse(null);
+
+                // validate get from store
+                final ValueAndTimestamp<String> record = store.get(key);
+                if (!Objects.equals(record, valueAndTimestamp)) {
+                    failedChecks++;
+                }
+            }
+            return failedChecks;
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 569adad58c3..8748843f034 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -26,8 +26,10 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.WindowStore;
@@ -61,7 +63,9 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
     }
 
     static StateStore wrapWithReadWriteStore(final StateStore store) {
-        if (store instanceof TimestampedKeyValueStore) {
+        if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+            return new 
TimestampedKeyValueStoreReadWriteDecoratorWithHeaders<>((TimestampedKeyValueStoreWithHeaders<?,
 ?>) store);
+        } else if (store instanceof TimestampedKeyValueStore) {
             return new 
TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) 
store);
         } else if (store instanceof VersionedKeyValueStore) {
             return new 
VersionedKeyValueStoreReadWriteDecorator<>((VersionedKeyValueStore<?, ?>) 
store);
@@ -326,4 +330,13 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
             return wrapped().fetch(keyFrom, keyTo);
         }
     }
+
+    static class TimestampedKeyValueStoreReadWriteDecoratorWithHeaders<K, V>
+        extends KeyValueStoreReadWriteDecorator<K, ValueTimestampHeaders<V>>
+        implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+        TimestampedKeyValueStoreReadWriteDecoratorWithHeaders(final 
TimestampedKeyValueStoreWithHeaders<K, V> inner) {
+            super(inner);
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 101c5c7e145..e36fe042b6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesSto
 import 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
 import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
 import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -94,7 +95,7 @@ public final class Stores {
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final 
String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new RocksDBKeyValueBytesStoreSupplier(name, false);
+        return new RocksDBKeyValueBytesStoreSupplier(name, false, false);
     }
 
     /**
@@ -113,7 +114,33 @@ public final class Stores {
      */
     public static KeyValueBytesStoreSupplier 
persistentTimestampedKeyValueStore(final String name) {
         Objects.requireNonNull(name, "name cannot be null");
-        return new RocksDBKeyValueBytesStoreSupplier(name, true);
+        return new RocksDBKeyValueBytesStoreSupplier(name, true, false);
+    }
+
+    /**
+     * Create a persistent {@link KeyValueBytesStoreSupplier} that stores 
headers along with timestamps.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link 
#timestampedKeyValueStoreBuilderWithHeaders(KeyValueBytesStoreSupplier, Serde, 
Serde)}
+     * to build a {@link TimestampedKeyValueStoreWithHeaders}.
+     * <p>
+     * The store will persist key-value pairs along with record timestamps and 
headers,
+     * using the format specified in KIP-1271. This allows state stores to 
maintain
+     * full record context including headers for downstream processing.
+     * <p>
+     * If you want to create a {@link KeyValueStore}, {@link 
TimestampedKeyValueStore}, or
+     * {@link VersionedKeyValueStore} you should use {@link 
#persistentKeyValueStore(String)},
+     * {@link #persistentTimestampedKeyValueStore(String)}, or
+     * {@link #persistentVersionedKeyValueStore(String, Duration)}, 
respectively,
+     * to create a store supplier instead.
+     *
+     * @param name  name of the store (cannot be {@code null})
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be 
used
+     * to build a persistent key-(headers/timestamp/value) store
+     */
+    public static KeyValueBytesStoreSupplier 
persistentTimestampedKeyValueStoreWithHeaders(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
+        return new RocksDBKeyValueBytesStoreSupplier(name, true, true);
     }
 
     /**
@@ -563,4 +590,22 @@ public final class Stores {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, 
Time.SYSTEM);
     }
+
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link 
TimestampedKeyValueStoreWithHeaders}.
+     *
+     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be 
{@code null})
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is 
{@code null} for put operations,
+     *                      it is treated as delete
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of {@link StoreBuilder} that can build a {@link 
TimestampedKeyValueStoreWithHeaders}
+     */
+    public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K, 
V>> timestampedKeyValueStoreBuilderWithHeaders(final KeyValueBytesStoreSupplier 
supplier,
+                                                                               
                                             final Serde<K> keySerde,
+                                                                               
                                             final Serde<V> valueSerde) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
+        return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, 
keySerde, valueSerde, Time.SYSTEM);
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index c6304814727..e8588d9146a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -65,7 +65,7 @@ public class CachingKeyValueStore
     private Thread streamThread;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final Position position;
-    private final boolean timestampedSchema;
+    private final CacheType cacheType;
 
     @FunctionalInterface
     public interface CacheQueryHandler {
@@ -89,10 +89,10 @@ public class CachingKeyValueStore
         );
 
 
-    CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying, final 
boolean timestampedSchema) {
+    CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying, final 
CacheType cacheType) {
         super(underlying);
         position = Position.emptyPosition();
-        this.timestampedSchema = timestampedSchema;
+        this.cacheType = cacheType;
     }
 
     @Override
@@ -130,6 +130,10 @@ public class CachingKeyValueStore
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
 
+        if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
+            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+        }
+
         final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
         final QueryResult<R> result;
 
@@ -187,7 +191,7 @@ public class CachingKeyValueStore
                 final LRUCacheEntry lruCacheEntry = 
internalContext.cache().get(cacheName, key);
                 if (lruCacheEntry != null) {
                     final byte[] rawValue;
-                    if (timestampedSchema && 
!WrappedStateStore.isTimestamped(wrapped()) && 
!StoreQueryUtils.isAdapter(wrapped())) {
+                    if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE && 
!WrappedStateStore.isTimestamped(wrapped()) && 
!StoreQueryUtils.isAdapter(wrapped())) {
                         rawValue = 
ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
                     } else {
                         rawValue = lruCacheEntry.value();
@@ -506,4 +510,10 @@ public class CachingKeyValueStore
             lock.writeLock().unlock();
         }
     }
+
+    public enum CacheType {
+        KEY_VALUE_STORE,
+        TIMESTAMPED_KEY_VALUE_STORE,
+        TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
index a93656f15d8..0cfbe2d575a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -52,7 +52,7 @@ public class KeyValueStoreBuilder<K, V> extends 
AbstractStoreBuilder<K, V, KeyVa
         if (!enableCaching) {
             return inner;
         }
-        return new CachingKeyValueStore(inner, false);
+        return new CachingKeyValueStore(inner, 
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final 
KeyValueStore<Bytes, byte[]> inner) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
index 653246a7e69..92b4c4d96e9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
@@ -51,7 +51,7 @@ public class ListValueStoreBuilder<K, V> extends 
AbstractStoreBuilder<K, V, KeyV
         if (!enableCaching) {
             return inner;
         }
-        return new CachingKeyValueStore(inner, false);
+        return new CachingKeyValueStore(inner, 
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final 
KeyValueStore<Bytes, byte[]> inner) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 4e955c98f1a..df36c04b2d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -102,12 +102,12 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Querying is not supported for 
" + getClass().getSimpleName());
+        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
     }
 
     @Override
     public Position getPosition() {
-        throw new UnsupportedOperationException("Position is not supported for 
" + getClass().getSimpleName());
+        throw new UnsupportedOperationException("Position is not supported by 
timestamped key-value stores with headers yet.");
     }
 
     protected Bytes keyBytes(final K key, final Headers headers) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
index 6f5d6d547db..a2d3426c6c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
@@ -24,11 +24,18 @@ public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupp
 
     private final String name;
     private final boolean returnTimestampedStore;
+    private final boolean returnHeadersStore;
 
     public RocksDBKeyValueBytesStoreSupplier(final String name,
-                                             final boolean 
returnTimestampedStore) {
+                                             final boolean 
returnTimestampedStore,
+                                             final boolean returnHeadersStore) 
{
         this.name = name;
         this.returnTimestampedStore = returnTimestampedStore;
+        this.returnHeadersStore = returnHeadersStore;
+        if (returnHeadersStore && !returnTimestampedStore) {
+            throw new IllegalStateException(
+                "RocksDBKeyValueBytesStoreSupplier cannot return a headers 
store without also returning a timestamped store!");
+        }
     }
 
     @Override
@@ -38,9 +45,13 @@ public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupp
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        return returnTimestampedStore ?
-            new RocksDBTimestampedStore(name, metricsScope()) :
-            new RocksDBStore(name, metricsScope());
+        if (returnHeadersStore && returnTimestampedStore) {
+            return new RocksDBTimestampedStoreWithHeaders(name, 
metricsScope());
+        }
+        if (returnTimestampedStore) {
+            return new RocksDBTimestampedStore(name, metricsScope());
+        }
+        return new RocksDBStore(name, metricsScope());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index c4dbb6ea259..52e4ec68bdb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -18,6 +18,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -148,4 +152,11 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
         }
     }
 
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+    }
+
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index ded0efe65f0..28516b4c8a7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -76,7 +76,7 @@ public class TimestampedKeyValueStoreBuilder<K, V>
         if (!enableCaching) {
             return inner;
         }
-        return new CachingKeyValueStore(inner, true);
+        return new CachingKeyValueStore(inner, 
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE);
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final 
KeyValueStore<Bytes, byte[]> inner) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
similarity index 66%
copy from 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
copy to 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index ded0efe65f0..0c09eb2027d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -21,76 +21,88 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.TimestampedBytesStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
 import java.util.List;
 import java.util.Objects;
 
-public class TimestampedKeyValueStoreBuilder<K, V>
-    extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, 
TimestampedKeyValueStore<K, V>> {
+/**
+ * Builder for {@link TimestampedKeyValueStoreWithHeaders} instances.
+ * <p>
+ * This is analogous to {@link TimestampedKeyValueStoreBuilder}, but uses
+ * {@link ValueTimestampHeaders} as the value wrapper and wires up the
+ * header-aware store stack (change-logging, caching, metering).
+ */
+public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
+    extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>, 
TimestampedKeyValueStoreWithHeaders<K, V>> {
 
     private final KeyValueBytesStoreSupplier storeSupplier;
 
-    public TimestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier 
storeSupplier,
-                                           final Serde<K> keySerde,
-                                           final Serde<V> valueSerde,
-                                           final Time time) {
+    public TimestampedKeyValueStoreBuilderWithHeaders(final 
KeyValueBytesStoreSupplier storeSupplier,
+                                                      final Serde<K> keySerde,
+                                                      final Serde<V> 
valueSerde,
+                                                      final Time time) {
         super(
             storeSupplier.name(),
             keySerde,
-            valueSerde == null ? null : new 
ValueAndTimestampSerde<>(valueSerde),
-            time);
+            valueSerde == null ? null : new 
ValueTimestampHeadersSerde<>(valueSerde),
+            time
+        );
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's 
metricsScope can't be null");
         this.storeSupplier = storeSupplier;
     }
 
     @Override
-    public TimestampedKeyValueStore<K, V> build() {
+    public TimestampedKeyValueStoreWithHeaders<K, V> build() {
         KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
-        if (!(store instanceof TimestampedBytesStore)) {
+
+        if (!(store instanceof HeadersBytesStore)) {
             if (store.persistent()) {
-                store = new 
KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+                store = new TimestampedToHeadersStoreAdapter(store);
             } else {
-                store = new InMemoryTimestampedKeyValueStoreMarker(store);
+                store = new 
InMemoryTimestampedKeyValueStoreWithHeadersMarker(store);
             }
         }
-        return new MeteredTimestampedKeyValueStore<>(
+
+        return new MeteredTimestampedKeyValueStoreWithHeaders<>(
             maybeWrapCaching(maybeWrapLogging(store)),
             storeSupplier.metricsScope(),
             time,
             keySerde,
-            valueSerde);
+            valueSerde
+        );
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final 
KeyValueStore<Bytes, byte[]> inner) {
         if (!enableCaching) {
             return inner;
         }
-        return new CachingKeyValueStore(inner, true);
+        return new CachingKeyValueStore(inner, 
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS);
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final 
KeyValueStore<Bytes, byte[]> inner) {
         if (!enableLogging) {
             return inner;
         }
-        return new ChangeLoggingTimestampedKeyValueBytesStore(inner);
+        return new 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(inner);
     }
 
-    private static final class InMemoryTimestampedKeyValueStoreMarker
+    private static final class 
InMemoryTimestampedKeyValueStoreWithHeadersMarker
         extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]>
-        implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
+        implements KeyValueStore<Bytes, byte[]>, HeadersBytesStore {
 
-        private InMemoryTimestampedKeyValueStoreMarker(final 
KeyValueStore<Bytes, byte[]> wrapped) {
+        private InMemoryTimestampedKeyValueStoreWithHeadersMarker(final 
KeyValueStore<Bytes, byte[]> wrapped) {
             super(wrapped);
             if (wrapped.persistent()) {
                 throw new IllegalArgumentException("Provided store must not be 
a persistent store, but it is.");
@@ -159,16 +171,15 @@ public class TimestampedKeyValueStoreBuilder<K, V>
 
         @Override
         public <R> QueryResult<R> query(final Query<R> query,
-            final PositionBound positionBound,
-            final QueryConfig config) {
-
-            final long start = config.isCollectExecutionInfo() ? 
System.nanoTime() : -1L;
-            final QueryResult<R> result = wrapped().query(query, 
positionBound, config);
-            if (config.isCollectExecutionInfo()) {
-                final long end = System.nanoTime();
-                result.addExecutionInfo("Handled in " + getClass() + " in " + 
(end - start) + "ns");
-            }
-            return result;
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+
+            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported by timestamped key-value stores with headers yet.");
+        }
+
+        @Override
+        public Position getPosition() {
+            throw new UnsupportedOperationException("Position is not supported 
by timestamped key-value stores with headers yet.");
         }
 
         @Override
@@ -176,4 +187,4 @@ public class TimestampedKeyValueStoreBuilder<K, V>
             return false;
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
new file mode 100644
index 00000000000..6a0a23e1c36
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders} 
and
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore}.
+ *
+ * @see TimestampedToHeadersStoreAdapter
+ */
+
+class TimestampedToHeadersIteratorAdapter<K> implements KeyValueIterator<K, 
byte[]> {
+    private final KeyValueIterator<K, byte[]> innerIterator;
+
+    public TimestampedToHeadersIteratorAdapter(final KeyValueIterator<K, 
byte[]> innerIterator) {
+        this.innerIterator = innerIterator;
+    }
+
+    @Override
+    public void close() {
+        innerIterator.close();
+    }
+
+    @Override
+    public K peekNextKey() {
+        return innerIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return innerIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<K, byte[]> next() {
+        final KeyValue<K, byte[]> timestampedKeyValue = innerIterator.next();
+        return KeyValue.pair(timestampedKeyValue.key, 
convertToHeaderFormat(timestampedKeyValue.value));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
new file mode 100644
index 00000000000..cbc3bd1608b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and
+ * {@link TimestampedKeyValueStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedKeyValueStore} (without 
headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the timestamped {@code byte[]} format and the timestamped-with-headers 
{@code byte[]} format.
+ *
+ * @see TimestampedToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class TimestampedToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    TimestampedToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) 
{
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        if (!(store instanceof TimestampedBytesStore)) {
+            throw new IllegalArgumentException("Provided store must be a 
timestamped store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw timestamped value (timestamp + value) from serialized 
ValueTimestampHeaders.
+     * This strips the headers portion but keeps timestamp and value intact.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers, keep timestamp + value
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestampAndHeaders) {
+        store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestampAndHeaders) {
+        return convertToHeaderFormat(store.putIfAbsent(
+            key,
+            rawTimestampedValue(valueWithTimestampAndHeaders)));
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueWithTimestampAndHeaders = entry.value;
+            store.put(entry.key, 
rawTimestampedValue(valueWithTimestampAndHeaders));
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        return convertToHeaderFormat(store.delete(key));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        return convertToHeaderFormat(store.get(key));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
+        return new TimestampedToHeadersIteratorAdapter<>(store.range(from, 
to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.reverseRange(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.reverseAll());
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
     final PS prefixKeySerializer) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return store.approximateNumEntries();
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 4a0579b8ef0..7af5b68e462 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -85,7 +85,7 @@ public class CachingInMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest
         final String storeName = "store";
         underlyingStore = new InMemoryKeyValueStore(storeName);
         cacheFlushListener = new CacheFlushListenerStub<>(new 
StringDeserializer(), new StringDeserializer());
-        store = new CachingKeyValueStore(underlyingStore, false);
+        store = new CachingKeyValueStore(underlyingStore, 
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
         store.setFlushListener(cacheFlushListener, false);
         cache = new ThreadCache(new LogContext("testCache "), 
maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext<>(null, null, null, null, 
cache);
@@ -110,7 +110,7 @@ public class CachingInMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest
     @Test
     public void shouldDelegateInit() {
         final KeyValueStore<Bytes, byte[]> inner = 
mock(InMemoryKeyValueStore.class);
-        final CachingKeyValueStore outer = new CachingKeyValueStore(inner, 
false);
+        final CachingKeyValueStore outer = new CachingKeyValueStore(inner, 
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
         when(inner.name()).thenReturn("store");
         outer.init(context, outer);
         verify(inner).init(context, outer);
@@ -179,7 +179,7 @@ public class CachingInMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest
     private void setUpCloseTests() {
         underlyingStore = mock(KeyValueStore.class);
         when(underlyingStore.name()).thenReturn("store-name");
-        store = new CachingKeyValueStore(underlyingStore, false);
+        store = new CachingKeyValueStore(underlyingStore, 
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE);
         cache = mock(ThreadCache.class);
         context = new 
InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, 
cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, 
new RecordHeaders()));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index 7b1bef67ad3..98a74df76c4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import org.junit.jupiter.api.Test;
@@ -579,6 +582,20 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         }
     }
 
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnQuery() {
+        rocksDBStore.init(context, rocksDBStore);
+
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test".getBytes()));
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                () -> rocksDBStore.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        );
+
+        assertTrue(exception.getMessage().contains("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet."));
+    }
+
     private byte[] wrapTimestampedValue(final byte[] value) {
         // Format: [timestamp(8 bytes)][value]
         // Use the numeric value as timestamp
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
new file mode 100644
index 00000000000..3929a11667a
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class TimestampedKeyValueStoreBuilderWithHeadersTest {
+
+    @Mock
+    private KeyValueBytesStoreSupplier supplier;
+    @Mock
+    private RocksDBTimestampedStoreWithHeaders inner;
+    private TimestampedKeyValueStoreBuilderWithHeaders<String, String> builder;
+
+    private void setUpWithoutInner() {
+        when(supplier.name()).thenReturn("name");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+    }
+
+    private void setUp() {
+        when(supplier.get()).thenReturn(inner);
+        setUpWithoutInner();
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder.build();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, 
store);
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder.build();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, 
store);
+        final StateStore next = ((WrappedStateStore) store).wrapped();
+        
assertInstanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class, 
next);
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder.withLoggingDisabled().build();
+        final StateStore next = ((WrappedStateStore) store).wrapped();
+        assertEquals(next, inner);
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder.withCachingEnabled().build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, 
store);
+        assertInstanceOf(CachingKeyValueStore.class, wrapped);
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingEnabled(Collections.emptyMap())
+                .build();
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, 
store);
+        
assertInstanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class, 
wrapped);
+        assertEquals(((WrappedStateStore) wrapped).wrapped(), inner);
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingEnabled(Collections.emptyMap())
+                .withCachingEnabled()
+                .build();
+        final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        final WrappedStateStore changeLogging = (WrappedStateStore) 
caching.wrapped();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, 
store);
+        assertInstanceOf(CachingKeyValueStore.class, caching);
+        
assertInstanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class, 
changeLogging);
+        assertEquals(changeLogging.wrapped(), inner);
+    }
+
+    @Test
+    public void shouldNotWrapTimestampedByteStore() {
+        setUp();
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("name", "metrics-scope"));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+        assertInstanceOf(RocksDBTimestampedStoreWithHeaders.class, 
((WrappedStateStore) store).wrapped());
+    }
+
+    @Test
+    public void shouldWrapTimestampKeyValueStoreAsHeadersStore() {
+        setUp();
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStore("name", 
"metrics-scope"));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+        assertInstanceOf(TimestampedToHeadersStoreAdapter.class, 
((WrappedStateStore) store).wrapped());
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        setUpWithoutInner();
+        assertThrows(NullPointerException.class, () ->
+                new TimestampedKeyValueStoreBuilderWithHeaders<>(null, 
Serdes.String(), Serdes.String(), new MockTime()));
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerIfKeySerdeIsNull() {
+        setUpWithoutInner();
+        // does not throw
+        new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, null, 
Serdes.String(), new MockTime());
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerIfValueSerdeIsNull() {
+        setUpWithoutInner();
+        // does not throw
+        new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, 
Serdes.String(), null, new MockTime());
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        setUpWithoutInner();
+        assertThrows(NullPointerException.class, () ->
+                new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, 
Serdes.String(), Serdes.String(), null));
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+        setUpWithoutInner();
+        when(supplier.metricsScope()).thenReturn(null);
+
+        final Exception e = assertThrows(NullPointerException.class,
+                () -> new 
TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, Serdes.String(), 
Serdes.String(), new MockTime()));
+        assertTrue(e.getMessage().contains("storeSupplier's metricsScope can't 
be null"));
+    }
+
+    @Test
+    public void shouldThrowUsingIQv2ForInMemoryStores() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final KeyQuery<String, ValueTimestampHeaders<String>> query =
+                KeyQuery.withKey("test-key");
+
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                () -> wrapped.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        );
+
+        assertTrue(exception.getMessage().contains(
+            "Queries (IQv2) are not supported by timestamped key-value stores 
with headers yet."));
+    }
+
+    @Test
+    public void shouldThrowWhenUsingIQv2InHeadersStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped);
+
+        final KeyQuery<String, ValueTimestampHeaders<String>> query =
+                KeyQuery.withKey("test-key");
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                () -> wrapped.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        );
+
+        assertTrue(exception.getMessage().contains("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet."));
+    }
+
+    @Test
+    public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new RocksDBStore("test-store", 
"metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final IllegalArgumentException exception = assertThrows(
+                IllegalArgumentException.class,
+                () -> 
builder.withLoggingDisabled().withCachingDisabled().build()
+        );
+
+        assertTrue(exception.getMessage().contains("Provided store must be a 
timestamped store"));
+    }
+
+    @Test
+    public void shouldThrowUsingIQv2ForNativeHeadersStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertInstanceOf(RocksDBTimestampedStoreWithHeaders.class, wrapped);
+
+        final KeyQuery<String, ValueTimestampHeaders<String>> query =
+                KeyQuery.withKey("test-key");
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                () -> wrapped.query(query, PositionBound.unbounded(), new 
QueryConfig(false))
+        );
+
+        assertTrue(exception.getMessage().contains("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet."));
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForInMemoryStores() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                store::getPosition
+        );
+
+        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForInMemoryStoreMarker() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        // Unwrap to get directly to the 
InMemoryTimestampedKeyValueStoreWithHeadersMarker
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertInstanceOf(KeyValueStore.class, wrapped);
+        assertInstanceOf(HeadersBytesStore.class, wrapped);
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+            wrapped::getPosition
+        );
+
+        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStore("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                store::getPosition
+        );
+
+        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForNativeHeadersStore() {
+        when(supplier.name()).thenReturn("test-store");
+        when(supplier.metricsScope()).thenReturn("metricScope");
+        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+                supplier,
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+                .withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+
+        final UnsupportedOperationException exception = assertThrows(
+                UnsupportedOperationException.class,
+                store::getPosition
+        );
+
+        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
+    }
+
+}
\ No newline at end of file
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 65fcb58c24f..6ebf325bda0 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -82,8 +82,10 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -874,6 +876,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
      * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -905,6 +908,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
      * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -985,6 +989,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getTimestampedKeyValueStore(String)
      * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -1012,6 +1017,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -1035,6 +1041,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
@@ -1045,6 +1052,30 @@ public class TopologyTestDriver implements Closeable {
         return store instanceof VersionedKeyValueStore ? 
(VersionedKeyValueStore<K, V>) store : null;
     }
 
+    /**
+     * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
+     * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, 
and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the key value store, or {@code null} if no {@link 
TimestampedKeyValueStoreWithHeaders} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> 
getTimestampedKeyValueStoreWithHeaders(final String name) {
+        final StateStore store = getStateStore(name, false);
+        return store instanceof TimestampedKeyValueStoreWithHeaders ? 
(TimestampedKeyValueStoreWithHeaders<K, V>) store : null;
+    }
+
     /**
      * Get the {@link WindowStore} or {@link TimestampedWindowStore} with the 
given name.
      * The store can be a "regular" or global store.
@@ -1064,6 +1095,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
      * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
      */
@@ -1091,6 +1123,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
      * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getSessionStore(String)
      */
@@ -1116,6 +1149,7 @@ public class TopologyTestDriver implements Closeable {
      * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> SessionStore<K, V> getSessionStore(final String name) {


Reply via email to