This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2eacc1cad4d KAFKA-20260: Upgrade path from plain KV store to headers 
store (#21666)
2eacc1cad4d is described below

commit 2eacc1cad4d2dd710dd59a9df049c9f4e59f6807
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Mar 10 11:12:52 2026 +0100

    KAFKA-20260: Upgrade path from plain KV store to headers store (#21666)
    
    Extends headers-aware key-value store migration to support upgrading
    from plain (non-timestamped) stores.
    - Plain-to-Headers Migration Support
    - RocksDB Upgrade Path
    - Implemented `query()` method in the adapter class
    
    Testing:
    - 3 upgrade intergration tests
    - 2 downgrade integration tests
    - unit tests for all added methods/classes
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 380 +++++++++++++++++--
 .../kafka/streams/state/HeadersBytesStore.java     |  26 ++
 .../internals/PlainToHeadersIteratorAdapter.java   |  62 ++++
 .../internals/PlainToHeadersStoreAdapter.java      | 215 +++++++++++
 .../RocksDBTimestampedStoreWithHeaders.java        | 134 ++++---
 ...TimestampedKeyValueStoreBuilderWithHeaders.java |   8 +-
 .../kafka/streams/state/internals/Utils.java       |  48 +++
 .../kafka/streams/state/HeadersBytesStoreTest.java |  65 ++++
 .../internals/PlainToHeadersStoreAdapterTest.java  | 404 +++++++++++++++++++++
 .../RocksDBTimestampedStoreWithHeadersTest.java    | 327 ++++++++++++++++-
 ...stampedKeyValueStoreBuilderWithHeadersTest.java |  25 +-
 .../kafka/streams/state/internals/UtilsTest.java   |  50 +++
 12 files changed, 1627 insertions(+), 117 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index a9a3c9a9e32..145720c4374 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -133,10 +134,10 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
 
         streamsBuilderForOldStore.addStateStore(
-            Stores.timestampedKeyValueStoreBuilder(
-                persistentStore ? 
Stores.persistentTimestampedKeyValueStore(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
-                Serdes.String(),
-                Serdes.String()))
+                Stores.timestampedKeyValueStoreBuilder(
+                    persistentStore ? 
Stores.persistentTimestampedKeyValueStore(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .process(TimestampedKeyValueProcessor::new, STORE_NAME);
 
@@ -154,10 +155,10 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
 
         streamsBuilderForNewStore.addStateStore(
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
-                persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
-                Serdes.String(),
-                Serdes.String()))
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
@@ -184,10 +185,10 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
 
         streamsBuilderForOldStore.addStateStore(
-            Stores.timestampedKeyValueStoreBuilder(
-                Stores.persistentTimestampedKeyValueStore(STORE_NAME),
-                Serdes.String(),
-                Serdes.String()))
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .process(TimestampedKeyValueProcessor::new, STORE_NAME);
 
@@ -207,10 +208,10 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
 
         streamsBuilderForNewStore.addStateStore(
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
-                Stores.persistentTimestampedKeyValueStore(STORE_NAME),
-                Serdes.String(),
-                Serdes.String()))
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
 
@@ -233,6 +234,125 @@ public class HeadersStoreUpgradeIntegrationTest {
         kafkaStreams.close();
     }
 
+    @Test
+    public void 
shouldMigrateInMemoryPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+    }
+
+    @Test
+    public void 
shouldMigratePersistentPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+    }
+
+    private void 
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
 boolean persistentStore) throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+                Stores.keyValueStoreBuilder(
+                    persistentStore ? 
Stores.persistentKeyValueStore(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyValue("key1", "value1");
+        final long lastUpdateKeyOne = persistentStore ? -1L : 
CLUSTER.time.milliseconds() - 1L;
+
+        processKeyValueAndVerifyValue("key2", "value2");
+        final long lastUpdateKeyTwo = persistentStore ? -1L : 
CLUSTER.time.milliseconds() - 1L;
+
+        processKeyValueAndVerifyValue("key3", "value3");
+        final long lastUpdateKeyThree = persistentStore ? -1L : 
CLUSTER.time.milliseconds() - 1L;
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
+        kafkaStreams.start();
+
+        // Verify legacy data can be read with empty headers and timestamp
+        verifyLegacyValuesWithEmptyHeaders("key1", "value1", lastUpdateKeyOne);
+        verifyLegacyValuesWithEmptyHeaders("key2", "value2", lastUpdateKeyTwo);
+        verifyLegacyValuesWithEmptyHeaders("key3", "value3", 
lastUpdateKeyThree);
+
+        // Process new records with headers
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "test".getBytes());
+
+        processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3", 
333L, headers, headers);
+        processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4", 
444L, headers, headers);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void 
shouldProxyPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi() 
throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+                Stores.keyValueStoreBuilder(
+                    Stores.persistentKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyValue("key1", "value1");
+        processKeyValueAndVerifyValue("key2", "value2");
+        processKeyValueAndVerifyValue("key3", "value3");
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    Stores.persistentKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
+        kafkaStreams.start();
+
+        // Verify legacy data can be read with empty headers
+        verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1L);
+        verifyLegacyValuesWithEmptyHeaders("key2", "value2", -1L);
+        verifyLegacyValuesWithEmptyHeaders("key3", "value3", -1L);
+
+        // Process new records with headers
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3", 
333L, -1, headers, expectedHeaders);
+        processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4", 
444L, -1, headers, expectedHeaders);
+
+        kafkaStreams.close();
+    }
+
     private <K, V> void processKeyValueAndVerifyTimestampedValue(final K key,
                                                                  final V value,
                                                                  final long 
timestamp)
@@ -269,6 +389,41 @@ public class HeadersStoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
+    private <K, V> void processKeyValueAndVerifyValue(final K key,
+                                                      final V value)
+        throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            CLUSTER.time,
+            false);
+
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, V> store =
+                        IntegrationTestUtils.getStore(STORE_NAME, 
kafkaStreams, QueryableStoreTypes.keyValueStore());
+
+                    if (store == null) {
+                        return false;
+                    }
+
+                    final V result = store.get(key);
+                    return result != null && result.equals(value);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
     private <K, V> void verifyLegacyTimestampedValue(final K key,
                                                      final V value,
                                                      final long timestamp)
@@ -296,7 +451,6 @@ public class HeadersStoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
-
     private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final 
K key,
                                                                         final 
V value,
                                                                         final 
long timestamp,
@@ -338,6 +492,48 @@ public class HeadersStoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
+    private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final 
K key,
+                                                                        final 
V value,
+                                                                        final 
long timestamp,
+                                                                        final 
long expectedTimestamp,
+                                                                        final 
Headers headers,
+                                                                        final 
Headers expectedHeaders)
+        throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;
+
+                    final ValueTimestampHeaders<V> result = store.get(key);
+                    return result != null
+                        && result.value().equals(value)
+                        && result.timestamp() == expectedTimestamp
+                        && result.headers().equals(expectedHeaders);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
     private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
                                                            final V value,
                                                            final long 
timestamp) throws Exception {
@@ -365,6 +561,45 @@ public class HeadersStoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
+    private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
+                                                           final V value) 
throws Exception {
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;
+
+                    final ValueTimestampHeaders<V> result = store.get(key);
+                    return result != null
+                        && result.value().equals(value)
+                        && result.headers().toArray().length == 0;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
+    private static class KeyValueProcessor implements Processor<String, 
String, Void, Void> {
+        private KeyValueStore<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            store.put(record.key(), record.value());
+        }
+    }
+
     private static class TimestampedKeyValueProcessor implements 
Processor<String, String, Void, Void> {
         private TimestampedKeyValueStore<String, String> store;
 
@@ -547,10 +782,10 @@ public class HeadersStoreUpgradeIntegrationTest {
     }
 
     private void processWindowedKeyValueWithHeadersAndVerify(final String key,
-                                                              final String 
value,
-                                                              final long 
timestamp,
-                                                              final Headers 
headers,
-                                                              final Headers 
expectedHeaders) throws Exception {
+                                                             final String 
value,
+                                                             final long 
timestamp,
+                                                             final Headers 
headers,
+                                                             final Headers 
expectedHeaders) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             inputStream,
             singletonList(KeyValue.pair(key, value)),
@@ -599,8 +834,8 @@ public class HeadersStoreUpgradeIntegrationTest {
     }
 
     private void verifyWindowValueWithEmptyHeaders(final String key,
-                                                    final String value,
-                                                    final long timestamp) 
throws Exception {
+                                                   final String value,
+                                                   final long timestamp) 
throws Exception {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
@@ -681,6 +916,78 @@ public class HeadersStoreUpgradeIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToPlainKeyValueStore()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateKeyValueStoreWithHeaders(props);
+        kafkaStreams = null;
+
+        // Attempt to downgrade to plain key-value store
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                    Stores.persistentKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        boolean exceptionThrown = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof ProcessorStateException &&
+                    cause.getMessage() != null &&
+                    cause.getMessage().contains("headers-aware") &&
+                    cause.getMessage().contains("Downgrade")) {
+                    exceptionThrown = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException 
about downgrade not being supported, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be 
thrown when attempting to downgrade from headers-aware to plain key-value 
store");
+        }
+    }
+
+    @Test
+    public void 
shouldSuccessfullyDowngradeFromTimestampedKeyValueStoreWithHeadersToPlainKeyValueStoreAfterCleanup()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateKeyValueStoreWithHeaders(props);
+
+        kafkaStreams.cleanUp(); // Delete local state
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.keyValueStoreBuilder(
+                    Stores.persistentKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyValue("key3", "value3");
+        processKeyValueAndVerifyValue("key4", "value4");
+
+        kafkaStreams.close();
+    }
+
     @Test
     public void 
shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStore()
 throws Exception {
         final Properties props = props();
@@ -757,6 +1064,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         kafkaStreams.close();
     }
 
+
     @Test
     public void 
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
 throws Exception {
         final Properties props = props();
@@ -913,6 +1221,19 @@ public class HeadersStoreUpgradeIntegrationTest {
         return CLUSTER.time.milliseconds();
     }
 
+    private void produceRecordWithHeaders(final String key, final String 
value, final long timestamp) throws Exception {
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "test".getBytes());
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), 
StringSerializer.class, StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+    }
+
     private void setupAndPopulateKeyValueStoreWithHeaders(final Properties 
props) throws Exception {
         final StreamsBuilder headersBuilder = new StreamsBuilder();
         headersBuilder.addStateStore(
@@ -934,17 +1255,4 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         kafkaStreams.close();
     }
-
-    private void produceRecordWithHeaders(final String key, final String 
value, final long timestamp) throws Exception {
-        final Headers headers = new RecordHeaders();
-        headers.add("source", "test".getBytes());
-
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            inputStream,
-            singletonList(KeyValue.pair(key, value)),
-            TestUtils.producerConfig(CLUSTER.bootstrapServers(), 
StringSerializer.class, StringSerializer.class),
-            headers,
-            timestamp,
-            false);
-    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 7170ba9c7e8..06f85595047 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -58,4 +58,30 @@ public interface HeadersBytesStore {
             .put(valueAndTimestamp)
             .array();
     }
+
+    static byte[] convertFromPlainToHeaderFormat(final byte[] value) {
+        if (value == null) {
+            return null;
+        }
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][payload]
+        // For empty headers and timestamp=-1:
+        //   headersSize = varint(0) = [0x00]
+        //   headersBytes = [] (empty, 0 bytes)
+        //   timestamp = -1 (8 bytes)
+        // Result: [0x00][timestamp=-1][payload]
+        final byte[] result = new byte[1 + 8 + value.length];
+        result[0] = 0x00; // empty headers
+        // timestamp = -1 (8 bytes in big-endian)
+        result[1] = (byte) 0xFF;
+        result[2] = (byte) 0xFF;
+        result[3] = (byte) 0xFF;
+        result[4] = (byte) 0xFF;
+        result[5] = (byte) 0xFF;
+        result[6] = (byte) 0xFF;
+        result[7] = (byte) 0xFF;
+        result[8] = (byte) 0xFF;
+        System.arraycopy(value, 0, result, 9, value.length);
+        return result;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersIteratorAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersIteratorAdapter.java
new file mode 100644
index 00000000000..18b277b1156
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersIteratorAdapter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders} 
and
+ * plain {@link org.apache.kafka.streams.state.KeyValueStore}.
+ *
+ * @see PlainToHeadersStoreAdapter
+ */
+
+class PlainToHeadersIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
+    private final KeyValueIterator<K, byte[]> innerIterator;
+
+    public PlainToHeadersIteratorAdapter(final KeyValueIterator<K, byte[]> 
innerIterator) {
+        this.innerIterator = innerIterator;
+    }
+
+    @Override
+    public void close() {
+        innerIterator.close();
+    }
+
+    @Override
+    public K peekNextKey() {
+        return innerIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return innerIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<K, byte[]> next() {
+        final KeyValue<K, byte[]> plainKeyValue = innerIterator.next();
+        if (plainKeyValue == null) {
+            return null;
+        }
+        return KeyValue.pair(plainKeyValue.key, 
convertFromPlainToHeaderFormat(plainKeyValue.value));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
new file mode 100644
index 00000000000..4d762b82392
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and plain {@link KeyValueStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code KeyValueStore} (without 
timestamp or headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the plain {@code byte[]} format and the timestamped-with-headers {@code 
byte[]} format.
+ *
+ * @see PlainToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class PlainToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    PlainToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        if (store instanceof TimestampedBytesStore) {
+            throw new IllegalArgumentException("Provided store must be a plain 
(non-timestamped) key value store, but it is timestamped.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestampAndHeaders) {
+        store.put(key, rawPlainValue(valueWithTimestampAndHeaders));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestampAndHeaders) {
+        return convertFromPlainToHeaderFormat(store.putIfAbsent(
+            key,
+            rawPlainValue(valueWithTimestampAndHeaders)));
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueWithTimestampAndHeaders = entry.value;
+            store.put(entry.key, rawPlainValue(valueWithTimestampAndHeaders));
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        return convertFromPlainToHeaderFormat(store.delete(key));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
+
+        // Handle KeyQuery: convert byte[] result from plain to headers format
+        if (query instanceof KeyQuery) {
+            final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) 
query;
+            final QueryResult<byte[]> rawResult = store.query(keyQuery, 
positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final byte[] convertedValue = 
convertFromPlainToHeaderFormat(rawResult.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
convertedValue);
+            } else {
+                result = (QueryResult<R>) rawResult;
+            }
+        } else if (query instanceof RangeQuery) {
+            // Handle RangeQuery: wrap iterator to convert values
+            final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, 
byte[]>) query;
+            final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                    store.query(rangeQuery, positionBound, config);
+
+            if (rawResult.isSuccess()) {
+                final KeyValueIterator<Bytes, byte[]> convertedIterator =
+                        new 
PlainToHeadersIteratorAdapter<>(rawResult.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
convertedIterator);
+            } else {
+                result = (QueryResult<R>) rawResult;
+            }
+        } else {
+            // For other query types, delegate to the underlying store
+            result = store.query(query, positionBound, config);
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (System.nanoTime() - 
start) + "ns"
+            );
+        }
+
+        return result;
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        return convertFromPlainToHeaderFormat(store.get(key));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
+        return new PlainToHeadersIteratorAdapter<>(store.range(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        return new PlainToHeadersIteratorAdapter<>(store.reverseRange(from, 
to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return new PlainToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        return new PlainToHeadersIteratorAdapter<>(store.reverseAll());
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
     final PS prefixKeySerializer) {
+        return new PlainToHeadersIteratorAdapter<>(store.prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return store.approximateNumEntries();
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index b776e34591c..93f32a11691 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -48,15 +48,15 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
     private static final Logger log = 
LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
 
     /**
-     * Legacy column family name - must match {@code 
RocksDBTimestampedStore#TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME} 
+     * Legacy column family name - must match {@code 
RocksDBTimestampedStore#TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME}
      */
-
     private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
         RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME;
 
     static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
         "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
 
+
     public RocksDBTimestampedStoreWithHeaders(final String name,
                                               final String metricsScope) {
         super(name, metricsScope);
@@ -71,7 +71,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
     @Override
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        // Check if we're upgrading from RocksDBTimestampedStore (which uses 
keyValueWithTimestamp CF)
+        // Check if we're upgrading from RocksDBTimestampedStore or from plain 
RocksDBStore
         final List<byte[]> existingCFs;
         try (final Options options = new Options(dbOptions, new 
ColumnFamilyOptions())) {
             existingCFs = RocksDB.listColumnFamilies(options, 
dbDir.getAbsolutePath());
@@ -79,40 +79,51 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
             throw new ProcessorStateException("Error listing column families 
for store " + name, e);
         }
 
-
-        final boolean upgradingFromTimestampedStore = existingCFs.stream()
+        final boolean hasTimestampedCF = existingCFs.stream()
             .anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
 
-        if (upgradingFromTimestampedStore) {
-            openInUpgradeMode(dbOptions, columnFamilyOptions);
+        if (hasTimestampedCF) {
+            // Upgrading from timestamped store - use 2 CFs: 
LEGACY_TIMESTAMPED + HEADERS
+            openFromTimestampedStore(dbOptions, columnFamilyOptions); // needs 
to check that default-CF has no data
         } else {
-            openInRegularMode(dbOptions, columnFamilyOptions);
+            openFromDefaultStore(dbOptions, columnFamilyOptions);
         }
+
     }
 
-    @SuppressWarnings("SynchronizeOnNonFinalField")
-    @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
-        final QueryResult<R> result;
+    private void openFromDefaultStore(final DBOptions dbOptions,
+                                      final ColumnFamilyOptions 
columnFamilyOptions) {
 
-        synchronized (position) {
-            result = QueryResult.forUnknownQueryType(query, this);
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions)
+        );
 
-            if (config.isCollectExecutionInfo()) {
-                result.addExecutionInfo(
-                    "Handled in " + this.getClass() + " in " + 
(System.nanoTime() - start) + "ns"
+        final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
+        final ColumnFamilyHandle headersCf = columnFamilies.get(1);
+
+        // Check if default CF has data (plain store upgrade)
+        try (final RocksIterator defaultIter = db.newIterator(defaultCf)) {
+            defaultIter.seekToFirst();
+            if (defaultIter.isValid()) {
+                log.info("Opening store {} in upgrade mode from plain key 
value store", name);
+                cfAccessor = new DualColumnFamilyAccessor(
+                    defaultCf,
+                    headersCf,
+                    HeadersBytesStore::convertFromPlainToHeaderFormat,
+                    this
                 );
+            } else {
+                log.info("Opening store {} in regular headers-aware mode", 
name);
+                cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+                defaultCf.close();
             }
-            result.setPosition(position.copy());
         }
-        return result;
     }
 
-    private void openInUpgradeMode(final DBOptions dbOptions,
-                                   final ColumnFamilyOptions 
columnFamilyOptions) {
+    private void openFromTimestampedStore(final DBOptions dbOptions,
+                                          final ColumnFamilyOptions 
columnFamilyOptions) {
         final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
             dbOptions,
             // we have to open the default CF to be able to open the legacy 
CF, but we won't use it
@@ -121,56 +132,71 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
             new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions)
         );
 
-        verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+        // verify and close empty Default ColumnFamily
+        try (final RocksIterator defaultIter = 
db.newIterator(columnFamilies.get(0))) {
+            defaultIter.seekToFirst();
+            if (defaultIter.isValid()) {
+                // Close all column family handles before throwing
+                columnFamilies.get(0).close();
+                columnFamilies.get(1).close();
+                columnFamilies.get(2).close();
+                throw new ProcessorStateException(
+                    "Inconsistent store state for " + name + ". " +
+                        "Cannot have both plain (DEFAULT) and timestamped data 
simultaneously. " +
+                        "Headers store can upgrade from either plain or 
timestamped format, but not both."
+                );
+            }
+            // close default column family handle
+            columnFamilies.get(0).close();
+        }
 
-        final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+        final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
         final ColumnFamilyHandle headersCf = columnFamilies.get(2);
 
-        // Check if legacy CF has data
-        try (final RocksIterator legacyIter = db.newIterator(legacyCf)) {
+        // Check if legacy timestamped CF has data
+        try (final RocksIterator legacyIter = 
db.newIterator(legacyTimestampedCf)) {
             legacyIter.seekToFirst();
             if (legacyIter.isValid()) {
-                log.info("Opening store {} in upgrade mode", name);
-                cfAccessor = new DualColumnFamilyAccessor(legacyCf, headersCf,
-                    HeadersBytesStore::convertToHeaderFormat, this);
+                log.info("Opening store {} in upgrade mode from timestamped 
store", name);
+                cfAccessor = new DualColumnFamilyAccessor(
+                    legacyTimestampedCf,
+                    headersCf,
+                    HeadersBytesStore::convertToHeaderFormat,
+                    this
+                );
             } else {
                 log.info("Opening store {} in regular headers-aware mode", 
name);
                 cfAccessor = new SingleColumnFamilyAccessor(headersCf);
                 try {
-                    db.dropColumnFamily(legacyCf);
+                    db.dropColumnFamily(legacyTimestampedCf);
                 } catch (final RocksDBException e) {
                     throw new RuntimeException(e);
                 } finally {
-                    legacyCf.close();
+                    legacyTimestampedCf.close();
                 }
             }
-        } 
+        }
     }
 
-    private void openInRegularMode(final DBOptions dbOptions,
-                                   final ColumnFamilyOptions 
columnFamilyOptions) {
-        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
-            dbOptions,
-            // we have to open the default CF to be able to open the legacy 
CF, but we won't use it
-            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
-            new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions)
-        );
-
-        verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+    @SuppressWarnings("SynchronizeOnNonFinalField")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result;
 
-        final ColumnFamilyHandle headersCf = columnFamilies.get(1);
-        log.info("Opening store {} in regular headers-aware mode", name);
-        cfAccessor = new SingleColumnFamilyAccessor(headersCf);
-    }
+        synchronized (position) {
+            result = QueryResult.forUnknownQueryType(query, this);
 
-    private void verifyAndCloseEmptyDefaultColumnFamily(final 
ColumnFamilyHandle columnFamilyHandle) {
-        try (columnFamilyHandle; final RocksIterator defaultIter = 
db.newIterator(columnFamilyHandle)) {
-            defaultIter.seekToFirst();
-            if (defaultIter.isValid()) {
-                throw new ProcessorStateException("Cannot upgrade directly 
from key-value store to headers-aware store for " + name + ". " +
-                    "Please first upgrade to RocksDBTimestampedStore, then 
upgrade to RocksDBTimestampedStoreWithHeaders.");
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + this.getClass() + " in " + 
(System.nanoTime() - start) + "ns"
+                );
             }
+            result.setPosition(position.copy());
         }
+        return result;
     }
 
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index 8e7c43a9ee8..d2b294b6716 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
@@ -65,7 +66,12 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
 
         if (!(store instanceof HeadersBytesStore)) {
             if (store.persistent()) {
-                store = new TimestampedToHeadersStoreAdapter(store);
+                // Persistent store: use adapter based on whether it's 
timestamped or plain
+                if (store instanceof TimestampedBytesStore) {
+                    store = new TimestampedToHeadersStoreAdapter(store);
+                } else {
+                    store = new PlainToHeadersStoreAdapter(store);
+                }
             } else {
                 store = new 
InMemoryTimestampedKeyValueStoreWithHeadersMarker(store);
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
new file mode 100644
index 00000000000..6afce4a92be
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+
+    /**
+     * Extract raw plain value from serialized ValueTimestampHeaders.
+     * This strips both the headers and timestamp portions.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [value]
+     */
+    static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers and timestamp (8 bytes)
+        buffer.position(buffer.position() + headersSize + 8);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
index 1ce81ebbead..37cacde22c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
@@ -64,4 +64,69 @@ public class HeadersBytesStoreTest {
         assertEquals(0, headersSize, "Empty headers should have headersSize = 
0");
         assertEquals(0, buffer.remaining(), "No payload bytes for empty 
value");
     }
+
+    @Test
+    public void shouldReturnNullWhenConvertingNullPlainValue() {
+        final byte[] result = 
HeadersBytesStore.convertFromPlainToHeaderFormat(null);
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldConvertPlainValueToHeaderFormatWithTimestamp() {
+        final byte[] plainValue = "test-value".getBytes();
+
+        final byte[] converted = 
HeadersBytesStore.convertFromPlainToHeaderFormat(plainValue);
+
+        assertNotNull(converted);
+        // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)][value]
+        assertEquals(1 + 8 + plainValue.length, converted.length);
+
+        // Verify empty headers marker
+        assertEquals(0x00, converted[0], "First byte for empty header should 
be 0x00");
+
+        // Verify timestamp = -1 (all 0xFF bytes)
+        for (int i = 1; i <= 8; i++) {
+            assertEquals((byte) 0xFF, converted[i], "Timestamp byte " + (i - 
1) + " should be 0xFF for -1");
+        }
+
+        // Verify payload
+        final byte[] actualPayload = Arrays.copyOfRange(converted, 9, 
converted.length);
+        assertArrayEquals(plainValue, actualPayload);
+    }
+
+    @Test
+    public void shouldConvertEmptyPlainValueToHeaderFormat() {
+        final byte[] emptyValue = new byte[0];
+
+        final byte[] converted = 
HeadersBytesStore.convertFromPlainToHeaderFormat(emptyValue);
+
+        assertNotNull(converted);
+        // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)]
+        assertEquals(9, converted.length, "Converted empty value should have 
headers + timestamp");
+
+        final ByteBuffer buffer = ByteBuffer.wrap(converted);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        assertEquals(0, headersSize, "Empty headers should have headersSize = 
0");
+
+        // Verify timestamp = -1
+        final long timestamp = buffer.getLong();
+        assertEquals(-1L, timestamp, "Timestamp should be -1 for plain value 
upgrade");
+    }
+
+    @Test
+    public void shouldConvertPlainValueWithCorrectByteOrder() {
+        final byte[] plainValue = new byte[]{0x01, 0x02, 0x03};
+
+        final byte[] converted = 
HeadersBytesStore.convertFromPlainToHeaderFormat(plainValue);
+
+        // Expected: [0x00][0xFF x 8][0x01, 0x02, 0x03]
+        final byte[] expected = new byte[]{
+            0x00,                                           // empty headers
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF,  // timestamp 
-1 (high 4 bytes)
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF,  // timestamp 
-1 (low 4 bytes)
+            0x01, 0x02, 0x03                                // payload
+        };
+
+        assertArrayEquals(expected, converted);
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapterTest.java
new file mode 100644
index 00000000000..72cd7e9e2ae
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapterTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class PlainToHeadersStoreAdapterTest {
+
+    @Mock
+    private KeyValueStore<Bytes, byte[]> mockStore;
+
+    @Mock
+    private KeyValueIterator<Bytes, byte[]> mockIterator;
+
+    private PlainToHeadersStoreAdapter adapter;
+
+    private PlainToHeadersStoreAdapter createAdapter() {
+        when(mockStore.persistent()).thenReturn(true);
+        return new PlainToHeadersStoreAdapter(mockStore);
+    }
+
+    @Test
+    public void shouldThrowIfStoreIsNotPersistent() {
+        when(mockStore.persistent()).thenReturn(false);
+
+        final IllegalArgumentException exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> new PlainToHeadersStoreAdapter(mockStore)
+        );
+
+        assertTrue(exception.getMessage().contains("Provided store must be a 
persistent store"));
+    }
+
+    @Test
+    public void shouldThrowIfStoreIsTimestamped() {
+        final RocksDBTimestampedStore timestampedStore = new 
RocksDBTimestampedStore("test", "scope");
+
+        final IllegalArgumentException exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> new PlainToHeadersStoreAdapter(timestampedStore)
+        );
+
+        assertTrue(exception.getMessage().contains("Provided store must be a 
plain (non-timestamped)"));
+    }
+
+    @Test
+    public void shouldPutRawPlainValueToStore() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("key".getBytes());
+        final byte[] plainValue = "value".getBytes();
+        final byte[] valueWithHeaders = 
convertFromPlainToHeaderFormat(plainValue);
+
+        adapter.put(key, valueWithHeaders);
+
+        verify(mockStore).put(eq(key), eq(plainValue));
+    }
+
+    @Test
+    public void shouldGetAndConvertFromPlainToHeaderFormat() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("key".getBytes());
+        final byte[] plainValue = "value".getBytes();
+        when(mockStore.get(key)).thenReturn(plainValue);
+
+        final byte[] result = adapter.get(key);
+
+        assertArrayEquals(convertFromPlainToHeaderFormat(plainValue), result);
+    }
+
+    @Test
+    public void shouldReturnNullWhenStoreReturnsNull() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("key".getBytes());
+        when(mockStore.get(key)).thenReturn(null);
+
+        final byte[] result = adapter.get(key);
+
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldPutIfAbsentAndConvertResult() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("key".getBytes());
+        final byte[] plainValue = "value".getBytes();
+        final byte[] valueWithHeaders = 
convertFromPlainToHeaderFormat(plainValue);
+        final byte[] oldPlainValue = "oldValue".getBytes();
+        when(mockStore.putIfAbsent(eq(key), 
eq(plainValue))).thenReturn(oldPlainValue);
+
+        final byte[] result = adapter.putIfAbsent(key, valueWithHeaders);
+
+        assertArrayEquals(convertFromPlainToHeaderFormat(oldPlainValue), 
result);
+    }
+
+    @Test
+    public void shouldDeleteAndConvertResult() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("key".getBytes());
+        final byte[] oldPlainValue = "oldValue".getBytes();
+        when(mockStore.delete(key)).thenReturn(oldPlainValue);
+
+        final byte[] result = adapter.delete(key);
+
+        assertArrayEquals(convertFromPlainToHeaderFormat(oldPlainValue), 
result);
+    }
+
+    @Test
+    public void shouldPutAllEntries() {
+        adapter = createAdapter();
+        final Bytes key1 = new Bytes("key1".getBytes());
+        final Bytes key2 = new Bytes("key2".getBytes());
+        final byte[] value1 = 
convertFromPlainToHeaderFormat("value1".getBytes());
+        final byte[] value2 = 
convertFromPlainToHeaderFormat("value2".getBytes());
+
+        final List<KeyValue<Bytes, byte[]>> entries = Arrays.asList(
+            KeyValue.pair(key1, value1),
+            KeyValue.pair(key2, value2)
+        );
+
+        adapter.putAll(entries);
+
+        verify(mockStore).put(eq(key1), eq("value1".getBytes()));
+        verify(mockStore).put(eq(key2), eq("value2".getBytes()));
+    }
+
+    @Test
+    public void shouldWrapRangeIterator() {
+        adapter = createAdapter();
+        final Bytes from = new Bytes("a".getBytes());
+        final Bytes to = new Bytes("z".getBytes());
+        when(mockStore.range(from, to)).thenReturn(mockIterator);
+
+        final KeyValueIterator<Bytes, byte[]> result = adapter.range(from, to);
+
+        assertNotNull(result);
+        assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+    }
+
+    @Test
+    public void shouldWrapReverseRangeIterator() {
+        adapter = createAdapter();
+        final Bytes from = new Bytes("a".getBytes());
+        final Bytes to = new Bytes("z".getBytes());
+        when(mockStore.reverseRange(from, to)).thenReturn(mockIterator);
+
+        final KeyValueIterator<Bytes, byte[]> result = 
adapter.reverseRange(from, to);
+
+        assertNotNull(result);
+        assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+    }
+
+    @Test
+    public void shouldWrapAllIterator() {
+        adapter = createAdapter();
+        when(mockStore.all()).thenReturn(mockIterator);
+
+        final KeyValueIterator<Bytes, byte[]> result = adapter.all();
+
+        assertNotNull(result);
+        assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+    }
+
+    @Test
+    public void shouldWrapReverseAllIterator() {
+        adapter = createAdapter();
+        when(mockStore.reverseAll()).thenReturn(mockIterator);
+
+        final KeyValueIterator<Bytes, byte[]> result = adapter.reverseAll();
+
+        assertNotNull(result);
+        assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+    }
+
+    @Test
+    public void shouldWrapPrefixScanIterator() {
+        adapter = createAdapter();
+        when(mockStore.prefixScan(any(), any())).thenReturn(mockIterator);
+
+        final KeyValueIterator<Bytes, byte[]> result = 
adapter.prefixScan("prefix", (topic, data) -> data.getBytes());
+
+        assertNotNull(result);
+        assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+    }
+
+    @Test
+    public void shouldHandleKeyQuery() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("test-key".getBytes());
+        final byte[] plainValue = "test-value".getBytes();
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+        final QueryResult<byte[]> mockResult = 
QueryResult.forResult(plainValue);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<byte[]> result = adapter.query(query, 
PositionBound.unbounded(), new QueryConfig(false));
+
+        assertTrue(result.isSuccess());
+        assertArrayEquals(convertFromPlainToHeaderFormat(plainValue), 
result.getResult());
+    }
+
+    @Test
+    public void shouldHandleKeyQueryWithNullResult() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("test-key".getBytes());
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+        final QueryResult<byte[]> mockResult = QueryResult.forResult(null);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<byte[]> result = adapter.query(query, 
PositionBound.unbounded(), new QueryConfig(false));
+
+        assertTrue(result.isSuccess());
+        assertNull(result.getResult());
+    }
+
+    @Test
+    public void shouldHandleFailedKeyQuery() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("test-key".getBytes());
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+        final QueryResult<byte[]> mockResult = 
QueryResult.forUnknownQueryType(query, mockStore);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<byte[]> result = adapter.query(query, 
PositionBound.unbounded(), new QueryConfig(false));
+
+        assertFalse(result.isSuccess());
+    }
+
+    @Test
+    public void shouldHandleRangeQuery() {
+        adapter = createAdapter();
+        final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+            new Bytes("a".getBytes()),
+            new Bytes("z".getBytes())
+        );
+
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> mockResult = 
QueryResult.forResult(mockIterator);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> result = 
adapter.query(
+            query,
+            PositionBound.unbounded(),
+            new QueryConfig(false)
+        );
+
+        assertTrue(result.isSuccess());
+        assertNotNull(result.getResult());
+        assertTrue(result.getResult() instanceof 
PlainToHeadersIteratorAdapter);
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoForKeyQuery() {
+        adapter = createAdapter();
+        final Bytes key = new Bytes("test-key".getBytes());
+        final byte[] plainValue = "test-value".getBytes();
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+        final QueryResult<byte[]> mockResult = 
QueryResult.forResult(plainValue);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<byte[]> result = adapter.query(query, 
PositionBound.unbounded(), new QueryConfig(true));
+
+        assertTrue(result.isSuccess());
+        assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution 
info to be collected");
+        final String executionInfo = String.join("\n", 
result.getExecutionInfo());
+        assertTrue(executionInfo.contains("Handled in"), "Expected execution 
info to contain handling information");
+        
assertTrue(executionInfo.contains(PlainToHeadersStoreAdapter.class.getName()),
+            "Expected execution info to mention PlainToHeadersStoreAdapter");
+    }
+
+    @Test
+    public void shouldCollectExecutionInfoForRangeQuery() {
+        adapter = createAdapter();
+        final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+            new Bytes("a".getBytes()),
+            new Bytes("z".getBytes())
+        );
+
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> mockResult = 
QueryResult.forResult(mockIterator);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> result = 
adapter.query(
+            query,
+            PositionBound.unbounded(),
+            new QueryConfig(true)
+        );
+
+        assertTrue(result.isSuccess());
+        assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution 
info to be collected");
+        final String executionInfo = String.join("\n", 
result.getExecutionInfo());
+        assertTrue(executionInfo.contains("Handled in"), "Expected execution 
info to contain handling information");
+    }
+
+    @Test
+    public void shouldDelegateOtherQueryTypesToUnderlyingStore() {
+        adapter = createAdapter();
+        // Create a custom query type that's not KeyQuery or RangeQuery
+        final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("key".getBytes()));
+
+        final QueryResult<byte[]> mockResult = 
QueryResult.forUnknownQueryType(query, mockStore);
+        when(mockStore.query(eq(query), any(PositionBound.class), 
any(QueryConfig.class)))
+            .thenReturn(mockResult);
+
+        final QueryResult<byte[]> result = adapter.query(query, 
PositionBound.unbounded(), new QueryConfig(false));
+
+        // The result should be passed through from the underlying store
+        assertFalse(result.isSuccess());
+    }
+
+    @Test
+    public void shouldDelegateName() {
+        when(mockStore.name()).thenReturn("test-store");
+        adapter = createAdapter();
+
+        assertEquals("test-store", adapter.name());
+    }
+
+    @Test
+    public void shouldReturnTrueForPersistent() {
+        adapter = createAdapter();
+
+        assertTrue(adapter.persistent());
+    }
+
+    @Test
+    public void shouldDelegateIsOpen() {
+        when(mockStore.isOpen()).thenReturn(true);
+        adapter = createAdapter();
+
+        assertTrue(adapter.isOpen());
+    }
+
+    @Test
+    public void shouldDelegateApproximateNumEntries() {
+        when(mockStore.approximateNumEntries()).thenReturn(42L);
+        adapter = createAdapter();
+
+        assertEquals(42L, adapter.approximateNumEntries());
+    }
+
+    @Test
+    public void shouldDelegateGetPosition() {
+        adapter = createAdapter();
+        when(mockStore.getPosition()).thenReturn(null);
+
+        adapter.getPosition();
+
+        verify(mockStore).getPosition();
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index 14c13b25d9a..398b630b764 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -134,16 +134,172 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
     }
 
     @Test
-    public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+    public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws 
Exception {
+        prepareKeyValueStoreWithMultipleKeys();
+
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+        // get() - tests lazy migration on read
+
+        assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())), 
"Expected null for unknown key");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries on DEFAULT CF, 0 in headers-aware CF");
+
+        assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new 
Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(1) = 10 bytes");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6 
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+        // put() - tests migration on write
+
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"headers+timestamp+22".getBytes());
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5 
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+        rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one, due to two delete operations (even if one does 
not delete anything)
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+        rocksDBStore.put(new Bytes("key8new".getBytes()), 
"headers+timestamp+88888888".getBytes());
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with 
put()");
+
+        rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one put on new CF, but count is off by two 
due to deletes not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with 
put()");
+
+        // putIfAbsent() - tests migration on conditional write
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), 
"headers+timestamp+11111111111".getBytes()),
+            "Expected null return value for putIfAbsent on non-existing 
key11new, and new key should be added to headers-aware CF");
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with 
putIfAbsent()");
+
+        assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new 
Bytes("key5".getBytes()), null).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+        // one delete on old CF, one put on new CF, due to `get()` migration
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with 
putIfAbsent(null)");
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), 
null));
+        // no delete operation, because key12new is unknown
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on 
non-existing key12new");
+
+        // delete() - tests migration on delete
+
+        assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new 
Bytes("key6".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operation, however, only one is counted because old CF 
count was zero before already
+        assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2 
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with 
delete()");
+
+        // iterators should not trigger migration (read-only)
+        iteratorsShouldNotMigrateDataFromPlain();
+        assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+        rocksDBStore.close();
+
+        // Verify the final state of both column families
+        verifyPlainUpgradeColumnFamilies();
+    }
+
+    @Test
+    public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
         // Prepare a plain key-value store (with data in default column family)
         prepareKeyValueStore();
 
-        // Try to open with RocksDBTimestampedStoreWithHeaders - should throw 
exception
-        final ProcessorStateException exception = 
assertThrows(ProcessorStateException.class,
-            () -> rocksDBStore.init(context, rocksDBStore));
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        // Verify we can read the migrated data
+        assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+        assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+
+        rocksDBStore.close();
+
+        // Verify column family structure
+        verifyPlainStoreUpgrade();
+    }
+
+    private void verifyPlainStoreUpgrade() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            headersCF = columnFamilies.get(1);
+
+            // After get() is called, data is migrated and deleted from 
DEFAULT CF
+            // DEFAULT CF should be empty for migrated keys
+            assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to 
be deleted from DEFAULT CF after migration");
+            assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to 
be deleted from DEFAULT CF after migration");
 
-        assertTrue(exception.getMessage().contains("Cannot upgrade directly 
from key-value store to headers-aware store"));
-        assertTrue(exception.getMessage().contains("Please first upgrade to 
RocksDBTimestampedStore"));
+            // Headers CF should have the migrated data
+            assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
db.get(headersCF, "key1".getBytes()).length);
+            assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
db.get(headersCF, "key2".getBytes()).length);
+        } finally {
+            if (defaultCF != null) {
+                defaultCF.close();
+            }
+            if (headersCF != null) {
+                headersCF.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    @Test
+    public void shouldFailWhenBothPlainAndTimestampedDataExist() {
+        // This is an invalid state - we can't have both DEFAULT CF with data 
AND LEGACY_TIMESTAMPED CF
+        // Step 1: Create a plain store with two keys
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        kvStore.init(context, kvStore);
+        kvStore.put(new Bytes("plainKey1".getBytes()), "value1".getBytes());
+        kvStore.put(new Bytes("plainKey2".getBytes()), "value2".getBytes());
+        kvStore.close();
+
+        // Step 2: Open as timestamped store and migrate one key (simulating 
partial migration)
+        final RocksDBTimestampedStore timestampedStore = new 
RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
+        timestampedStore.init(context, timestampedStore);
+        timestampedStore.get(new Bytes("plainKey1".getBytes())); // Triggers 
migration of plainKey1 to LEGACY_TIMESTAMPED CF
+        timestampedStore.close();
+        // Now we have: plainKey2 in DEFAULT CF, plainKey1 in 
LEGACY_TIMESTAMPED CF
+
+        // Step 3: Try to open with headers store - should fail
+        final ProcessorStateException exception = assertThrows(
+            ProcessorStateException.class,
+            () -> rocksDBStore.init(context, rocksDBStore)
+        );
+
+        assertTrue(exception.getMessage().contains("Inconsistent store 
state"));
+        assertTrue(exception.getMessage().contains("Cannot have both plain 
(DEFAULT) and timestamped data simultaneously"));
+        assertTrue(exception.getMessage().contains("Headers store can upgrade 
from either plain or timestamped format, but not both."));
     }
 
     @Test
@@ -154,7 +310,7 @@ public class RocksDBTimestampedStoreWithHeadersTest extends 
RocksDBStoreTest {
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
             rocksDBStore.init(context, rocksDBStore);
 
-            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode"));
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from timestamped store"));
         }
 
         assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries in legacy CF and 0 in headers-aware CF before migration");
@@ -462,16 +618,126 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
     }
 
     private void verifyStillInUpgradeMode() {
-        // check that still in upgrade mode
+        // check that still in upgrade mode from timestamped store
         try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
             rocksDBStore.init(context, rocksDBStore);
 
-            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode"));
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from timestamped store"));
         } finally {
             rocksDBStore.close();
         }
     }
 
+    private void iteratorsShouldNotMigrateDataFromPlain() {
+        // iterating should not migrate any data, but return all keys over 
both CFs
+        // Values from DEFAULT CF are converted to header-aware format on the 
fly: [0x00][timestamp=-1][value]
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all()) 
{
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key1".getBytes(), keyValue.key.get()); // 
migrated by get()
+                assertEquals(10, keyValue.value.length, "Expected header-aware 
format: varint(1) + empty headers(0) + timestamp(8) + value(1) = 10 bytes for 
key1 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get()); 
// inserted by putIfAbsent()
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 
's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', 
'1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get()); // 
migrated by put()
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 
's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, 
keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get()); // 
not migrated since not accessed, should be converted on-the-fly from DEFAULT CF
+                assertEquals(13, keyValue.value.length,
+                    "Expected header-aware format: varint(1) + empty 
headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from DEFAULT CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get()); // 
migrated by putIfAbsent with null value
+                assertEquals(14, keyValue.value.length, "Expected header-aware 
format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for 
key5 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key7".getBytes(), keyValue.key.get()); // 
not migrated since not accessed
+                assertEquals(16, keyValue.value.length, "Expected header-aware 
format: varint(1) + empty headers(0) + timestamp(8) + value(7) = 16 bytes for 
key7 from DEFAULT CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key8new".getBytes(), keyValue.key.get()); 
// inserted by put()
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 
's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', 
'8', '8', '8', '8'}, keyValue.value);
+            }
+            assertFalse(itAll.hasNext());
+        }
+    }
+
+    private void verifyPlainUpgradeColumnFamilies() throws Exception {
+        // In upgrade scenario from plain RocksDBStore,
+        // we expect 2 CFs: DEFAULT (legacy), keyValueWithTimestampAndHeaders 
(new)
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            headersCF = columnFamilies.get(1);
+
+            // DEFAULT CF should have migrated keys as null, un-migrated as 
plain values
+            assertNull(db.get(defaultCF, "unknown".getBytes()));
+            assertNull(db.get(defaultCF, "key1".getBytes())); // migrated
+            assertNull(db.get(defaultCF, "key2".getBytes())); // migrated
+            assertNull(db.get(defaultCF, "key3".getBytes())); // deleted
+            assertEquals(4, db.get(defaultCF, "key4".getBytes()).length); // 
not migrated
+            assertNull(db.get(defaultCF, "key5".getBytes())); // migrated
+            assertNull(db.get(defaultCF, "key6".getBytes())); // migrated
+            assertEquals(7, db.get(defaultCF, "key7".getBytes()).length); // 
not migrated
+            assertNull(db.get(defaultCF, "key8new".getBytes()));
+            assertNull(db.get(defaultCF, "key9new".getBytes()));
+            assertNull(db.get(defaultCF, "key11new".getBytes()));
+            assertNull(db.get(defaultCF, "key12new".getBytes()));
+
+            // Headers CF should have all migrated/new keys with header-aware 
format
+            assertNull(db.get(headersCF, "unknown".getBytes()));
+            assertEquals(1 + 0 + 8 + 1, db.get(headersCF, 
"key1".getBytes()).length); // migrated by get()
+            assertEquals("headers+timestamp+22".getBytes().length, 
db.get(headersCF, "key2".getBytes()).length); // migrated by put() => value is 
inserted without any conversion
+            assertNull(db.get(headersCF, "key3".getBytes())); // deleted
+            assertNull(db.get(headersCF, "key4".getBytes())); // not migrated
+            assertEquals(1 + 0 + 8 + 5, db.get(headersCF, 
"key5".getBytes()).length); // migrated by putIfAbsent
+            assertNull(db.get(headersCF, "key6".getBytes())); // migrated by 
delete() => deleted
+            assertNull(db.get(headersCF, "key7".getBytes())); // not migrated
+            assertEquals("headers+timestamp+88888888".getBytes().length, 
db.get(headersCF, "key8new".getBytes()).length); // added by put() => value is 
inserted without any conversion
+            assertNull(db.get(headersCF, "key9new".getBytes()));
+            assertEquals("headers+timestamp+11111111111".getBytes().length, 
db.get(headersCF, "key11new".getBytes()).length); // inserted (newly added) by 
putIfAbsent() => value is inserted without any conversion
+            assertNull(db.get(headersCF, "key12new".getBytes()));
+        } finally {
+            if (defaultCF != null) {
+                defaultCF.close();
+            }
+            if (headersCF != null) {
+                headersCF.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
     private void clearLegacyColumnFamily() throws Exception {
         // clear legacy timestamped CF by deleting remaining keys
         final DBOptions dbOptions = new DBOptions();
@@ -562,6 +828,30 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         }
     }
 
+    @Test
+    public void shouldTransitionFromPlainUpgradeModeToRegularMode() {
+        // Prepare plain store
+        prepareKeyValueStore();
+
+        // Open in upgrade mode
+        rocksDBStore.init(context, rocksDBStore);
+
+        // Migrate all data by reading it (lazy migration deletes from DEFAULT 
CF)
+        rocksDBStore.get(new Bytes("key1".getBytes()));
+        rocksDBStore.get(new Bytes("key2".getBytes()));
+
+        rocksDBStore.close();
+
+        // Reopen - should now be in regular mode since DEFAULT CF is empty 
after migration
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in regular headers-aware mode"));
+        } finally {
+            rocksDBStore.close();
+        }
+    }
+
     @Test
     public void shouldNotSupportDowngradeFromHeadersAwareToRegularStore() {
         // prepare headers-aware store with data
@@ -622,6 +912,25 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         }
     }
 
+    private void prepareKeyValueStoreWithMultipleKeys() {
+        // Create a plain RocksDBStore with multiple keys for comprehensive 
migration testing
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        try {
+            kvStore.init(context, kvStore);
+
+            // Write plain key-value pairs to default column family
+            kvStore.put(new Bytes("key1".getBytes()), "1".getBytes());
+            kvStore.put(new Bytes("key2".getBytes()), "22".getBytes());
+            kvStore.put(new Bytes("key3".getBytes()), "333".getBytes());
+            kvStore.put(new Bytes("key4".getBytes()), "4444".getBytes());
+            kvStore.put(new Bytes("key5".getBytes()), "55555".getBytes());
+            kvStore.put(new Bytes("key6".getBytes()), "666666".getBytes());
+            kvStore.put(new Bytes("key7".getBytes()), "7777777".getBytes());
+        } finally {
+            kvStore.close();
+        }
+    }
+
     private void prepareTimestampedStore() {
         // Create a legacy RocksDBTimestampedStore to test upgrade scenario
         final RocksDBTimestampedStore timestampedStore = new 
RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index 8330c746e31..77a3be9ca56 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -211,24 +211,15 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
     }
 
     @Test
-    public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
-        when(supplier.name()).thenReturn("test-store");
-        when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new RocksDBStore("test-store", 
"metrics-scope"));
-
-        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
-                supplier,
-                Serdes.String(),
-                Serdes.String(),
-                new MockTime()
-        );
-
-        final IllegalArgumentException exception = assertThrows(
-                IllegalArgumentException.class,
-                () -> 
builder.withLoggingDisabled().withCachingDisabled().build()
-        );
+    public void shouldWrapPlainKeyValueStoreAsHeadersStore() {
+        setUp();
+        when(supplier.get()).thenReturn(new RocksDBStore("name", 
"metrics-scope"));
 
-        assertTrue(exception.getMessage().contains("Provided store must be a 
timestamped store"));
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertInstanceOf(PlainToHeadersStoreAdapter.class, 
((WrappedStateStore) store).wrapped());
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
new file mode 100644
index 00000000000..561743d49d5
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+
+public class UtilsTest {
+
+    @Test
+    public void shouldExtractRawPlainValue() {
+        // Format: [headersSize(varint)][headers][timestamp(8)][value]
+        // Create a value with headers size=0, timestamp=-1, value="test"
+        final byte[] value = "test".getBytes();
+        final ByteBuffer buffer = ByteBuffer.allocate(1 + 8 + value.length);
+        buffer.put((byte) 0x00); // headers size = 0
+        buffer.putLong(-1L); // timestamp = -1
+        buffer.put(value);
+
+        final byte[] result = rawPlainValue(buffer.array());
+
+        assertArrayEquals(value, result);
+    }
+
+    @Test
+    public void shouldReturnNullForNullRawPlainValue() {
+        assertNull(rawPlainValue(null));
+    }
+
+}
\ No newline at end of file

Reply via email to