This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 060650ed21f KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (2/N) (#21446)
060650ed21f is described below
commit 060650ed21f2e1bfd52262c653dc0d7696bf4dd2
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 18 03:47:22 2026 +0100
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (2/N) (#21446)
This PR adds RocksDBTimestampedStoreWithHeaders (and the corresponding
unit test) for the TimestampedKeyValueStoreWithHeaders introduced in
KIP-1271.
Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../state/internals/RocksDBTimestampedStore.java | 2 +-
.../RocksDBTimestampedStoreWithHeaders.java | 151 ++++++
.../RocksDBTimestampedStoreWithHeadersTest.java | 601 +++++++++++++++++++++
3 files changed, 753 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 441c6620079..07508fe7494 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -37,7 +37,7 @@ import java.util.List;
 public class RocksDBTimestampedStore extends RocksDBStore implements TimestampedBytesStore {
     private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
 
-    private static final byte[] TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME = "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+    static final byte[] TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME = "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
 
     public RocksDBTimestampedStore(final String name,
                                    final String metricsScope) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
new file mode 100644
index 00000000000..c4dbb6ea259
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+ /**
+     * Legacy column family name - must match {@code RocksDBTimestampedStore#TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME}
+     */
+    private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+        RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME;
+
+ private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+ "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+ public RocksDBTimestampedStoreWithHeaders(final String name,
+ final String metricsScope) {
+ super(name, metricsScope);
+ }
+
+    RocksDBTimestampedStoreWithHeaders(final String name,
+                                       final String parentDir,
+                                       final RocksDBMetricsRecorder metricsRecorder) {
+        super(name, parentDir, metricsRecorder);
+    }
+
+ @Override
+ void openRocksDB(final DBOptions dbOptions,
+ final ColumnFamilyOptions columnFamilyOptions) {
+        // Check if we're upgrading from RocksDBTimestampedStore (which uses keyValueWithTimestamp CF)
+        final List<byte[]> existingCFs;
+        try (final Options options = new Options(dbOptions, new ColumnFamilyOptions())) {
+            existingCFs = RocksDB.listColumnFamilies(options, dbDir.getAbsolutePath());
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error listing column families for store " + name, e);
+        }
+
+        final boolean upgradingFromTimestampedStore = existingCFs.stream()
+            .anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
+
+ if (upgradingFromTimestampedStore) {
+ openInUpgradeMode(dbOptions, columnFamilyOptions);
+ } else {
+ openInRegularMode(dbOptions, columnFamilyOptions);
+ }
+ }
+
+    private void openInUpgradeMode(final DBOptions dbOptions,
+                                   final ColumnFamilyOptions columnFamilyOptions) {
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            // we have to open the default CF to be able to open the legacy CF, but we won't use it
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, columnFamilyOptions),
+            new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions)
+ );
+
+ verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+
+ final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+ final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+
+ // Check if legacy CF has data
+ try (final RocksIterator legacyIter = db.newIterator(legacyCf)) {
+ legacyIter.seekToFirst();
+ if (legacyIter.isValid()) {
+ log.info("Opening store {} in upgrade mode", name);
+ cfAccessor = new DualColumnFamilyAccessor(legacyCf, headersCf,
+ HeadersBytesStore::convertToHeaderFormat, this);
+ } else {
+                log.info("Opening store {} in regular headers-aware mode", name);
+ cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+ try {
+ db.dropColumnFamily(legacyCf);
+ } catch (final RocksDBException e) {
+ throw new RuntimeException(e);
+ } finally {
+ legacyCf.close();
+ }
+ }
+ }
+ }
+
+    private void openInRegularMode(final DBOptions dbOptions,
+                                   final ColumnFamilyOptions columnFamilyOptions) {
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            // we have to open the default CF to be able to open the DB, but we won't use it
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions)
+ );
+
+ verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+
+ final ColumnFamilyHandle headersCf = columnFamilies.get(1);
+ log.info("Opening store {} in regular headers-aware mode", name);
+ cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+ }
+
+    private void verifyAndCloseEmptyDefaultColumnFamily(final ColumnFamilyHandle columnFamilyHandle) {
+        try (final RocksIterator defaultIter = db.newIterator(columnFamilyHandle)) {
+            defaultIter.seekToFirst();
+            if (defaultIter.isValid()) {
+                throw new ProcessorStateException("Cannot upgrade directly from key-value store to headers-aware store for " + name + ". " +
+                    "Please first upgrade to RocksDBTimestampedStore, then upgrade to RocksDBTimestampedStoreWithHeaders.");
+            }
+        }
+    }
+
+}
\ No newline at end of file
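
Note: the upgrade path above relies on the HeadersBytesStore::convertToHeaderFormat
method reference passed to DualColumnFamilyAccessor; its implementation is not part
of this diff. Judging from the size arithmetic the tests assert (a legacy
[timestamp(8)][value] entry grows by exactly one byte when read back), a plausible
sketch, under a hypothetical name, is:

    static byte[] convertToHeaderFormatSketch(final byte[] legacyTimestampedValue) {
        if (legacyTimestampedValue == null) {
            return null;
        }
        // Prefix the legacy [timestamp(8)][value] bytes with varint(0) = "no headers"
        final byte[] converted = new byte[1 + legacyTimestampedValue.length];
        converted[0] = 0;
        System.arraycopy(legacyTimestampedValue, 0, converted, 1, legacyTimestampedValue.length);
        return converted;
    }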
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
new file mode 100644
index 00000000000..7b1bef67ad3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RocksDBTimestampedStoreWithHeadersTest extends RocksDBStoreTest {
+
+ private final Serializer<String> stringSerializer = new StringSerializer();
+
+ RocksDBStore getRocksDBStore() {
+ return new RocksDBTimestampedStoreWithHeaders(DB_NAME, METRICS_SCOPE);
+ }
+
+ @Test
+ public void shouldOpenNewStoreInRegularMode() {
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in regular headers-aware mode"));
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all()) {
+ assertFalse(iterator.hasNext());
+ }
+ }
+
+ @Test
+ public void shouldOpenExistingStoreInRegularMode() throws Exception {
+ final String key = "key";
+ final String value = "timestampedWithHeaders";
+ // prepare store
+ rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.put(new Bytes(key.getBytes()), value.getBytes());
+ rocksDBStore.close();
+
+ // re-open store
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in regular headers-aware mode"));
+ } finally {
+ rocksDBStore.close();
+ }
+
+ // verify store
+ final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+
+        RocksDB db = null;
+        ColumnFamilyHandle defaultColumnFamily = null, headersColumnFamily = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultColumnFamily = columnFamilies.get(0);
+            headersColumnFamily = columnFamilies.get(1);
+
+            assertNull(db.get(defaultColumnFamily, "key".getBytes()));
+            assertEquals(0L, db.getLongProperty(defaultColumnFamily, "rocksdb.estimate-num-keys"));
+            assertEquals(value.getBytes().length, db.get(headersColumnFamily, "key".getBytes()).length);
+            assertEquals(1L, db.getLongProperty(headersColumnFamily, "rocksdb.estimate-num-keys"));
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
+ if (defaultColumnFamily != null) {
+ defaultColumnFamily.close();
+ }
+ if (headersColumnFamily != null) {
+ headersColumnFamily.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+ @Test
+ public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+ // Prepare a plain key-value store (with data in default column family)
+ prepareKeyValueStore();
+
+        // Try to open with RocksDBTimestampedStoreWithHeaders - should throw an exception
+        final ProcessorStateException exception = assertThrows(ProcessorStateException.class,
+            () -> rocksDBStore.init(context, rocksDBStore));
+
+        assertTrue(exception.getMessage().contains("Cannot upgrade directly from key-value store to headers-aware store"));
+        assertTrue(exception.getMessage().contains("Please first upgrade to RocksDBTimestampedStore"));
+ }
+
+ @Test
+    public void shouldMigrateFromTimestampedToHeadersAwareColumnFamily() throws Exception {
+ prepareTimestampedStore();
+
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect legacy CF and enter upgrade mode
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in upgrade mode"));
+        }
+
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 entries in legacy CF and 0 in headers-aware CF before migration");
+
+ // get() - tests lazy migration on read
+
+        assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())), "Expected null for unknown key");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 entries on legacy CF, 0 in headers-aware CF");
+
+        assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(1) = 10 bytes");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6 entries on legacy CF, 1 in headers-aware CF after migrating key1");
+
+        // put() - tests migration on write
+
+        rocksDBStore.put(new Bytes("key2".getBytes()), "headers+timestamp+22".getBytes());
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5 entries on legacy CF, 2 in headers-aware CF after migrating key2 with put()");
+
+        rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one, due to two delete operations (even if one does not delete anything)
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 entries on legacy CF, 1 in headers-aware CF after deleting key3 with put()");
+
+        rocksDBStore.put(new Bytes("key8new".getBytes()), "headers+timestamp+88888888".getBytes());
+        // one delete on old CF, one put on new CF, but count is off by one due to delete on old CF not deleting anything
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 3 entries on legacy CF, 2 in headers-aware CF after adding new key8new with put()");
+
+        // putIfAbsent() - tests migration on conditional write
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), "headers+timestamp+11111111111".getBytes()),
+            "Expected null return value for putIfAbsent on non-existing key11new, and new key should be added to headers-aware CF");
+        // one delete on old CF, one put on new CF, but count is off by one due to delete on old CF not deleting anything
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 2 entries on legacy CF, 3 in headers-aware CF after adding new key11new with putIfAbsent()");
+
+        assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()), null).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 1 entry on legacy CF, 4 in headers-aware CF after migrating key5 with putIfAbsent(null)");
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), null));
+        // two delete operations; however, only one is counted because the old CF count cannot be less than 0
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 0 entries on legacy CF, 3 in headers-aware CF after putIfAbsent with null on non-existing key12new");
+
+        // delete() - tests migration on delete
+
+        assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new Bytes("key6".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operations; however, only one is counted because the old CF count was already zero
+        assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 0 entries on legacy CF, 2 in headers-aware CF after deleting key6 with delete()");
+
+ // iterators should not trigger migration (read-only)
+ iteratorsShouldNotMigrateData();
+ assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+ rocksDBStore.close();
+
+ // Verify the final state of both column families
+ verifyOldAndNewColumnFamily();
+ }
+
+ private void iteratorsShouldNotMigrateData() {
+        // iterating should not migrate any data, but return all keys over both CFs
+        // Values from legacy CF are converted to header-aware format on the fly: 1 byte + [timestamp][value]
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all()) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key1".getBytes(), keyValue.key.get()); // migrated by get()
+                assertEquals(10, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(1) = 10 bytes for key1 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get()); // inserted by putIfAbsent()
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get()); // migrated by put()
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get()); // not migrated since not accessed; stored in legacy format [timestamp(8)][value], returned with 1 varint byte added on the fly, but without headers
+                assertEquals(13, keyValue.value.length,
+                    "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from legacy CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get()); // migrated by putIfAbsent with null value, should be in header-aware format but with former value: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes
+                assertEquals(14, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for key5 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key7".getBytes(), keyValue.key.get()); // not migrated since not accessed; stored in legacy format [timestamp(8)][value], returned with 1 varint byte added on the fly, but without headers
+                assertEquals(16, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(7) = 16 bytes for key7 from legacy CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key8new".getBytes(), keyValue.key.get()); // inserted by put()
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
+            }
+            assertFalse(itAll.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                 rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get());
+                assertEquals(13, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from legacy CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get());
+                assertEquals(14, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for key5 from headers-aware CF");
+            }
+            assertFalse(it.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.reverseAll()) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key8new".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key7".getBytes(), keyValue.key.get());
+                assertEquals(16, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(7) = 16 bytes for key7 from legacy CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get());
+                assertEquals(14, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for key5 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get());
+                assertEquals(13, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from legacy CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = itAll.next();
+                assertArrayEquals("key1".getBytes(), keyValue.key.get());
+                assertEquals(10, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(1) = 10 bytes for key1 from headers-aware CF");
+            }
+            assertFalse(itAll.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                 rocksDBStore.reverseRange(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key5".getBytes(), keyValue.key.get());
+                assertEquals(14, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(5) = 14 bytes for key5 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key4".getBytes(), keyValue.key.get());
+                assertEquals(13, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(4) = 13 bytes for key4 from legacy CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key2".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
+            }
+            assertFalse(it.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it = rocksDBStore.prefixScan("key1", stringSerializer)) {
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key1".getBytes(), keyValue.key.get());
+                assertEquals(10, keyValue.value.length, "Expected header-aware format: varint(1) + empty headers(0) + timestamp(8) + value(1) = 10 bytes for key1 from headers-aware CF");
+            }
+            {
+                final KeyValue<Bytes, byte[]> keyValue = it.next();
+                assertArrayEquals("key11new".getBytes(), keyValue.key.get());
+                assertArrayEquals(new byte[]{'h', 'e', 'a', 'd', 'e', 'r', 's', '+', 't', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
+            }
+            assertFalse(it.hasNext());
+        }
+ }
+
+ private void verifyOldAndNewColumnFamily() throws Exception {
+        // In the upgrade scenario from RocksDBTimestampedStore, we expect 3 CFs:
+        // DEFAULT (closed on open), keyValueWithTimestamp (legacy), keyValueWithTimestampAndHeaders (new)
+ verifyColumnFamilyContents();
+ verifyStillInUpgradeMode();
+ clearLegacyColumnFamily();
+ verifyLegacyColumnFamilyDropped();
+ verifyInHeadersAwareMode();
+ }
+
+ private void verifyColumnFamilyContents() throws Exception {
+ final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultColumnFamily = null;
+ ColumnFamilyHandle legacyTimestampedColumnFamily = null;
+ ColumnFamilyHandle headersColumnFamily = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultColumnFamily = columnFamilies.get(0);
+ legacyTimestampedColumnFamily = columnFamilies.get(1);
+ headersColumnFamily = columnFamilies.get(2);
+
+ verifyDefaultColumnFamily(db, defaultColumnFamily);
+            verifyLegacyTimestampedColumnFamily(db, legacyTimestampedColumnFamily);
+ verifyHeadersColumnFamily(db, headersColumnFamily);
+ } finally {
+            closeColumnFamilies(db, defaultColumnFamily, legacyTimestampedColumnFamily, headersColumnFamily);
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+    private void verifyDefaultColumnFamily(final RocksDB db, final ColumnFamilyHandle defaultColumnFamily) {
+        // DEFAULT CF should be empty (closed on open)
+        try (final RocksIterator iterator = db.newIterator(defaultColumnFamily)) {
+ iterator.seekToFirst();
+ assertFalse(iterator.isValid(), "Expected no keys in default CF");
+ }
+ }
+
+    private void verifyLegacyTimestampedColumnFamily(final RocksDB db, final ColumnFamilyHandle legacyTimestampedColumnFamily) throws Exception {
+        // Legacy timestamped CF should have migrated keys as null, un-migrated as timestamped values
+        assertNull(db.get(legacyTimestampedColumnFamily, "unknown".getBytes()));
+        assertNull(db.get(legacyTimestampedColumnFamily, "key1".getBytes())); // migrated
+        assertNull(db.get(legacyTimestampedColumnFamily, "key2".getBytes())); // migrated
+        assertNull(db.get(legacyTimestampedColumnFamily, "key3".getBytes())); // deleted
+        assertEquals(8 + 4, db.get(legacyTimestampedColumnFamily, "key4".getBytes()).length); // not migrated
+        assertNull(db.get(legacyTimestampedColumnFamily, "key5".getBytes())); // migrated
+        assertNull(db.get(legacyTimestampedColumnFamily, "key6".getBytes())); // migrated
+        assertEquals(8 + 7, db.get(legacyTimestampedColumnFamily, "key7".getBytes()).length); // not migrated
+        assertNull(db.get(legacyTimestampedColumnFamily, "key8new".getBytes()));
+        assertNull(db.get(legacyTimestampedColumnFamily, "key11new".getBytes()));
+ }
+
+    private void verifyHeadersColumnFamily(final RocksDB db, final ColumnFamilyHandle headersColumnFamily) throws Exception {
+        // Headers CF should have all migrated/new keys with header-aware format
+        assertNull(db.get(headersColumnFamily, "unknown".getBytes()));
+        assertEquals(1 + 0 + 8 + 1, db.get(headersColumnFamily, "key1".getBytes()).length); // migrated by get()
+        assertEquals("headers+timestamp+22".getBytes().length, db.get(headersColumnFamily, "key2".getBytes()).length); // migrated by put() => value is inserted without any conversion
+        assertNull(db.get(headersColumnFamily, "key3".getBytes())); // migrated by put() with null value => deleted
+        assertNull(db.get(headersColumnFamily, "key4".getBytes())); // not migrated, should still be in legacy column family
+        assertEquals(1 + 0 + 8 + 5, db.get(headersColumnFamily, "key5".getBytes()).length); // migrated by putIfAbsent with null value, should be in header-aware format but with former value
+        assertNull(db.get(headersColumnFamily, "key6".getBytes())); // migrated by delete() => deleted
+        assertNull(db.get(headersColumnFamily, "key7".getBytes())); // not migrated, should still be in legacy column family
+        assertEquals("headers+timestamp+88888888".getBytes().length, db.get(headersColumnFamily, "key8new".getBytes()).length); // added by put() => value is inserted without any conversion
+        assertEquals("headers+timestamp+11111111111".getBytes().length, db.get(headersColumnFamily, "key11new".getBytes()).length); // inserted (newly added) by putIfAbsent() => value is inserted without any conversion
+        assertNull(db.get(headersColumnFamily, "key12new".getBytes())); // putIfAbsent with null value on non-existing key should not create any entry
+ }
+
+ private void closeColumnFamilies(
+ final RocksDB db,
+ final ColumnFamilyHandle defaultColumnFamily,
+ final ColumnFamilyHandle legacyTimestampedColumnFamily,
+ final ColumnFamilyHandle headersColumnFamily) {
+ // Order of closing must follow: ColumnFamilyHandle > RocksDB
+ if (defaultColumnFamily != null) {
+ defaultColumnFamily.close();
+ }
+ if (legacyTimestampedColumnFamily != null) {
+ legacyTimestampedColumnFamily.close();
+ }
+ if (headersColumnFamily != null) {
+ headersColumnFamily.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ }
+
+ private void verifyStillInUpgradeMode() {
+        // check that the store is still in upgrade mode
+        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in upgrade mode"));
+ } finally {
+ rocksDBStore.close();
+ }
+ }
+
+ private void clearLegacyColumnFamily() throws Exception {
+ // clear legacy timestamped CF by deleting remaining keys
+ final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions),
+            new ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultCF = null;
+ ColumnFamilyHandle legacyCF = null;
+ ColumnFamilyHandle headersCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultCF = columnFamilies.get(0);
+ legacyCF = columnFamilies.get(1);
+ headersCF = columnFamilies.get(2);
+ db.delete(legacyCF, "key4".getBytes());
+ db.delete(legacyCF, "key7".getBytes());
+ } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
+ if (defaultCF != null) {
+ defaultCF.close();
+ }
+ if (legacyCF != null) {
+ legacyCF.close();
+ }
+ if (headersCF != null) {
+ headersCF.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+ private void verifyLegacyColumnFamilyDropped() throws Exception {
+        // Open and close the store to trigger the legacy CF drop (when it detects empty legacy CF)
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.close();
+
+        // Verify that the legacy column family no longer exists
+        try (DBOptions dbOptions = new DBOptions(); final Options options = new Options(dbOptions, new ColumnFamilyOptions())) {
+            final List<byte[]> existingCFs = RocksDB.listColumnFamilies(
+                options,
+                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath()
+            );
+
+            // Should only have DEFAULT and keyValueWithTimestampAndHeaders, not the legacy keyValueWithTimestamp
+            assertEquals(2, existingCFs.size(), "Expected only 2 column families after legacy CF is dropped");
+
+            boolean hasDefault = false;
+            boolean hasHeadersAware = false;
+            boolean hasLegacy = false;
+
+            for (final byte[] cf : existingCFs) {
+                if (java.util.Arrays.equals(cf, RocksDB.DEFAULT_COLUMN_FAMILY)) {
+                    hasDefault = true;
+                } else if (java.util.Arrays.equals(cf, "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8))) {
+                    hasHeadersAware = true;
+                } else if (java.util.Arrays.equals(cf, "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8))) {
+                    hasLegacy = true;
+                }
+            }
+
+            assertTrue(hasDefault, "Expected default column family to exist");
+            assertTrue(hasHeadersAware, "Expected headers-aware column family to exist");
+            assertFalse(hasLegacy, "Expected legacy column family to be dropped");
+ }
+ }
+
+ private void verifyInHeadersAwareMode() {
+        // check that the store is now in regular headers-aware mode (all legacy data migrated)
+        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in regular headers-aware mode"));
+ }
+ }
+
+ private void prepareKeyValueStore() {
+        // Create a plain RocksDBStore (key-value, not timestamped) with data in default column family
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ try {
+ kvStore.init(context, kvStore);
+
+ // Write plain key-value pairs to default column family
+ kvStore.put(new Bytes("key1".getBytes()), "value1".getBytes());
+ kvStore.put(new Bytes("key2".getBytes()), "value2".getBytes());
+ } finally {
+ kvStore.close();
+ }
+ }
+
+ private void prepareTimestampedStore() {
+ // Create a legacy RocksDBTimestampedStore to test upgrade scenario
+        final RocksDBTimestampedStore timestampedStore = new RocksDBTimestampedStore(DB_NAME, METRICS_SCOPE);
+        try {
+            timestampedStore.init(context, timestampedStore);
+
+            // Write timestamped values; wrapTimestampedValue() uses the numeric value as the timestamp
+            timestampedStore.put(new Bytes("key1".getBytes()), wrapTimestampedValue("1".getBytes()));
+            timestampedStore.put(new Bytes("key2".getBytes()), wrapTimestampedValue("22".getBytes()));
+            timestampedStore.put(new Bytes("key3".getBytes()), wrapTimestampedValue("333".getBytes()));
+            timestampedStore.put(new Bytes("key4".getBytes()), wrapTimestampedValue("4444".getBytes()));
+            timestampedStore.put(new Bytes("key5".getBytes()), wrapTimestampedValue("55555".getBytes()));
+            timestampedStore.put(new Bytes("key6".getBytes()), wrapTimestampedValue("666666".getBytes()));
+            timestampedStore.put(new Bytes("key7".getBytes()), wrapTimestampedValue("7777777".getBytes()));
+ } finally {
+ timestampedStore.close();
+ }
+ }
+
+ private byte[] wrapTimestampedValue(final byte[] value) {
+ // Format: [timestamp(8 bytes)][value]
+ // Use the numeric value as timestamp
+ final long timestamp = Long.parseLong(new String(value));
+ final byte[] result = new byte[8 + value.length];
+
+ // Convert timestamp to big-endian 8-byte array
+ result[0] = (byte) (timestamp >> 56);
+ result[1] = (byte) (timestamp >> 48);
+ result[2] = (byte) (timestamp >> 40);
+ result[3] = (byte) (timestamp >> 32);
+ result[4] = (byte) (timestamp >> 24);
+ result[5] = (byte) (timestamp >> 16);
+ result[6] = (byte) (timestamp >> 8);
+ result[7] = (byte) timestamp;
+
+ System.arraycopy(value, 0, result, 8, value.length);
+ return result;
+ }
+}
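
The manual big-endian packing in wrapTimestampedValue() above can equivalently
be written with java.nio.ByteBuffer, which writes big-endian by default; a
minimal alternative sketch (hypothetical, not part of this commit):

    private byte[] wrapTimestampedValueWithByteBuffer(final byte[] value) {
        final long timestamp = Long.parseLong(new String(value));
        return ByteBuffer.allocate(8 + value.length)
            .putLong(timestamp) // writes 8 bytes, big-endian
            .put(value)         // appends the raw value bytes
            .array();
    }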