This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0be98d7e502 MINOR: Add store type (headers, timestamped, KV) downgrade 
tests (#21503)
0be98d7e502 is described below

commit 0be98d7e50234252c6e3ef31856d5cb8d3d095fa
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Feb 24 19:31:23 2026 -0800

    MINOR: Add store type (headers, timestamped, KV) downgrade tests (#21503)
    
    - Couple of smaller cleanups and adding missing test case.
    - Aligning existing test for ts-store to new header-store test.
    - Add an optimization for `putIfAbsent` in top level RocksDB store.
    - Add new unit and integration tests for downgrading.
    
    Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
     <[email protected]>
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../integration/StoreUpgradeIntegrationTest.java   | 104 +++++++++++++++++++++
 .../streams/state/internals/RocksDBStore.java      |  36 ++++++-
 .../RocksDBTimestampedStoreWithHeaders.java        |   8 +-
 .../internals/RocksDBTimestampedStoreTest.java     |  74 ++++++++++-----
 .../RocksDBTimestampedStoreWithHeadersTest.java    |  68 ++++++++++++--
 5 files changed, 258 insertions(+), 32 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index f0286d4fe93..cb248138d0e 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -953,6 +954,109 @@ public class StoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
+    @Test
+    public void shouldFailDowngradeFromTimestampedToRegularKeyValueStore() 
throws Exception {
+        final Properties props = props();
+        setupAndPopulateTimestampedStore(props);
+        kafkaStreams = null;
+
+        // Attempt to downgrade to regular key-value store - this should fail
+        final StreamsBuilder streamsBuilderForRegularStore = new 
StreamsBuilder();
+
+        streamsBuilderForRegularStore.addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForRegularStore.build(), 
props);
+
+        boolean exceptionThrown = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof ProcessorStateException &&
+                    cause.getMessage() != null &&
+                    cause.getMessage().contains("timestamped key-value store") 
&&
+                    cause.getMessage().contains("Downgrade from timestamped to 
regular store is not supported")) {
+                    exceptionThrown = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException 
about downgrade not being supported, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be 
thrown when attempting to downgrade from timestamped to regular store");
+        }
+    }
+
+    @Test
+    public void 
shouldSuccessfullyDowngradeFromTimestampedToRegularKeyValueStoreAfterCleanup() 
throws Exception {
+        final Properties props = props();
+        setupAndPopulateTimestampedStore(props);
+
+        kafkaStreams.cleanUp(); // Delete local state
+        kafkaStreams = null;
+
+        // Now downgrade to regular key-value store - this should succeed 
because we cleaned up
+        final StreamsBuilder streamsBuilderForRegularStore = new 
StreamsBuilder();
+
+        streamsBuilderForRegularStore.addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.persistentKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForRegularStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyPlainCount(3, asList(
+            KeyValue.pair(1, 1L),
+            KeyValue.pair(2, 1L),
+            KeyValue.pair(3, 1L)));
+
+        kafkaStreams.close();
+    }
+
+    private void setupAndPopulateTimestampedStore(final Properties props) 
throws Exception {
+        final StreamsBuilder streamsBuilderForTimestampedStore = new 
StreamsBuilder();
+
+        streamsBuilderForTimestampedStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                Serdes.Integer(),
+                Serdes.Long()))
+            .<Integer, Integer>stream(inputStream)
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new 
KafkaStreams(streamsBuilderForTimestampedStore.build(), props);
+        kafkaStreams.start();
+
+        final long timestamp1 = CLUSTER.time.milliseconds();
+        processKeyValueAndVerifyCountWithTimestamp(1, timestamp1, 
singletonList(
+            KeyValue.pair(1, ValueAndTimestamp.make(1L, timestamp1))));
+
+        final long timestamp2 = CLUSTER.time.milliseconds() + 10;
+        processKeyValueAndVerifyCountWithTimestamp(2, timestamp2, asList(
+            KeyValue.pair(1, ValueAndTimestamp.make(1L, timestamp1)),
+            KeyValue.pair(2, ValueAndTimestamp.make(1L, timestamp2))));
+
+        kafkaStreams.close();
+    }
+
     private static class KeyValueProcessor implements Processor<Integer, 
Integer, Void, Void> {
         private KeyValueStore<Integer, Long> store;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d4705ad62d1..debc2078c62 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -299,6 +300,39 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         try {
             final List<byte[]> allExisting = 
RocksDB.listColumnFamilies(userSpecifiedOptions, absolutePath);
 
+            // Check for unexpected column families
+            for (final byte[] existingFamily : allExisting) {
+                final boolean isExpected = allDescriptors.stream()
+                        .anyMatch(descriptor -> 
Arrays.equals(descriptor.getName(), existingFamily));
+                if (!isExpected) {
+                    if (Arrays.equals(existingFamily, 
RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME)) {
+                        throw new ProcessorStateException(
+                                "Store " + name + " is a timestamped key-value 
store and cannot be opened as a regular key-value store. " +
+                                "Downgrade from timestamped to regular store 
is not supported directly. " +
+                                "To downgrade, you can delete the local state 
in the state directory, and rebuild the store as regular key-value store from 
the changelog.");
+                    }
+                    if (Arrays.equals(existingFamily, 
RocksDBTimestampedStoreWithHeaders.TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME)) {
+                        final boolean openingAsTimestampedStore = 
allDescriptors.stream()
+                                .anyMatch(descriptor -> 
Arrays.equals(descriptor.getName(), 
RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME));
+                        if (openingAsTimestampedStore) {
+                            throw new ProcessorStateException(
+                                    "Store " + name + " is a headers-aware 
store and cannot be opened as a timestamped store. " +
+                                    "Downgrade from headers-aware to 
timestamped store is not supported. " +
+                                    "To downgrade, you can delete the local 
state in the state directory, and rebuild the store as timestamped store from 
the changelog.");
+                        } else {
+                            throw new ProcessorStateException(
+                                    "Store " + name + " is a headers-aware 
store and cannot be opened as a regular key-value store. " +
+                                    "Downgrade from headers-aware to regular 
store is not supported.");
+                        }
+                    }
+
+                    final String unexpectedFamily = new String(existingFamily, 
StandardCharsets.UTF_8);
+                    throw new ProcessorStateException(
+                            "Unexpected column family '" + unexpectedFamily + 
"' found in store " + name + ". " +
+                            "The store may have been created with incompatible 
settings.");
+                }
+            }
+
             final List<ColumnFamilyDescriptor> existingDescriptors = new 
LinkedList<>();
             existingDescriptors.add(defaultColumnFamilyDescriptor);
             existingDescriptors.addAll(extraDescriptors.stream()
@@ -391,7 +425,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
                                            final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         final byte[] originalValue = get(key);
-        if (originalValue == null) {
+        if (originalValue == null && value != null) {
             put(key, value);
         }
         return originalValue;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index 52e4ec68bdb..f451a202168 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -54,7 +54,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
     private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
         RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME;
 
-    private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+    static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
         "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
 
     public RocksDBTimestampedStoreWithHeaders(final String name,
@@ -91,7 +91,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
     }
 
     private void openInUpgradeMode(final DBOptions dbOptions,
-                                              final ColumnFamilyOptions 
columnFamilyOptions) {
+                                   final ColumnFamilyOptions 
columnFamilyOptions) {
         final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
             dbOptions,
             // we have to open the default CF to be able to open the legacy 
CF, but we won't use it
@@ -127,7 +127,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
     }
 
     private void openInRegularMode(final DBOptions dbOptions,
-                              final ColumnFamilyOptions columnFamilyOptions) {
+                                   final ColumnFamilyOptions 
columnFamilyOptions) {
         final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
             dbOptions,
             // we have to open the default CF to be able to open the legacy 
CF, but we won't use it
@@ -143,7 +143,7 @@ public class RocksDBTimestampedStoreWithHeaders extends 
RocksDBStore implements
     }
 
     private void verifyAndCloseEmptyDefaultColumnFamily(final 
ColumnFamilyHandle columnFamilyHandle) {
-        try (final RocksIterator defaultIter = 
db.newIterator(columnFamilyHandle)) {
+        try (columnFamilyHandle; final RocksIterator defaultIter = 
db.newIterator(columnFamilyHandle)) {
             defaultIter.seekToFirst();
             if (defaultIter.isValid()) {
                 throw new ProcessorStateException("Cannot upgrade directly 
from key-value store to headers-aware store for " + name + ". " +
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index c3bb67cd524..2a666a2659b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
 import org.hamcrest.core.IsNull;
@@ -42,6 +43,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
@@ -163,34 +165,39 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
         assertThat(rocksDBStore.approximateNumEntries(), is(5L));
 
         // should add new key8 to new CF
-        rocksDBStore.put(new Bytes("key8".getBytes()), 
"timestamp+88888888".getBytes());
+        rocksDBStore.put(new Bytes("key8new".getBytes()), 
"timestamp+88888888".getBytes());
         // one delete on old CF, one put on new CF
         // approx: 3 entries on old CF, 2 in new CF
         assertThat(rocksDBStore.approximateNumEntries(), is(5L));
 
+        rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one delete on new CF
+        // approx: 2 entries on old CF, 1 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(3L));
+
         // putIfAbsent()
 
         // should migrate key4 from old to new CF with old value
         assertThat(rocksDBStore.putIfAbsent(new Bytes("key4".getBytes()), 
"timestamp+4444".getBytes()).length, is(8 + 4));
         // one delete on old CF, one put on new CF
-        // approx: 2 entries on old CF, 3 in new CF
-        assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+        // approx: 1 entry on old CF, 2 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(3L));
 
         // should add new key11 to new CF
-        assertThat(rocksDBStore.putIfAbsent(new Bytes("key11".getBytes()), 
"timestamp+11111111111".getBytes()), new IsNull<>());
+        assertThat(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), 
"timestamp+11111111111".getBytes()), new IsNull<>());
         // one delete on old CF, one put on new CF
-        // approx: 1 entries on old CF, 4 in new CF
-        assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+        // approx: 0 entries on old CF, 3 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(3L));
 
         // should not delete key5 but migrate to new CF
         assertThat(rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()), 
null).length, is(8 + 5));
         // one delete on old CF, one put on new CF
-        // approx: 0 entries on old CF, 5 in new CF
-        assertThat(rocksDBStore.approximateNumEntries(), is(5L));
+        // approx: 0 entries on old CF, 4 in new CF
+        assertThat(rocksDBStore.approximateNumEntries(), is(4L));
 
         // should be no-op on both CF
-        assertThat(rocksDBStore.putIfAbsent(new Bytes("key12".getBytes()), 
null), new IsNull<>());
-        // two delete operation, however, only one is counted because old CF 
count was zero before already
+        assertThat(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), 
null), new IsNull<>());
+        // one delete operation, however, not counted because old CF count was 
zero before already
         // approx: 0 entries on old CF, 4 in new CF
         assertThat(rocksDBStore.approximateNumEntries(), is(4L));
 
@@ -221,7 +228,7 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             }
             {
                 final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-                assertArrayEquals("key11".getBytes(), keyValue.key.get());
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get());
                 assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, 
keyValue.value);
             }
             {
@@ -249,7 +256,7 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             }
             {
                 final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-                assertArrayEquals("key8".getBytes(), keyValue.key.get());
+                assertArrayEquals("key8new".getBytes(), keyValue.key.get());
                 assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
             }
             assertFalse(itAll.hasNext());
@@ -280,7 +287,7 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
         try (final KeyValueIterator<Bytes, byte[]> itAll = 
rocksDBStore.reverseAll()) {
             {
                 final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-                assertArrayEquals("key8".getBytes(), keyValue.key.get());
+                assertArrayEquals("key8new".getBytes(), keyValue.key.get());
                 assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
             }
             {
@@ -308,7 +315,7 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             }
             {
                 final KeyValue<Bytes, byte[]> keyValue = itAll.next();
-                assertArrayEquals("key11".getBytes(), keyValue.key.get());
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get());
                 assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, 
keyValue.value);
             }
             {
@@ -351,7 +358,7 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             }
             {
                 final KeyValue<Bytes, byte[]> keyValue = it.next();
-                assertArrayEquals("key11".getBytes(), keyValue.key.get());
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get());
                 assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 
'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, 
keyValue.value);
             }
             assertFalse(it.hasNext());
@@ -388,9 +395,10 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new 
IsNull<>());
             assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new 
IsNull<>());
             assertThat(db.get(noTimestampColumnFamily, 
"key7".getBytes()).length, is(7));
-            assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new 
IsNull<>());
-            assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), 
new IsNull<>());
-            assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), 
new IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key8new".getBytes()), 
new IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key9new".getBytes()), 
new IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key11new".getBytes()), 
new IsNull<>());
+            assertThat(db.get(noTimestampColumnFamily, "key12new".getBytes()), 
new IsNull<>());
 
             assertThat(db.get(withTimestampColumnFamily, 
"unknown".getBytes()), new IsNull<>());
             assertThat(db.get(withTimestampColumnFamily, 
"key1".getBytes()).length, is(8 + 1));
@@ -400,9 +408,10 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
             assertThat(db.get(withTimestampColumnFamily, 
"key5".getBytes()).length, is(8 + 5));
             assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), 
new IsNull<>());
             assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()), 
new IsNull<>());
-            assertThat(db.get(withTimestampColumnFamily, 
"key8".getBytes()).length, is(18));
-            assertThat(db.get(withTimestampColumnFamily, 
"key11".getBytes()).length, is(21));
-            assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), 
new IsNull<>());
+            assertThat(db.get(withTimestampColumnFamily, 
"key8new".getBytes()).length, is(18));
+            assertThat(db.get(noTimestampColumnFamily, "key9new".getBytes()), 
new IsNull<>());
+            assertThat(db.get(withTimestampColumnFamily, 
"key11new".getBytes()).length, is(21));
+            assertThat(db.get(withTimestampColumnFamily, 
"key12new".getBytes()), new IsNull<>());
         } catch (final RuntimeException fatal) {
             errorOccurred = true;
         } finally {
@@ -464,6 +473,29 @@ public class RocksDBTimestampedStoreTest extends 
RocksDBStoreTest {
         }
     }
 
+    @Test
+    public void shouldNotSupportDowngradeFromTimestampedToPlainKeyValueStore() 
{
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.put(new Bytes("key1".getBytes()), 
"timestamped-value1".getBytes());
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"timestamped-value2".getBytes());
+        rocksDBStore.close();
+
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        try {
+            final ProcessorStateException exception = assertThrows(
+                ProcessorStateException.class,
+                () -> kvStore.init(context, kvStore)
+            );
+
+            assertThat(exception.getMessage(), is(
+                "Store " + DB_NAME + " is a timestamped key-value store and 
cannot be opened as a regular key-value store. " +
+                "Downgrade from timestamped to regular store is not supported 
directly. " +
+                "To downgrade, you can delete the local state in the state 
directory, and rebuild the store as regular key-value store from the 
changelog."));
+        } finally {
+            kvStore.close();
+        }
+    }
+
     private void prepareOldStore() {
         final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME, 
METRICS_SCOPE);
         try {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index 98a74df76c4..ab863d4e5fc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -39,6 +39,7 @@ import org.rocksdb.RocksIterator;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static java.util.Arrays.asList;
@@ -177,19 +178,24 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
         assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on legacy CF, 2 in headers-aware CF after adding new key8new with 
put()");
 
+        rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one delete on new CF, but count is off by two 
due to deletes not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 2 
entries on legacy CF, 1 in headers-aware CF after adding new key8new with 
put()");
+
         // putIfAbsent() - tests migration on conditional write
 
         assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), 
"headers+timestamp+11111111111".getBytes()),
             "Expected null return value for putIfAbsent on non-existing 
key11new, and new key should be added to headers-aware CF");
         // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
-        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 2 
entries on legacy CF, 3 in headers-aware CF after adding new key11new with 
putIfAbsent()");
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 1 
entries on legacy CF, 2 in headers-aware CF after adding new key11new with 
putIfAbsent()");
 
         assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new 
Bytes("key5".getBytes()), null).length,
             "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
-        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 1 
entry on legacy CF, 4 in headers-aware CF after migrating key5 with 
putIfAbsent(null)");
+        // one delete on old CF, one put on new CF, due to `get()` migration
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 0 
entry on legacy CF, 3 in headers-aware CF after migrating key5 with 
putIfAbsent(null)");
 
         assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), 
null));
-        // two delete operation, however, only one is counted because old CF 
count can not be less than 0
+        // no delete operation, because key12new is unknown
         assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 0 
entries on legacy CF, 3 in headers-aware CF after putIfAbsent with null on 
non-existing key12new");
 
         // delete() - tests migration on delete
@@ -410,7 +416,10 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         assertNull(db.get(legacyTimestampedColumnFamily, "key6".getBytes())); 
// migrated
         assertEquals(8 + 7, db.get(legacyTimestampedColumnFamily, 
"key7".getBytes()).length); // not migrated
         assertNull(db.get(legacyTimestampedColumnFamily, 
"key8new".getBytes()));
+        assertNull(db.get(legacyTimestampedColumnFamily, 
"key9new".getBytes()));
         assertNull(db.get(legacyTimestampedColumnFamily, 
"key11new".getBytes()));
+        assertNull(db.get(legacyTimestampedColumnFamily, 
"key12new".getBytes()));
+
     }
 
     private void verifyHeadersColumnFamily(final RocksDB db, final 
ColumnFamilyHandle headersColumnFamily) throws Exception {
@@ -424,6 +433,7 @@ public class RocksDBTimestampedStoreWithHeadersTest extends 
RocksDBStoreTest {
         assertNull(db.get(headersColumnFamily, "key6".getBytes())); // 
migrated by delete() => deleted
         assertNull(db.get(headersColumnFamily, "key7".getBytes())); // not 
migrated, should still be in legacy column family
         assertEquals("headers+timestamp+88888888".getBytes().length, 
db.get(headersColumnFamily, "key8new".getBytes()).length); // added by put() => 
value is inserted without any conversion
+        assertNull(db.get(headersColumnFamily, "key9new".getBytes()));
         assertEquals("headers+timestamp+11111111111".getBytes().length, 
db.get(headersColumnFamily, "key11new".getBytes()).length); // inserted (newly 
added) by putIfAbsent() => value is inserted without any conversion
         assertNull(db.get(headersColumnFamily, "key12new".getBytes())); // 
putIfAbsent with null value on non-existing key should not create any entry
     }
@@ -525,11 +535,11 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
             boolean hasLegacy = false;
 
             for (final byte[] cf : existingCFs) {
-                if (java.util.Arrays.equals(cf, 
RocksDB.DEFAULT_COLUMN_FAMILY)) {
+                if (Arrays.equals(cf, RocksDB.DEFAULT_COLUMN_FAMILY)) {
                     hasDefault = true;
-                } else if (java.util.Arrays.equals(cf, 
"keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8))) {
+                } else if (Arrays.equals(cf, 
"keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8))) {
                     hasHeadersAware = true;
-                } else if (java.util.Arrays.equals(cf, 
"keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8))) {
+                } else if (Arrays.equals(cf, 
"keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8))) {
                     hasLegacy = true;
                 }
             }
@@ -549,6 +559,52 @@ public class RocksDBTimestampedStoreWithHeadersTest 
extends RocksDBStoreTest {
         }
     }
 
+    @Test
+    public void shouldNotSupportDowngradeFromHeadersAwareToRegularStore() {
+        // prepare headers-aware store with data
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.put(new Bytes("key1".getBytes()), 
"headers-aware-value1".getBytes());
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"headers-aware-value2".getBytes());
+        rocksDBStore.close();
+
+        final RocksDBStore regularStore = new RocksDBStore(DB_NAME, 
METRICS_SCOPE);
+        try {
+            final ProcessorStateException exception = assertThrows(
+                ProcessorStateException.class,
+                () -> regularStore.init(context, regularStore)
+            );
+
+            assertTrue(exception.getMessage().contains("Store " + DB_NAME + " 
is a headers-aware store"));
+            assertTrue(exception.getMessage().contains("cannot be opened as a 
regular key-value store"));
+            assertTrue(exception.getMessage().contains("Downgrade from 
headers-aware to regular store is not supported"));
+        } finally {
+            regularStore.close();
+        }
+    }
+
+    @Test
+    public void shouldNotSupportDowngradeFromHeadersAwareToTimestampedStore() {
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.put(new Bytes("key1".getBytes()), 
"headers-aware-value1".getBytes());
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"headers-aware-value2".getBytes());
+        rocksDBStore.close();
+
+        final RocksDBTimestampedStore timestampedStore = new 
RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
+        try {
+            final ProcessorStateException exception = assertThrows(
+                ProcessorStateException.class,
+                () -> timestampedStore.init(context, timestampedStore)
+            );
+
+            assertTrue(exception.getMessage().contains("Store " + DB_NAME + " 
is a headers-aware store"));
+            assertTrue(exception.getMessage().contains("cannot be opened as a 
timestamped store"));
+            assertTrue(exception.getMessage().contains("Downgrade from 
headers-aware to timestamped store is not supported"));
+            assertTrue(exception.getMessage().contains("To downgrade, you can 
delete the local state in the state directory, and rebuild the store as 
timestamped store from the changelog"));
+        } finally {
+            timestampedStore.close();
+        }
+    }
+
     private void prepareKeyValueStore() {
         // Create a plain RocksDBStore (key-value, not timestamped) with data 
in default column family
         final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);

Reply via email to