This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2eacc1cad4d KAFKA-20260: Upgrade path from plain KV store to headers
store (#21666)
2eacc1cad4d is described below
commit 2eacc1cad4d2dd710dd59a9df049c9f4e59f6807
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Mar 10 11:12:52 2026 +0100
KAFKA-20260: Upgrade path from plain KV store to headers store (#21666)
Extends headers-aware key-value store migration to support upgrading
from plain (non-timestamped) stores.
- Plain-to-Headers Migration Support
- RocksDB Upgrade Path
- Implemented `query()` method in the adapter class
Testing:
- 3 upgrade integration tests
- 2 downgrade integration tests
- unit tests for all added methods/classes
Reviewers: Matthias J. Sax <[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 380 +++++++++++++++++--
.../kafka/streams/state/HeadersBytesStore.java | 26 ++
.../internals/PlainToHeadersIteratorAdapter.java | 62 ++++
.../internals/PlainToHeadersStoreAdapter.java | 215 +++++++++++
.../RocksDBTimestampedStoreWithHeaders.java | 134 ++++---
...TimestampedKeyValueStoreBuilderWithHeaders.java | 8 +-
.../kafka/streams/state/internals/Utils.java | 48 +++
.../kafka/streams/state/HeadersBytesStoreTest.java | 65 ++++
.../internals/PlainToHeadersStoreAdapterTest.java | 404 +++++++++++++++++++++
.../RocksDBTimestampedStoreWithHeadersTest.java | 327 ++++++++++++++++-
...stampedKeyValueStoreBuilderWithHeadersTest.java | 25 +-
.../kafka/streams/state/internals/UtilsTest.java | 50 +++
12 files changed, 1627 insertions(+), 117 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index a9a3c9a9e32..145720c4374 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -133,10 +134,10 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
streamsBuilderForOldStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilder(
- persistentStore ?
Stores.persistentTimestampedKeyValueStore(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
- Serdes.String(),
- Serdes.String()))
+ Stores.timestampedKeyValueStoreBuilder(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStore(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
.process(TimestampedKeyValueProcessor::new, STORE_NAME);
@@ -154,10 +155,10 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
streamsBuilderForNewStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
- persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
- Serdes.String(),
- Serdes.String()))
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
@@ -184,10 +185,10 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
streamsBuilderForOldStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilder(
- Stores.persistentTimestampedKeyValueStore(STORE_NAME),
- Serdes.String(),
- Serdes.String()))
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
.process(TimestampedKeyValueProcessor::new, STORE_NAME);
@@ -207,10 +208,10 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
streamsBuilderForNewStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
- Stores.persistentTimestampedKeyValueStore(STORE_NAME),
- Serdes.String(),
- Serdes.String()))
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
.process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
@@ -233,6 +234,125 @@ public class HeadersStoreUpgradeIntegrationTest {
kafkaStreams.close();
}
+ @Test
+ public void
shouldMigrateInMemoryPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+ }
+
+ private void
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
boolean persistentStore) throws Exception {
+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+ streamsBuilderForOldStore.addStateStore(
+ Stores.keyValueStoreBuilder(
+ persistentStore ?
Stores.persistentKeyValueStore(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyValue("key1", "value1");
+ final long lastUpdateKeyOne = persistentStore ? -1L :
CLUSTER.time.milliseconds() - 1L;
+
+ processKeyValueAndVerifyValue("key2", "value2");
+ final long lastUpdateKeyTwo = persistentStore ? -1L :
CLUSTER.time.milliseconds() - 1L;
+
+ processKeyValueAndVerifyValue("key3", "value3");
+ final long lastUpdateKeyThree = persistentStore ? -1L :
CLUSTER.time.milliseconds() - 1L;
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+ streamsBuilderForNewStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
+ kafkaStreams.start();
+
+ // Verify legacy data can be read with empty headers and timestamp
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1", lastUpdateKeyOne);
+ verifyLegacyValuesWithEmptyHeaders("key2", "value2", lastUpdateKeyTwo);
+ verifyLegacyValuesWithEmptyHeaders("key3", "value3",
lastUpdateKeyThree);
+
+ // Process new records with headers
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3",
333L, headers, headers);
+ processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4",
444L, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+ streamsBuilderForOldStore.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyValue("key1", "value1");
+ processKeyValueAndVerifyValue("key2", "value2");
+ processKeyValueAndVerifyValue("key3", "value3");
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+
+
+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+ streamsBuilderForNewStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.persistentKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
+ kafkaStreams.start();
+
+ // Verify legacy data can be read with empty headers
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1L);
+ verifyLegacyValuesWithEmptyHeaders("key2", "value2", -1L);
+ verifyLegacyValuesWithEmptyHeaders("key3", "value3", -1L);
+
+ // Process new records with headers
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3",
333L, -1, headers, expectedHeaders);
+ processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4",
444L, -1, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
private <K, V> void processKeyValueAndVerifyTimestampedValue(final K key,
final V value,
final long
timestamp)
@@ -269,6 +389,41 @@ public class HeadersStoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
+ private <K, V> void processKeyValueAndVerifyValue(final K key,
+ final V value)
+ throws Exception {
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ CLUSTER.time,
+ false);
+
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, V> store =
+ IntegrationTestUtils.getStore(STORE_NAME,
kafkaStreams, QueryableStoreTypes.keyValueStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final V result = store.get(key);
+ return result != null && result.equals(value);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
private <K, V> void verifyLegacyTimestampedValue(final K key,
final V value,
final long timestamp)
@@ -296,7 +451,6 @@ public class HeadersStoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
-
private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final
K key,
final
V value,
final
long timestamp,
@@ -338,6 +492,48 @@ public class HeadersStoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
+ private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final
K key,
+ final
V value,
+ final
long timestamp,
+ final
long expectedTimestamp,
+ final
Headers headers,
+ final
Headers expectedHeaders)
+ throws Exception {
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+
+ if (store == null)
+ return false;
+
+ final ValueTimestampHeaders<V> result = store.get(key);
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == expectedTimestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
final V value,
final long
timestamp) throws Exception {
@@ -365,6 +561,45 @@ public class HeadersStoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
+ private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
+ final V value)
throws Exception {
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+
+ if (store == null)
+ return false;
+
+ final ValueTimestampHeaders<V> result = store.get(key);
+ return result != null
+ && result.value().equals(value)
+ && result.headers().toArray().length == 0;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
+ private static class KeyValueProcessor implements Processor<String,
String, Void, Void> {
+ private KeyValueStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ store.put(record.key(), record.value());
+ }
+ }
+
private static class TimestampedKeyValueProcessor implements
Processor<String, String, Void, Void> {
private TimestampedKeyValueStore<String, String> store;
@@ -547,10 +782,10 @@ public class HeadersStoreUpgradeIntegrationTest {
}
private void processWindowedKeyValueWithHeadersAndVerify(final String key,
- final String
value,
- final long
timestamp,
- final Headers
headers,
- final Headers
expectedHeaders) throws Exception {
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputStream,
singletonList(KeyValue.pair(key, value)),
@@ -599,8 +834,8 @@ public class HeadersStoreUpgradeIntegrationTest {
}
private void verifyWindowValueWithEmptyHeaders(final String key,
- final String value,
- final long timestamp)
throws Exception {
+ final String value,
+ final long timestamp)
throws Exception {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
@@ -681,6 +916,78 @@ public class HeadersStoreUpgradeIntegrationTest {
}
}
+ @Test
+ public void
shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToPlainKeyValueStore()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateKeyValueStoreWithHeaders(props);
+ kafkaStreams = null;
+
+ // Attempt to downgrade to plain key-value store
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("headers-aware") &&
+ cause.getMessage().contains("Downgrade")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException
about downgrade not being supported, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException to be
thrown when attempting to downgrade from headers-aware to plain key-value
store");
+ }
+ }
+
+ @Test
+ public void
shouldSuccessfullyDowngradeFromTimestampedKeyValueStoreWithHeadersToPlainKeyValueStoreAfterCleanup()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateKeyValueStoreWithHeaders(props);
+
+ kafkaStreams.cleanUp(); // Delete local state
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyValue("key3", "value3");
+ processKeyValueAndVerifyValue("key4", "value4");
+
+ kafkaStreams.close();
+ }
+
@Test
public void
shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStore()
throws Exception {
final Properties props = props();
@@ -757,6 +1064,7 @@ public class HeadersStoreUpgradeIntegrationTest {
kafkaStreams.close();
}
+
@Test
public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
final Properties props = props();
@@ -913,6 +1221,19 @@ public class HeadersStoreUpgradeIntegrationTest {
return CLUSTER.time.milliseconds();
}
+ private void produceRecordWithHeaders(final String key, final String
value, final long timestamp) throws Exception {
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+ }
+
private void setupAndPopulateKeyValueStoreWithHeaders(final Properties
props) throws Exception {
final StreamsBuilder headersBuilder = new StreamsBuilder();
headersBuilder.addStateStore(
@@ -934,17 +1255,4 @@ public class HeadersStoreUpgradeIntegrationTest {
kafkaStreams.close();
}
-
- private void produceRecordWithHeaders(final String key, final String
value, final long timestamp) throws Exception {
- final Headers headers = new RecordHeaders();
- headers.add("source", "test".getBytes());
-
- IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- inputStream,
- singletonList(KeyValue.pair(key, value)),
- TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
- headers,
- timestamp,
- false);
- }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 7170ba9c7e8..06f85595047 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -58,4 +58,30 @@ public interface HeadersBytesStore {
.put(valueAndTimestamp)
.array();
}
+
+ static byte[] convertFromPlainToHeaderFormat(final byte[] value) {
+ if (value == null) {
+ return null;
+ }
+
+ // Format: [headersSize(varint)][headersBytes][timestamp(8)][payload]
+ // For empty headers and timestamp=-1:
+ // headersSize = varint(0) = [0x00]
+ // headersBytes = [] (empty, 0 bytes)
+ // timestamp = -1 (8 bytes)
+ // Result: [0x00][timestamp=-1][payload]
+ final byte[] result = new byte[1 + 8 + value.length];
+ result[0] = 0x00; // empty headers
+ // timestamp = -1 (8 bytes in big-endian)
+ result[1] = (byte) 0xFF;
+ result[2] = (byte) 0xFF;
+ result[3] = (byte) 0xFF;
+ result[4] = (byte) 0xFF;
+ result[5] = (byte) 0xFF;
+ result[6] = (byte) 0xFF;
+ result[7] = (byte) 0xFF;
+ result[8] = (byte) 0xFF;
+ System.arraycopy(value, 0, result, 9, value.length);
+ return result;
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersIteratorAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersIteratorAdapter.java
new file mode 100644
index 00000000000..18b277b1156
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersIteratorAdapter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders}
and
+ * plain {@link org.apache.kafka.streams.state.KeyValueStore}.
+ *
+ * @see PlainToHeadersStoreAdapter
+ */
+
+class PlainToHeadersIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
+ private final KeyValueIterator<K, byte[]> innerIterator;
+
+ public PlainToHeadersIteratorAdapter(final KeyValueIterator<K, byte[]>
innerIterator) {
+ this.innerIterator = innerIterator;
+ }
+
+ @Override
+ public void close() {
+ innerIterator.close();
+ }
+
+ @Override
+ public K peekNextKey() {
+ return innerIterator.peekNextKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, byte[]> next() {
+ final KeyValue<K, byte[]> plainKeyValue = innerIterator.next();
+ if (plainKeyValue == null) {
+ return null;
+ }
+ return KeyValue.pair(plainKeyValue.key,
convertFromPlainToHeaderFormat(plainKeyValue.value));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
new file mode 100644
index 00000000000..4d762b82392
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and plain {@link KeyValueStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code KeyValueStore} (without
timestamp or headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to
translate between
+ * the plain {@code byte[]} format and the timestamped-with-headers {@code
byte[]} format.
+ *
+ * @see PlainToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class PlainToHeadersStoreAdapter implements KeyValueStore<Bytes,
byte[]> {
+ final KeyValueStore<Bytes, byte[]> store;
+
+ PlainToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+ if (!store.persistent()) {
+ throw new IllegalArgumentException("Provided store must be a
persistent store, but it is not.");
+ }
+ if (store instanceof TimestampedBytesStore) {
+ throw new IllegalArgumentException("Provided store must be a plain
(non-timestamped) key value store, but it is timestamped.");
+ }
+ this.store = store;
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueWithTimestampAndHeaders) {
+ store.put(key, rawPlainValue(valueWithTimestampAndHeaders));
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] valueWithTimestampAndHeaders) {
+ return convertFromPlainToHeaderFormat(store.putIfAbsent(
+ key,
+ rawPlainValue(valueWithTimestampAndHeaders)));
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
+ final byte[] valueWithTimestampAndHeaders = entry.value;
+ store.put(entry.key, rawPlainValue(valueWithTimestampAndHeaders));
+ }
+ }
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ return convertFromPlainToHeaderFormat(store.delete(key));
+ }
+
+ @Override
+ public String name() {
+ return store.name();
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ store.init(stateStoreContext, root);
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ store.commit(changelogOffsets);
+ }
+
+ @Override
+ public void close() {
+ store.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return true;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return store.isOpen();
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+
+ // Handle KeyQuery: convert byte[] result from plain to headers format
+ if (query instanceof KeyQuery) {
+ final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>)
query;
+ final QueryResult<byte[]> rawResult = store.query(keyQuery,
positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final byte[] convertedValue =
convertFromPlainToHeaderFormat(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
convertedValue);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else if (query instanceof RangeQuery) {
+ // Handle RangeQuery: wrap iterator to convert values
+ final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes,
byte[]>) query;
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+ store.query(rangeQuery, positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Bytes, byte[]> convertedIterator =
+ new
PlainToHeadersIteratorAdapter<>(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
convertedIterator);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+ // For other query types, delegate to the underlying store
+ result = store.query(query, positionBound, config);
+ }
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (System.nanoTime() -
start) + "ns"
+ );
+ }
+
+ return result;
+ }
+
+ @Override
+ public Position getPosition() {
+ return store.getPosition();
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ return convertFromPlainToHeaderFormat(store.get(key));
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
+ return new PlainToHeadersIteratorAdapter<>(store.range(from, to));
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+ final Bytes to) {
+ return new PlainToHeadersIteratorAdapter<>(store.reverseRange(from,
to));
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+ return new PlainToHeadersIteratorAdapter<>(store.all());
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseAll() {
+ return new PlainToHeadersIteratorAdapter<>(store.reverseAll());
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return new PlainToHeadersIteratorAdapter<>(store.prefixScan(prefix,
prefixKeySerializer));
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return store.approximateNumEntries();
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index b776e34591c..93f32a11691 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -48,15 +48,15 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
private static final Logger log =
LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
/**
- * Legacy column family name - must match {@code
RocksDBTimestampedStore#TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME}
+ * Legacy column family name - must match {@code
RocksDBTimestampedStore#TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME}
*/
-
private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME;
static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
"keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
public RocksDBTimestampedStoreWithHeaders(final String name,
final String metricsScope) {
super(name, metricsScope);
@@ -71,7 +71,7 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
@Override
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
- // Check if we're upgrading from RocksDBTimestampedStore (which uses
keyValueWithTimestamp CF)
+ // Check if we're upgrading from RocksDBTimestampedStore or from plain
RocksDBStore
final List<byte[]> existingCFs;
try (final Options options = new Options(dbOptions, new
ColumnFamilyOptions())) {
existingCFs = RocksDB.listColumnFamilies(options,
dbDir.getAbsolutePath());
@@ -79,40 +79,51 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
throw new ProcessorStateException("Error listing column families
for store " + name, e);
}
-
- final boolean upgradingFromTimestampedStore = existingCFs.stream()
+ final boolean hasTimestampedCF = existingCFs.stream()
.anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
- if (upgradingFromTimestampedStore) {
- openInUpgradeMode(dbOptions, columnFamilyOptions);
+ if (hasTimestampedCF) {
+ // Upgrading from timestamped store - use 2 CFs:
LEGACY_TIMESTAMPED + HEADERS
+            openFromTimestampedStore(dbOptions, columnFamilyOptions); // also
verifies that the default CF holds no data
} else {
- openInRegularMode(dbOptions, columnFamilyOptions);
+ openFromDefaultStore(dbOptions, columnFamilyOptions);
}
+
}
- @SuppressWarnings("SynchronizeOnNonFinalField")
- @Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
- final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
- final QueryResult<R> result;
+ private void openFromDefaultStore(final DBOptions dbOptions,
+ final ColumnFamilyOptions
columnFamilyOptions) {
- synchronized (position) {
- result = QueryResult.forUnknownQueryType(query, this);
+ final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+ dbOptions,
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions)
+ );
- if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
+ final ColumnFamilyHandle headersCf = columnFamilies.get(1);
+
+ // Check if default CF has data (plain store upgrade)
+ try (final RocksIterator defaultIter = db.newIterator(defaultCf)) {
+ defaultIter.seekToFirst();
+ if (defaultIter.isValid()) {
+ log.info("Opening store {} in upgrade mode from plain key
value store", name);
+ cfAccessor = new DualColumnFamilyAccessor(
+ defaultCf,
+ headersCf,
+ HeadersBytesStore::convertFromPlainToHeaderFormat,
+ this
);
+ } else {
+ log.info("Opening store {} in regular headers-aware mode",
name);
+ cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+ defaultCf.close();
}
- result.setPosition(position.copy());
}
- return result;
}
- private void openInUpgradeMode(final DBOptions dbOptions,
- final ColumnFamilyOptions
columnFamilyOptions) {
+ private void openFromTimestampedStore(final DBOptions dbOptions,
+ final ColumnFamilyOptions
columnFamilyOptions) {
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
dbOptions,
// we have to open the default CF to be able to open the legacy
CF, but we won't use it
@@ -121,56 +132,71 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions)
);
- verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+        // Verify the default column family contains no data, then close its handle
+ try (final RocksIterator defaultIter =
db.newIterator(columnFamilies.get(0))) {
+ defaultIter.seekToFirst();
+ if (defaultIter.isValid()) {
+ // Close all column family handles before throwing
+ columnFamilies.get(0).close();
+ columnFamilies.get(1).close();
+ columnFamilies.get(2).close();
+ throw new ProcessorStateException(
+ "Inconsistent store state for " + name + ". " +
+ "Cannot have both plain (DEFAULT) and timestamped data
simultaneously. " +
+ "Headers store can upgrade from either plain or
timestamped format, but not both."
+ );
+ }
+ // close default column family handle
+ columnFamilies.get(0).close();
+ }
- final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+ final ColumnFamilyHandle legacyTimestampedCf = columnFamilies.get(1);
final ColumnFamilyHandle headersCf = columnFamilies.get(2);
- // Check if legacy CF has data
- try (final RocksIterator legacyIter = db.newIterator(legacyCf)) {
+ // Check if legacy timestamped CF has data
+ try (final RocksIterator legacyIter =
db.newIterator(legacyTimestampedCf)) {
legacyIter.seekToFirst();
if (legacyIter.isValid()) {
- log.info("Opening store {} in upgrade mode", name);
- cfAccessor = new DualColumnFamilyAccessor(legacyCf, headersCf,
- HeadersBytesStore::convertToHeaderFormat, this);
+ log.info("Opening store {} in upgrade mode from timestamped
store", name);
+ cfAccessor = new DualColumnFamilyAccessor(
+ legacyTimestampedCf,
+ headersCf,
+ HeadersBytesStore::convertToHeaderFormat,
+ this
+ );
} else {
log.info("Opening store {} in regular headers-aware mode",
name);
cfAccessor = new SingleColumnFamilyAccessor(headersCf);
try {
- db.dropColumnFamily(legacyCf);
+ db.dropColumnFamily(legacyTimestampedCf);
} catch (final RocksDBException e) {
throw new RuntimeException(e);
} finally {
- legacyCf.close();
+ legacyTimestampedCf.close();
}
}
- }
+ }
}
- private void openInRegularMode(final DBOptions dbOptions,
- final ColumnFamilyOptions
columnFamilyOptions) {
- final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
- dbOptions,
- // we have to open the default CF to be able to open the legacy
CF, but we won't use it
- new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
- new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions)
- );
-
- verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+ @SuppressWarnings("SynchronizeOnNonFinalField")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
- final ColumnFamilyHandle headersCf = columnFamilies.get(1);
- log.info("Opening store {} in regular headers-aware mode", name);
- cfAccessor = new SingleColumnFamilyAccessor(headersCf);
- }
+ synchronized (position) {
+ result = QueryResult.forUnknownQueryType(query, this);
- private void verifyAndCloseEmptyDefaultColumnFamily(final
ColumnFamilyHandle columnFamilyHandle) {
- try (columnFamilyHandle; final RocksIterator defaultIter =
db.newIterator(columnFamilyHandle)) {
- defaultIter.seekToFirst();
- if (defaultIter.isValid()) {
- throw new ProcessorStateException("Cannot upgrade directly
from key-value store to headers-aware store for " + name + ". " +
- "Please first upgrade to RocksDBTimestampedStore, then
upgrade to RocksDBTimestampedStoreWithHeaders.");
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ );
}
+ result.setPosition(position.copy());
}
+ return result;
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index 8e7c43a9ee8..d2b294b6716 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -65,7 +66,12 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
if (!(store instanceof HeadersBytesStore)) {
if (store.persistent()) {
- store = new TimestampedToHeadersStoreAdapter(store);
+ // Persistent store: use adapter based on whether it's
timestamped or plain
+ if (store instanceof TimestampedBytesStore) {
+ store = new TimestampedToHeadersStoreAdapter(store);
+ } else {
+ store = new PlainToHeadersStoreAdapter(store);
+ }
} else {
store = new
InMemoryTimestampedKeyValueStoreWithHeadersMarker(store);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
new file mode 100644
index 00000000000..6afce4a92be
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+
+ /**
+ * Extract raw plain value from serialized ValueTimestampHeaders.
+ * This strips both the headers and timestamp portions.
+ *
+ * Format conversion:
+ * Input: [headersSize(varint)][headers][timestamp(8)][value]
+ * Output: [value]
+ */
+ static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ // Skip headers and timestamp (8 bytes)
+ buffer.position(buffer.position() + headersSize + 8);
+
+ final byte[] result = new byte[buffer.remaining()];
+ buffer.get(result);
+ return result;
+ }
+
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
index 1ce81ebbead..37cacde22c0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java
@@ -64,4 +64,69 @@ public class HeadersBytesStoreTest {
assertEquals(0, headersSize, "Empty headers should have headersSize =
0");
assertEquals(0, buffer.remaining(), "No payload bytes for empty
value");
}
+
+ @Test
+ public void shouldReturnNullWhenConvertingNullPlainValue() {
+ final byte[] result =
HeadersBytesStore.convertFromPlainToHeaderFormat(null);
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldConvertPlainValueToHeaderFormatWithTimestamp() {
+ final byte[] plainValue = "test-value".getBytes();
+
+ final byte[] converted =
HeadersBytesStore.convertFromPlainToHeaderFormat(plainValue);
+
+ assertNotNull(converted);
+ // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)][value]
+ assertEquals(1 + 8 + plainValue.length, converted.length);
+
+ // Verify empty headers marker
+ assertEquals(0x00, converted[0], "First byte for empty header should
be 0x00");
+
+ // Verify timestamp = -1 (all 0xFF bytes)
+ for (int i = 1; i <= 8; i++) {
+ assertEquals((byte) 0xFF, converted[i], "Timestamp byte " + (i -
1) + " should be 0xFF for -1");
+ }
+
+ // Verify payload
+ final byte[] actualPayload = Arrays.copyOfRange(converted, 9,
converted.length);
+ assertArrayEquals(plainValue, actualPayload);
+ }
+
+ @Test
+ public void shouldConvertEmptyPlainValueToHeaderFormat() {
+ final byte[] emptyValue = new byte[0];
+
+ final byte[] converted =
HeadersBytesStore.convertFromPlainToHeaderFormat(emptyValue);
+
+ assertNotNull(converted);
+ // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)]
+ assertEquals(9, converted.length, "Converted empty value should have
headers + timestamp");
+
+ final ByteBuffer buffer = ByteBuffer.wrap(converted);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ assertEquals(0, headersSize, "Empty headers should have headersSize =
0");
+
+ // Verify timestamp = -1
+ final long timestamp = buffer.getLong();
+ assertEquals(-1L, timestamp, "Timestamp should be -1 for plain value
upgrade");
+ }
+
+ @Test
+ public void shouldConvertPlainValueWithCorrectByteOrder() {
+ final byte[] plainValue = new byte[]{0x01, 0x02, 0x03};
+
+ final byte[] converted =
HeadersBytesStore.convertFromPlainToHeaderFormat(plainValue);
+
+ // Expected: [0x00][0xFF x 8][0x01, 0x02, 0x03]
+ final byte[] expected = new byte[]{
+ 0x00, // empty headers
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, // timestamp
-1 (high 4 bytes)
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, // timestamp
-1 (low 4 bytes)
+ 0x01, 0x02, 0x03 // payload
+ };
+
+ assertArrayEquals(expected, converted);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapterTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapterTest.java
new file mode 100644
index 00000000000..72cd7e9e2ae
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapterTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class PlainToHeadersStoreAdapterTest {
+
+ @Mock
+ private KeyValueStore<Bytes, byte[]> mockStore;
+
+ @Mock
+ private KeyValueIterator<Bytes, byte[]> mockIterator;
+
+ private PlainToHeadersStoreAdapter adapter;
+
+ private PlainToHeadersStoreAdapter createAdapter() {
+ when(mockStore.persistent()).thenReturn(true);
+ return new PlainToHeadersStoreAdapter(mockStore);
+ }
+
+ @Test
+ public void shouldThrowIfStoreIsNotPersistent() {
+ when(mockStore.persistent()).thenReturn(false);
+
+ final IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> new PlainToHeadersStoreAdapter(mockStore)
+ );
+
+ assertTrue(exception.getMessage().contains("Provided store must be a
persistent store"));
+ }
+
+ @Test
+ public void shouldThrowIfStoreIsTimestamped() {
+ final RocksDBTimestampedStore timestampedStore = new
RocksDBTimestampedStore("test", "scope");
+
+ final IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> new PlainToHeadersStoreAdapter(timestampedStore)
+ );
+
+ assertTrue(exception.getMessage().contains("Provided store must be a
plain (non-timestamped)"));
+ }
+
+ @Test
+ public void shouldPutRawPlainValueToStore() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("key".getBytes());
+ final byte[] plainValue = "value".getBytes();
+ final byte[] valueWithHeaders =
convertFromPlainToHeaderFormat(plainValue);
+
+ adapter.put(key, valueWithHeaders);
+
+ verify(mockStore).put(eq(key), eq(plainValue));
+ }
+
+ @Test
+ public void shouldGetAndConvertFromPlainToHeaderFormat() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("key".getBytes());
+ final byte[] plainValue = "value".getBytes();
+ when(mockStore.get(key)).thenReturn(plainValue);
+
+ final byte[] result = adapter.get(key);
+
+ assertArrayEquals(convertFromPlainToHeaderFormat(plainValue), result);
+ }
+
+ @Test
+ public void shouldReturnNullWhenStoreReturnsNull() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("key".getBytes());
+ when(mockStore.get(key)).thenReturn(null);
+
+ final byte[] result = adapter.get(key);
+
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldPutIfAbsentAndConvertResult() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("key".getBytes());
+ final byte[] plainValue = "value".getBytes();
+ final byte[] valueWithHeaders =
convertFromPlainToHeaderFormat(plainValue);
+ final byte[] oldPlainValue = "oldValue".getBytes();
+ when(mockStore.putIfAbsent(eq(key),
eq(plainValue))).thenReturn(oldPlainValue);
+
+ final byte[] result = adapter.putIfAbsent(key, valueWithHeaders);
+
+ assertArrayEquals(convertFromPlainToHeaderFormat(oldPlainValue),
result);
+ }
+
+ @Test
+ public void shouldDeleteAndConvertResult() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("key".getBytes());
+ final byte[] oldPlainValue = "oldValue".getBytes();
+ when(mockStore.delete(key)).thenReturn(oldPlainValue);
+
+ final byte[] result = adapter.delete(key);
+
+ assertArrayEquals(convertFromPlainToHeaderFormat(oldPlainValue),
result);
+ }
+
+ @Test
+ public void shouldPutAllEntries() {
+ adapter = createAdapter();
+ final Bytes key1 = new Bytes("key1".getBytes());
+ final Bytes key2 = new Bytes("key2".getBytes());
+ final byte[] value1 =
convertFromPlainToHeaderFormat("value1".getBytes());
+ final byte[] value2 =
convertFromPlainToHeaderFormat("value2".getBytes());
+
+ final List<KeyValue<Bytes, byte[]>> entries = Arrays.asList(
+ KeyValue.pair(key1, value1),
+ KeyValue.pair(key2, value2)
+ );
+
+ adapter.putAll(entries);
+
+ verify(mockStore).put(eq(key1), eq("value1".getBytes()));
+ verify(mockStore).put(eq(key2), eq("value2".getBytes()));
+ }
+
+ @Test
+ public void shouldWrapRangeIterator() {
+ adapter = createAdapter();
+ final Bytes from = new Bytes("a".getBytes());
+ final Bytes to = new Bytes("z".getBytes());
+ when(mockStore.range(from, to)).thenReturn(mockIterator);
+
+ final KeyValueIterator<Bytes, byte[]> result = adapter.range(from, to);
+
+ assertNotNull(result);
+ assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+ }
+
+ @Test
+ public void shouldWrapReverseRangeIterator() {
+ adapter = createAdapter();
+ final Bytes from = new Bytes("a".getBytes());
+ final Bytes to = new Bytes("z".getBytes());
+ when(mockStore.reverseRange(from, to)).thenReturn(mockIterator);
+
+ final KeyValueIterator<Bytes, byte[]> result =
adapter.reverseRange(from, to);
+
+ assertNotNull(result);
+ assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+ }
+
+ @Test
+ public void shouldWrapAllIterator() {
+ adapter = createAdapter();
+ when(mockStore.all()).thenReturn(mockIterator);
+
+ final KeyValueIterator<Bytes, byte[]> result = adapter.all();
+
+ assertNotNull(result);
+ assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+ }
+
+ @Test
+ public void shouldWrapReverseAllIterator() {
+ adapter = createAdapter();
+ when(mockStore.reverseAll()).thenReturn(mockIterator);
+
+ final KeyValueIterator<Bytes, byte[]> result = adapter.reverseAll();
+
+ assertNotNull(result);
+ assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+ }
+
+ @Test
+ public void shouldWrapPrefixScanIterator() {
+ adapter = createAdapter();
+ when(mockStore.prefixScan(any(), any())).thenReturn(mockIterator);
+
+ final KeyValueIterator<Bytes, byte[]> result =
adapter.prefixScan("prefix", (topic, data) -> data.getBytes());
+
+ assertNotNull(result);
+ assertTrue(result instanceof PlainToHeadersIteratorAdapter);
+ }
+
+ @Test
+ public void shouldHandleKeyQuery() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("test-key".getBytes());
+ final byte[] plainValue = "test-value".getBytes();
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+ final QueryResult<byte[]> mockResult =
QueryResult.forResult(plainValue);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<byte[]> result = adapter.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ assertTrue(result.isSuccess());
+ assertArrayEquals(convertFromPlainToHeaderFormat(plainValue),
result.getResult());
+ }
+
+ @Test
+ public void shouldHandleKeyQueryWithNullResult() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("test-key".getBytes());
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+ final QueryResult<byte[]> mockResult = QueryResult.forResult(null);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<byte[]> result = adapter.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ assertTrue(result.isSuccess());
+ assertNull(result.getResult());
+ }
+
+ @Test
+ public void shouldHandleFailedKeyQuery() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("test-key".getBytes());
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+ final QueryResult<byte[]> mockResult =
QueryResult.forUnknownQueryType(query, mockStore);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<byte[]> result = adapter.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ assertFalse(result.isSuccess());
+ }
+
+ @Test
+ public void shouldHandleRangeQuery() {
+ adapter = createAdapter();
+ final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+ new Bytes("a".getBytes()),
+ new Bytes("z".getBytes())
+ );
+
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> mockResult =
QueryResult.forResult(mockIterator);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> result =
adapter.query(
+ query,
+ PositionBound.unbounded(),
+ new QueryConfig(false)
+ );
+
+ assertTrue(result.isSuccess());
+ assertNotNull(result.getResult());
+ assertTrue(result.getResult() instanceof
PlainToHeadersIteratorAdapter);
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForKeyQuery() {
+ adapter = createAdapter();
+ final Bytes key = new Bytes("test-key".getBytes());
+ final byte[] plainValue = "test-value".getBytes();
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(key);
+
+ final QueryResult<byte[]> mockResult =
QueryResult.forResult(plainValue);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<byte[]> result = adapter.query(query,
PositionBound.unbounded(), new QueryConfig(true));
+
+ assertTrue(result.isSuccess());
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution
info to be collected");
+ final String executionInfo = String.join("\n",
result.getExecutionInfo());
+ assertTrue(executionInfo.contains("Handled in"), "Expected execution
info to contain handling information");
+
assertTrue(executionInfo.contains(PlainToHeadersStoreAdapter.class.getName()),
+ "Expected execution info to mention PlainToHeadersStoreAdapter");
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForRangeQuery() {
+ adapter = createAdapter();
+ final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(
+ new Bytes("a".getBytes()),
+ new Bytes("z".getBytes())
+ );
+
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> mockResult =
QueryResult.forResult(mockIterator);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> result =
adapter.query(
+ query,
+ PositionBound.unbounded(),
+ new QueryConfig(true)
+ );
+
+ assertTrue(result.isSuccess());
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution
info to be collected");
+ final String executionInfo = String.join("\n",
result.getExecutionInfo());
+ assertTrue(executionInfo.contains("Handled in"), "Expected execution
info to contain handling information");
+ }
+
+ @Test
+ public void shouldDelegateOtherQueryTypesToUnderlyingStore() {
+ adapter = createAdapter();
+        // Use a KeyQuery whose result is an unknown-query-type failure, so the adapter must pass the underlying store's result through unchanged
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("key".getBytes()));
+
+ final QueryResult<byte[]> mockResult =
QueryResult.forUnknownQueryType(query, mockStore);
+ when(mockStore.query(eq(query), any(PositionBound.class),
any(QueryConfig.class)))
+ .thenReturn(mockResult);
+
+ final QueryResult<byte[]> result = adapter.query(query,
PositionBound.unbounded(), new QueryConfig(false));
+
+ // The result should be passed through from the underlying store
+ assertFalse(result.isSuccess());
+ }
+
+ @Test
+ public void shouldDelegateName() {
+ when(mockStore.name()).thenReturn("test-store");
+ adapter = createAdapter();
+
+ assertEquals("test-store", adapter.name());
+ }
+
+ @Test
+ public void shouldReturnTrueForPersistent() {
+ adapter = createAdapter();
+
+ assertTrue(adapter.persistent());
+ }
+
+ @Test
+ public void shouldDelegateIsOpen() {
+ when(mockStore.isOpen()).thenReturn(true);
+ adapter = createAdapter();
+
+ assertTrue(adapter.isOpen());
+ }
+
+ @Test
+ public void shouldDelegateApproximateNumEntries() {
+ when(mockStore.approximateNumEntries()).thenReturn(42L);
+ adapter = createAdapter();
+
+ assertEquals(42L, adapter.approximateNumEntries());
+ }
+
+ @Test
+ public void shouldDelegateGetPosition() {
+ adapter = createAdapter();
+ when(mockStore.getPosition()).thenReturn(null);
+
+ adapter.getPosition();
+
+ verify(mockStore).getPosition();
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index 14c13b25d9a..398b630b764 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -134,16 +134,172 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
@Test
- public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+ public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws
Exception {
+ prepareKeyValueStoreWithMultipleKeys();
+
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+ // get() - tests lazy migration on read
+
+ assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())),
"Expected null for unknown key");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries on DEFAULT CF, 0 in headers-aware CF");
+
+ assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new
Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(1) = 10 bytes");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+ // put() - tests migration on write
+
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"headers+timestamp+22".getBytes());
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+ rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one due to two delete operations (even though one does
not delete anything)
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+ rocksDBStore.put(new Bytes("key8new".getBytes()),
"headers+timestamp+88888888".getBytes());
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with
put()");
+
+ rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+ // one delete on old CF, one put on new CF, but count is off by two
due to deletes not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with
put()");
+
+ // putIfAbsent() - tests migration on conditional write
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()),
"headers+timestamp+11111111111".getBytes()),
+ "Expected null return value for putIfAbsent on non-existing
key11new, and new key should be added to headers-aware CF");
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with
putIfAbsent()");
+
+ assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new
Bytes("key5".getBytes()), null).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+ // one delete on old CF, one put on new CF, due to `get()` migration
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with
putIfAbsent(null)");
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()),
null));
+ // no delete operation, because key12new is unknown
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on
non-existing key12new");
+
+ // delete() - tests migration on delete
+
+ assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new
Bytes("key6".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operations; however, only one is counted because the old CF
count was already zero
+ assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with
delete()");
+
+ // iterators should not trigger migration (read-only)
+ iteratorsShouldNotMigrateDataFromPlain();
+ assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+ rocksDBStore.close();
+
+ // Verify the final state of both column families
+ verifyPlainUpgradeColumnFamilies();
+ }
+
+ @Test
+ public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
// Prepare a plain key-value store (with data in default column family)
prepareKeyValueStore();
- // Try to open with RocksDBTimestampedStoreWithHeaders - should throw
exception
- final ProcessorStateException exception =
assertThrows(ProcessorStateException.class,
- () -> rocksDBStore.init(context, rocksDBStore));
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ // Verify we can read the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+
+ rocksDBStore.close();
+
+ // Verify column family structure
+ verifyPlainStoreUpgrade();
+ }
+
+ private void verifyPlainStoreUpgrade() throws Exception {
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultCF = null;
+ ColumnFamilyHandle headersCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultCF = columnFamilies.get(0);
+ headersCF = columnFamilies.get(1);
+
+ // After get() is called, data is migrated and deleted from
DEFAULT CF
+ // DEFAULT CF should be empty for migrated keys
+ assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to
be deleted from DEFAULT CF after migration");
+ assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to
be deleted from DEFAULT CF after migration");
- assertTrue(exception.getMessage().contains("Cannot upgrade directly
from key-value store to headers-aware store"));
- assertTrue(exception.getMessage().contains("Please first upgrade to
RocksDBTimestampedStore"));
+ // Headers CF should have the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
db.get(headersCF, "key1".getBytes()).length);
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
db.get(headersCF, "key2".getBytes()).length);
+ } finally {
+ if (defaultCF != null) {
+ defaultCF.close();
+ }
+ if (headersCF != null) {
+ headersCF.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+ @Test
+ public void shouldFailWhenBothPlainAndTimestampedDataExist() {
+        // This is an invalid state - we can't have data in both the DEFAULT CF
AND the LEGACY_TIMESTAMPED CF at the same time
+ // Step 1: Create a plain store with two keys
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ kvStore.init(context, kvStore);
+ kvStore.put(new Bytes("plainKey1".getBytes()), "value1".getBytes());
+ kvStore.put(new Bytes("plainKey2".getBytes()), "value2".getBytes());
+ kvStore.close();
+
+ // Step 2: Open as timestamped store and migrate one key (simulating
partial migration)
+ final RocksDBTimestampedStore timestampedStore = new
RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
+ timestampedStore.init(context, timestampedStore);
+ timestampedStore.get(new Bytes("plainKey1".getBytes())); // Triggers
migration of plainKey1 to LEGACY_TIMESTAMPED CF
+ timestampedStore.close();
+ // Now we have: plainKey2 in DEFAULT CF, plainKey1 in
LEGACY_TIMESTAMPED CF
+
+ // Step 3: Try to open with headers store - should fail
+ final ProcessorStateException exception = assertThrows(
+ ProcessorStateException.class,
+ () -> rocksDBStore.init(context, rocksDBStore)
+ );
+
+ assertTrue(exception.getMessage().contains("Inconsistent store
state"));
+ assertTrue(exception.getMessage().contains("Cannot have both plain
(DEFAULT) and timestamped data simultaneously"));
+ assertTrue(exception.getMessage().contains("Headers store can upgrade
from either plain or timestamped format, but not both."));
}
@Test
@@ -154,7 +310,7 @@ public class RocksDBTimestampedStoreWithHeadersTest extends
RocksDBStoreTest {
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
rocksDBStore.init(context, rocksDBStore);
- assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode"));
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from timestamped store"));
}
assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries in legacy CF and 0 in headers-aware CF before migration");
@@ -462,16 +618,126 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
private void verifyStillInUpgradeMode() {
- // check that still in upgrade mode
+ // check that still in upgrade mode from timestamped store
try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
rocksDBStore.init(context, rocksDBStore);
- assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode"));
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from timestamped store"));
} finally {
rocksDBStore.close();
}
}
+ private void iteratorsShouldNotMigrateDataFromPlain() {
+        // iterating should not migrate any data, but should return all keys across
both CFs
+ // Values from DEFAULT CF are converted to header-aware format on the
fly: [0x00][timestamp=-1][value]
+ try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all())
{
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key1".getBytes(), keyValue.key.get()); //
migrated by get()
+ assertEquals(10, keyValue.value.length, "Expected header-aware
format: varint(1) + empty headers(0) + timestamp(8) + value(1) = 10 bytes for
key1 from headers-aware CF");
+ }
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key11new".getBytes(), keyValue.key.get());
// inserted by putIfAbsent()
+ assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r',
's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1',
'1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+ }
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key2".getBytes(), keyValue.key.get()); //
migrated by put()
+ assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r',
's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'},
keyValue.value);
+ }
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key4".getBytes(), keyValue.key.get()); //
not migrated since not accessed, should be converted on-the-fly from DEFAULT CF
+ assertEquals(13, keyValue.value.length,
+ "Expected header-aware format: varint(1) + empty
headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from DEFAULT CF");
+ }
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key5".getBytes(), keyValue.key.get()); //
migrated by putIfAbsent with null value
+ assertEquals(14, keyValue.value.length, "Expected header-aware
format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for
key5 from headers-aware CF");
+ }
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key7".getBytes(), keyValue.key.get()); //
not migrated since not accessed
+ assertEquals(16, keyValue.value.length, "Expected header-aware
format: varint(1) + empty headers(0) + timestamp(8) + value(7) = 16 bytes for
key7 from DEFAULT CF");
+ }
+ {
+ final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+ assertArrayEquals("key8new".getBytes(), keyValue.key.get());
// inserted by put()
+ assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r',
's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8',
'8', '8', '8', '8'}, keyValue.value);
+ }
+ assertFalse(itAll.hasNext());
+ }
+ }
+
+ private void verifyPlainUpgradeColumnFamilies() throws Exception {
+ // In upgrade scenario from plain RocksDBStore,
+ // we expect 2 CFs: DEFAULT (legacy), keyValueWithTimestampAndHeaders
(new)
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultCF = null;
+ ColumnFamilyHandle headersCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultCF = columnFamilies.get(0);
+ headersCF = columnFamilies.get(1);
+
+ // DEFAULT CF should have migrated keys as null, un-migrated as
plain values
+ assertNull(db.get(defaultCF, "unknown".getBytes()));
+ assertNull(db.get(defaultCF, "key1".getBytes())); // migrated
+ assertNull(db.get(defaultCF, "key2".getBytes())); // migrated
+ assertNull(db.get(defaultCF, "key3".getBytes())); // deleted
+ assertEquals(4, db.get(defaultCF, "key4".getBytes()).length); //
not migrated
+ assertNull(db.get(defaultCF, "key5".getBytes())); // migrated
+ assertNull(db.get(defaultCF, "key6".getBytes())); // migrated
+ assertEquals(7, db.get(defaultCF, "key7".getBytes()).length); //
not migrated
+ assertNull(db.get(defaultCF, "key8new".getBytes()));
+ assertNull(db.get(defaultCF, "key9new".getBytes()));
+ assertNull(db.get(defaultCF, "key11new".getBytes()));
+ assertNull(db.get(defaultCF, "key12new".getBytes()));
+
+ // Headers CF should have all migrated/new keys with header-aware
format
+ assertNull(db.get(headersCF, "unknown".getBytes()));
+ assertEquals(1 + 0 + 8 + 1, db.get(headersCF,
"key1".getBytes()).length); // migrated by get()
+ assertEquals("headers+timestamp+22".getBytes().length,
db.get(headersCF, "key2".getBytes()).length); // migrated by put() => value is
inserted without any conversion
+ assertNull(db.get(headersCF, "key3".getBytes())); // deleted
+ assertNull(db.get(headersCF, "key4".getBytes())); // not migrated
+ assertEquals(1 + 0 + 8 + 5, db.get(headersCF,
"key5".getBytes()).length); // migrated by putIfAbsent
+ assertNull(db.get(headersCF, "key6".getBytes())); // migrated by
delete() => deleted
+ assertNull(db.get(headersCF, "key7".getBytes())); // not migrated
+ assertEquals("headers+timestamp+88888888".getBytes().length,
db.get(headersCF, "key8new".getBytes()).length); // added by put() => value is
inserted without any conversion
+ assertNull(db.get(headersCF, "key9new".getBytes()));
+ assertEquals("headers+timestamp+11111111111".getBytes().length,
db.get(headersCF, "key11new".getBytes()).length); // inserted (newly added) by
putIfAbsent() => value is inserted without any conversion
+ assertNull(db.get(headersCF, "key12new".getBytes()));
+ } finally {
+ if (defaultCF != null) {
+ defaultCF.close();
+ }
+ if (headersCF != null) {
+ headersCF.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
private void clearLegacyColumnFamily() throws Exception {
// clear legacy timestamped CF by deleting remaining keys
final DBOptions dbOptions = new DBOptions();
@@ -562,6 +828,30 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
}
+ @Test
+ public void shouldTransitionFromPlainUpgradeModeToRegularMode() {
+ // Prepare plain store
+ prepareKeyValueStore();
+
+ // Open in upgrade mode
+ rocksDBStore.init(context, rocksDBStore);
+
+ // Migrate all data by reading it (lazy migration deletes from DEFAULT
CF)
+ rocksDBStore.get(new Bytes("key1".getBytes()));
+ rocksDBStore.get(new Bytes("key2".getBytes()));
+
+ rocksDBStore.close();
+
+ // Reopen - should now be in regular mode since DEFAULT CF is empty
after migration
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in regular headers-aware mode"));
+ } finally {
+ rocksDBStore.close();
+ }
+ }
+
@Test
public void shouldNotSupportDowngradeFromHeadersAwareToRegularStore() {
// prepare headers-aware store with data
@@ -622,6 +912,25 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
}
+ private void prepareKeyValueStoreWithMultipleKeys() {
+ // Create a plain RocksDBStore with multiple keys for comprehensive
migration testing
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ try {
+ kvStore.init(context, kvStore);
+
+ // Write plain key-value pairs to default column family
+ kvStore.put(new Bytes("key1".getBytes()), "1".getBytes());
+ kvStore.put(new Bytes("key2".getBytes()), "22".getBytes());
+ kvStore.put(new Bytes("key3".getBytes()), "333".getBytes());
+ kvStore.put(new Bytes("key4".getBytes()), "4444".getBytes());
+ kvStore.put(new Bytes("key5".getBytes()), "55555".getBytes());
+ kvStore.put(new Bytes("key6".getBytes()), "666666".getBytes());
+ kvStore.put(new Bytes("key7".getBytes()), "7777777".getBytes());
+ } finally {
+ kvStore.close();
+ }
+ }
+
private void prepareTimestampedStore() {
// Create a legacy RocksDBTimestampedStore to test upgrade scenario
final RocksDBTimestampedStore timestampedStore = new
RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index 8330c746e31..77a3be9ca56 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -211,24 +211,15 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
}
@Test
- public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
- when(supplier.name()).thenReturn("test-store");
- when(supplier.metricsScope()).thenReturn("metricScope");
- when(supplier.get()).thenReturn(new RocksDBStore("test-store",
"metrics-scope"));
-
- builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
- supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime()
- );
-
- final IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () ->
builder.withLoggingDisabled().withCachingDisabled().build()
- );
+ public void shouldWrapPlainKeyValueStoreAsHeadersStore() {
+ setUp();
+ when(supplier.get()).thenReturn(new RocksDBStore("name",
"metrics-scope"));
- assertTrue(exception.getMessage().contains("Provided store must be a
timestamped store"));
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertInstanceOf(PlainToHeadersStoreAdapter.class,
((WrappedStateStore) store).wrapped());
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
new file mode 100644
index 00000000000..561743d49d5
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+
+public class UtilsTest {
+
+ @Test
+ public void shouldExtractRawPlainValue() {
+ // Format: [headersSize(varint)][headers][timestamp(8)][value]
+ // Create a value with headers size=0, timestamp=-1, value="test"
+ final byte[] value = "test".getBytes();
+ final ByteBuffer buffer = ByteBuffer.allocate(1 + 8 + value.length);
+ buffer.put((byte) 0x00); // headers size = 0
+ buffer.putLong(-1L); // timestamp = -1
+ buffer.put(value);
+
+ final byte[] result = rawPlainValue(buffer.array());
+
+ assertArrayEquals(value, result);
+ }
+
+ @Test
+ public void shouldReturnNullForNullRawPlainValue() {
+ assertNull(rawPlainValue(null));
+ }
+
+}
\ No newline at end of file