This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ceec1c2c8eb KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (1/N) (#21444)
ceec1c2c8eb is described below
commit ceec1c2c8eb5f5f9f7c9622145b516a736189c88
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 11 09:25:48 2026 +0100
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (1/N) (#21444)
This PR extracts the `DualColumnFamilyAccessor` from
`RocksDBTimestampedStore` and generalizes it so it can be used by both
`RocksDBTimestampedStore` and headers-aware stores. It also adds
corresponding unit tests.
Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax <[email protected]>
---------
Co-authored-by: TengYao Chi <[email protected]>
---
.../state/internals/DualColumnFamilyAccessor.java | 465 +++++++++++++++++++++
.../state/internals/RocksDBTimestampedStore.java | 416 +-----------------
.../internals/DualColumnFamilyAccessorTest.java | 427 +++++++++++++++++++
3 files changed, 902 insertions(+), 406 deletions(-)
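The key design change in this patch is that the old-to-new value conversion is no longer hard-coded in RocksDBTimestampedStore but injected as a Function<byte[], byte[]>. A minimal sketch of the wiring: the timestamped case is taken from the RocksDBTimestampedStore change below, while the headers-aware case is purely hypothetical (names invented here only to illustrate the intended reuse in a follow-up PR):

    // RocksDBTimestampedStore in upgrade mode (as changed in this patch):
    cfAccessor = new DualColumnFamilyAccessor(
        noTimestampColumnFamily,                            // old-format column family
        withTimestampColumnFamily,                          // new-format column family
        TimestampedBytesStore::convertToTimestampedFormat,  // value converter
        this
    );

    // A headers-aware store would only need its own column families and converter
    // (all names below are hypothetical, not part of this patch):
    cfAccessor = new DualColumnFamilyAccessor(
        timestampedColumnFamily,
        withHeadersColumnFamily,
        HeadersBytesStore::convertToHeadersFormat,
        this
    );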
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
new file mode 100644
index 00000000000..11b4c11b511
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.kafka.streams.state.internals.RocksDBStore.incrementWithoutOverflow;
+
+/**
+ * A generic implementation of {@link RocksDBStore.ColumnFamilyAccessor} that supports dual-column-family
+ * upgrade scenarios. This class manages two column families:
+ * <ul>
+ * <li>oldColumnFamily: contains legacy data in the old format</li>
+ * <li>newColumnFamily: contains data in the new format</li>
+ * </ul>
+ *
+ * When reading, it first checks the new column family, then falls back to the old column family
+ * and converts values on-the-fly using the provided conversion function.
+ */
+class DualColumnFamilyAccessor implements RocksDBStore.ColumnFamilyAccessor {
+
+ private final ColumnFamilyHandle oldColumnFamily;
+ private final ColumnFamilyHandle newColumnFamily;
+ private final Function<byte[], byte[]> valueConverter;
+ private final RocksDBStore store;
+
+ /**
+ * Constructs a DualColumnFamilyAccessor.
+ *
+ * @param oldColumnFamily the column family containing legacy data
+ * @param newColumnFamily the column family for new format data
+     * @param valueConverter  function to convert old format values to new format
+     * @param store           the RocksDBStore instance (for accessing position, context, and name)
+ */
+ DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
+ final ColumnFamilyHandle newColumnFamily,
+ final Function<byte[], byte[]> valueConverter,
+ final RocksDBStore store) {
+ this.oldColumnFamily = oldColumnFamily;
+ this.newColumnFamily = newColumnFamily;
+ this.valueConverter = valueConverter;
+ this.store = store;
+ }
+
+ @Override
+ public void put(final DBAccessor accessor,
+ final byte[] key,
+ final byte[] value) {
+ synchronized (store.position) {
+ if (value == null) {
+ try {
+ accessor.delete(oldColumnFamily, key);
+ } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+ }
+ try {
+ accessor.delete(newColumnFamily, key);
+ } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+ }
+ } else {
+ try {
+ accessor.delete(oldColumnFamily, key);
+ } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+ }
+ try {
+ accessor.put(newColumnFamily, key, value);
+                    StoreQueryUtils.updatePosition(store.position, store.context);
+ } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while putting key/value into store " + store.name(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                             final WriteBatchInterface batch) throws RocksDBException {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
+ Objects.requireNonNull(entry.key, "key cannot be null");
+ addToBatch(entry.key.get(), entry.value, batch);
+ }
+ }
+
+ @Override
+ public byte[] get(final DBAccessor accessor, final byte[] key)
+ throws RocksDBException {
+ return get(accessor, key, Optional.empty());
+ }
+
+ @Override
+ public byte[] get(final DBAccessor accessor, final byte[] key,
+ final ReadOptions readOptions)
+ throws RocksDBException {
+ return get(accessor, key, Optional.of(readOptions));
+ }
+
+ private byte[] get(final DBAccessor accessor, final byte[] key,
+ final Optional<ReadOptions> readOptions)
+ throws RocksDBException {
+ final byte[] valueInNewFormat = readOptions.isPresent()
+ ? accessor.get(newColumnFamily, readOptions.get(), key)
+ : accessor.get(newColumnFamily, key);
+ if (valueInNewFormat != null) {
+ return valueInNewFormat;
+ }
+
+ final byte[] valueInOldFormat = readOptions.isPresent()
+ ? accessor.get(oldColumnFamily, readOptions.get(), key)
+ : accessor.get(oldColumnFamily, key);
+ if (valueInOldFormat != null) {
+            final byte[] convertedValue = valueConverter.apply(valueInOldFormat);
+            // This only works because the changelog topic already contains correct data.
+            // For other format changes, we cannot take this shortcut and can only migrate data
+            // from old to new store on put().
+ put(accessor, key, convertedValue);
+ return convertedValue;
+ }
+
+ return null;
+ }
+
+ @Override
+    public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
+ final byte[] valueInNewFormat = accessor.get(newColumnFamily, key);
+ if (valueInNewFormat != null) {
+ return valueInNewFormat;
+ }
+
+ final byte[] valueInOldFormat = accessor.get(oldColumnFamily, key);
+ if (valueInOldFormat != null) {
+ return valueConverter.apply(valueInOldFormat);
+ }
+
+ return null;
+ }
+
+ @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
+                                                        final Bytes from,
+                                                        final Bytes to,
+                                                        final boolean forward) {
+ return new RocksDBDualCFRangeIterator(
+ store.name(),
+ accessor.newIterator(newColumnFamily),
+ accessor.newIterator(oldColumnFamily),
+ from,
+ to,
+ forward,
+ true,
+ valueConverter);
+ }
+
+ @Override
+    public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
+ try {
+ accessor.deleteRange(oldColumnFamily, from, to);
+ } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+ }
+ try {
+ accessor.deleteRange(newColumnFamily, from, to);
+ } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+ }
+ }
+
+ @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor,
+                                                      final boolean forward) {
+        final RocksIterator innerIterNew = accessor.newIterator(newColumnFamily);
+        final RocksIterator innerIterOld = accessor.newIterator(oldColumnFamily);
+ if (forward) {
+ innerIterNew.seekToFirst();
+ innerIterOld.seekToFirst();
+ } else {
+ innerIterNew.seekToLast();
+ innerIterOld.seekToLast();
+ }
+        return new RocksDBDualCFIterator(store.name(), innerIterNew, innerIterOld, forward, valueConverter);
+ }
+
+ @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor,
+                                                              final Bytes prefix) {
+ final Bytes to = incrementWithoutOverflow(prefix);
+ return new RocksDBDualCFRangeIterator(
+ store.name(),
+ accessor.newIterator(newColumnFamily),
+ accessor.newIterator(oldColumnFamily),
+ prefix,
+ to,
+ true,
+ false,
+ valueConverter
+ );
+ }
+
+ @Override
+ public long approximateNumEntries(final DBAccessor accessor)
+ throws RocksDBException {
+ return accessor.approximateNumEntries(oldColumnFamily)
+ + accessor.approximateNumEntries(newColumnFamily);
+ }
+
+ @Override
+ public void commit(final DBAccessor accessor,
+                       final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
+ accessor.flush(oldColumnFamily, newColumnFamily);
+ }
+
+ @Override
+ public void addToBatch(final byte[] key,
+ final byte[] value,
+                           final WriteBatchInterface batch) throws RocksDBException {
+ if (value == null) {
+ batch.delete(oldColumnFamily, key);
+ batch.delete(newColumnFamily, key);
+ } else {
+ batch.delete(oldColumnFamily, key);
+ batch.put(newColumnFamily, key, value);
+ }
+ }
+
+ @Override
+ public void close() {
+ oldColumnFamily.close();
+ newColumnFamily.close();
+ }
+
+ private static class RocksDBDualCFIterator
+        extends org.apache.kafka.common.utils.AbstractIterator<KeyValue<Bytes, byte[]>>
+ implements ManagedKeyValueIterator<Bytes, byte[]> {
+
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+
+ private final String storeName;
+ private final RocksIterator iterNewFormat;
+ private final RocksIterator iterOldFormat;
+ private final boolean forward;
+ private final Function<byte[], byte[]> valueConverter;
+
+ private volatile boolean open = true;
+
+ private byte[] nextNewFormat;
+ private byte[] nextOldFormat;
+ private KeyValue<Bytes, byte[]> next;
+ private Runnable closeCallback = null;
+
+ RocksDBDualCFIterator(final String storeName,
+ final RocksIterator iterNewFormat,
+ final RocksIterator iterOldFormat,
+ final boolean forward,
+ final Function<byte[], byte[]> valueConverter) {
+ this.iterNewFormat = iterNewFormat;
+ this.iterOldFormat = iterOldFormat;
+ this.storeName = storeName;
+ this.forward = forward;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public synchronized boolean hasNext() {
+ if (!open) {
+                throw new org.apache.kafka.streams.errors.InvalidStateStoreException(
+                    String.format("RocksDB iterator for store %s has closed", storeName));
+ }
+ return super.hasNext();
+ }
+
+ @Override
+ public synchronized KeyValue<Bytes, byte[]> next() {
+ return super.next();
+ }
+
+ @Override
+ protected KeyValue<Bytes, byte[]> makeNext() {
+ if (nextOldFormat == null && iterOldFormat.isValid()) {
+ nextOldFormat = iterOldFormat.key();
+ }
+
+ if (nextNewFormat == null && iterNewFormat.isValid()) {
+ nextNewFormat = iterNewFormat.key();
+ }
+
+ if (nextOldFormat == null && !iterOldFormat.isValid()) {
+ if (nextNewFormat == null && !iterNewFormat.isValid()) {
+ return allDone();
+ } else {
+                    next = KeyValue.pair(new Bytes(nextNewFormat), iterNewFormat.value());
+ nextNewFormat = null;
+ if (forward) {
+ iterNewFormat.next();
+ } else {
+ iterNewFormat.prev();
+ }
+ }
+ } else {
+ if (nextNewFormat == null) {
+                    next = KeyValue.pair(new Bytes(nextOldFormat), valueConverter.apply(iterOldFormat.value()));
+ nextOldFormat = null;
+ if (forward) {
+ iterOldFormat.next();
+ } else {
+ iterOldFormat.prev();
+ }
+ } else {
+ if (forward) {
+                        if (comparator.compare(nextOldFormat, nextNewFormat) <= 0) {
+                            next = KeyValue.pair(new Bytes(nextOldFormat), valueConverter.apply(iterOldFormat.value()));
+ nextOldFormat = null;
+ iterOldFormat.next();
+ } else {
+                            next = KeyValue.pair(new Bytes(nextNewFormat), iterNewFormat.value());
+ nextNewFormat = null;
+ iterNewFormat.next();
+ }
+ } else {
+                        if (comparator.compare(nextOldFormat, nextNewFormat) >= 0) {
+                            next = KeyValue.pair(new Bytes(nextOldFormat), valueConverter.apply(iterOldFormat.value()));
+ nextOldFormat = null;
+ iterOldFormat.prev();
+ } else {
+                            next = KeyValue.pair(new Bytes(nextNewFormat), iterNewFormat.value());
+ nextNewFormat = null;
+ iterNewFormat.prev();
+ }
+ }
+ }
+ }
+ return next;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closeCallback == null) {
+ throw new IllegalStateException(
+                    "RocksDBDualCFIterator expects close callback to be set immediately upon creation");
+ }
+ closeCallback.run();
+
+ iterOldFormat.close();
+ iterNewFormat.close();
+ open = false;
+ }
+
+ @Override
+ public Bytes peekNextKey() {
+ if (!hasNext()) {
+ throw new java.util.NoSuchElementException();
+ }
+ return next.key;
+ }
+
+ @Override
+ public void onClose(final Runnable closeCallback) {
+ this.closeCallback = closeCallback;
+ }
+ }
+
+    private static class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
+ // RocksDB's JNI interface does not expose getters/setters that allow the
+ // comparator to be pluggable, and the default is lexicographic, so it's
+ // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+ private final byte[] rawLastKey;
+ private final boolean forward;
+ private final boolean toInclusive;
+
+ RocksDBDualCFRangeIterator(final String storeName,
+ final RocksIterator iterNewFormat,
+ final RocksIterator iterOldFormat,
+ final Bytes from,
+ final Bytes to,
+ final boolean forward,
+ final boolean toInclusive,
+                                   final Function<byte[], byte[]> valueConverter) {
+            super(storeName, iterNewFormat, iterOldFormat, forward, valueConverter);
+ this.forward = forward;
+ this.toInclusive = toInclusive;
+ if (forward) {
+ if (from == null) {
+ iterNewFormat.seekToFirst();
+ iterOldFormat.seekToFirst();
+ } else {
+ iterNewFormat.seek(from.get());
+ iterOldFormat.seek(from.get());
+ }
+ rawLastKey = to == null ? null : to.get();
+ } else {
+ if (to == null) {
+ iterNewFormat.seekToLast();
+ iterOldFormat.seekToLast();
+ } else {
+ iterNewFormat.seekForPrev(to.get());
+ iterOldFormat.seekForPrev(to.get());
+ }
+ rawLastKey = from == null ? null : from.get();
+ }
+ }
+
+ @Override
+ protected KeyValue<Bytes, byte[]> makeNext() {
+ final KeyValue<Bytes, byte[]> next = super.makeNext();
+
+ if (next == null) {
+ return allDone();
+ } else if (rawLastKey == null) {
+ // null means range endpoint is open
+ return next;
+ } else {
+ if (forward) {
+ if (comparator.compare(next.key.get(), rawLastKey) < 0) {
+ return next;
+                    } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
+ return toInclusive ? next : allDone();
+ } else {
+ return allDone();
+ }
+ } else {
+ if (comparator.compare(next.key.get(), rawLastKey) >= 0) {
+ return next;
+ } else {
+ return allDone();
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 36262cb8e0d..441c6620079 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -16,12 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -29,23 +23,13 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteBatchInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
-import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Optional;
-
-import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
/**
* A persistent key-(value-timestamp) store based on RocksDB.
@@ -56,7 +40,7 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
    private static final byte[] TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME = "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
public RocksDBTimestampedStore(final String name,
- final String metricsScope) {
+ final String metricsScope) {
super(name, metricsScope);
}
@@ -70,9 +54,9 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
-                dbOptions,
-                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
-                new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions)
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions)
        );
        final ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
        final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);
@@ -81,7 +65,12 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
-        cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
+ cfAccessor = new DualColumnFamilyAccessor(
+ noTimestampColumnFamily,
+ withTimestampColumnFamily,
+ TimestampedBytesStore::convertToTimestampedFormat,
+ this
+ );
} else {
log.info("Opening store {} in regular mode", name);
            cfAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
@@ -90,389 +79,4 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
noTimestampsIter.close();
}
- private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
- private final ColumnFamilyHandle oldColumnFamily;
- private final ColumnFamilyHandle newColumnFamily;
-
- private DualColumnFamilyAccessor(final ColumnFamilyHandle
oldColumnFamily,
- final ColumnFamilyHandle
newColumnFamily) {
- this.oldColumnFamily = oldColumnFamily;
- this.newColumnFamily = newColumnFamily;
- }
-
- @Override
- public void put(final DBAccessor accessor,
- final byte[] key,
- final byte[] valueWithTimestamp) {
- synchronized (position) {
- if (valueWithTimestamp == null) {
- try {
- accessor.delete(oldColumnFamily, key);
- } catch (final RocksDBException e) {
- // String format is happening in wrapping stores. So
formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while
removing key from store " + name, e);
- }
- try {
- accessor.delete(newColumnFamily, key);
- } catch (final RocksDBException e) {
- // String format is happening in wrapping stores. So
formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while
removing key from store " + name, e);
- }
- } else {
- try {
- accessor.delete(oldColumnFamily, key);
- } catch (final RocksDBException e) {
- // String format is happening in wrapping stores. So
formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while
removing key from store " + name, e);
- }
- try {
- accessor.put(newColumnFamily, key, valueWithTimestamp);
- StoreQueryUtils.updatePosition(position, context);
- } catch (final RocksDBException e) {
- // String format is happening in wrapping stores. So
formatted message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while putting
key/value into store " + name, e);
- }
- }
- }
- }
-
- @Override
- public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
- final WriteBatchInterface batch) throws
RocksDBException {
- for (final KeyValue<Bytes, byte[]> entry : entries) {
- Objects.requireNonNull(entry.key, "key cannot be null");
- addToBatch(entry.key.get(), entry.value, batch);
- }
- }
-
- @Override
- public byte[] get(final DBAccessor accessor, final byte[] key) throws
RocksDBException {
- return get(accessor, key, Optional.empty());
- }
-
- @Override
- public byte[] get(final DBAccessor accessor, final byte[] key, final
ReadOptions readOptions) throws RocksDBException {
- return get(accessor, key, Optional.of(readOptions));
- }
-
- private byte[] get(final DBAccessor accessor, final byte[] key, final
Optional<ReadOptions> readOptions) throws RocksDBException {
- final byte[] valueWithTimestamp = readOptions.isPresent() ?
accessor.get(newColumnFamily, readOptions.get(), key) :
accessor.get(newColumnFamily, key);
- if (valueWithTimestamp != null) {
- return valueWithTimestamp;
- }
-
- final byte[] plainValue = readOptions.isPresent() ?
accessor.get(oldColumnFamily, readOptions.get(), key) :
accessor.get(oldColumnFamily, key);
- if (plainValue != null) {
- final byte[] valueWithUnknownTimestamp =
convertToTimestampedFormat(plainValue);
- // this does only work, because the changelog topic contains
correct data already
- // for other format changes, we cannot take this short cut and
can only migrate data
- // from old to new store on put()
- put(accessor, key, valueWithUnknownTimestamp);
- return valueWithUnknownTimestamp;
- }
-
- return null;
- }
-
- @Override
- public byte[] getOnly(final DBAccessor accessor, final byte[] key)
throws RocksDBException {
- final byte[] valueWithTimestamp = accessor.get(newColumnFamily,
key);
- if (valueWithTimestamp != null) {
- return valueWithTimestamp;
- }
-
- final byte[] plainValue = accessor.get(oldColumnFamily, key);
- if (plainValue != null) {
- return convertToTimestampedFormat(plainValue);
- }
-
- return null;
- }
-
- @Override
- public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor
accessor,
- final Bytes from,
- final Bytes to,
- final boolean
forward) {
- return new RocksDBDualCFRangeIterator(
- name,
- accessor.newIterator(newColumnFamily),
- accessor.newIterator(oldColumnFamily),
- from,
- to,
- forward,
- true);
- }
-
- @Override
- public void deleteRange(final DBAccessor accessor, final byte[] from,
final byte[] to) {
- try {
- accessor.deleteRange(oldColumnFamily, from, to);
- } catch (final RocksDBException e) {
- // String format is happening in wrapping stores. So formatted
message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while removing key
from store " + name, e);
- }
- try {
- accessor.deleteRange(newColumnFamily, from, to);
- } catch (final RocksDBException e) {
- // String format is happening in wrapping stores. So formatted
message is thrown from wrapping stores.
- throw new ProcessorStateException("Error while removing key
from store " + name, e);
- }
- }
-
- @Override
- public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor
accessor, final boolean forward) {
- final RocksIterator innerIterWithTimestamp =
accessor.newIterator(newColumnFamily);
- final RocksIterator innerIterNoTimestamp =
accessor.newIterator(oldColumnFamily);
- if (forward) {
- innerIterWithTimestamp.seekToFirst();
- innerIterNoTimestamp.seekToFirst();
- } else {
- innerIterWithTimestamp.seekToLast();
- innerIterNoTimestamp.seekToLast();
- }
- return new RocksDBDualCFIterator(name, innerIterWithTimestamp,
innerIterNoTimestamp, forward);
- }
-
- @Override
- public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final
DBAccessor accessor, final Bytes prefix) {
- final Bytes to = incrementWithoutOverflow(prefix);
- return new RocksDBDualCFRangeIterator(
- name,
- accessor.newIterator(newColumnFamily),
- accessor.newIterator(oldColumnFamily),
- prefix,
- to,
- true,
- false
- );
- }
-
- @Override
- public long approximateNumEntries(final DBAccessor accessor) throws
RocksDBException {
- return accessor.approximateNumEntries(oldColumnFamily) +
- accessor.approximateNumEntries(newColumnFamily);
- }
-
- @Override
- public void commit(final DBAccessor accessor,
- final Map<TopicPartition, Long> changelogOffsets)
throws RocksDBException {
- accessor.flush(oldColumnFamily, newColumnFamily);
- }
-
- @Override
- public void addToBatch(final byte[] key,
- final byte[] value,
- final WriteBatchInterface batch) throws
RocksDBException {
- if (value == null) {
- batch.delete(oldColumnFamily, key);
- batch.delete(newColumnFamily, key);
- } else {
- batch.delete(oldColumnFamily, key);
- batch.put(newColumnFamily, key, value);
- }
- }
-
- @Override
- public void close() {
- oldColumnFamily.close();
- newColumnFamily.close();
- }
- }
-
- private static class RocksDBDualCFIterator extends
AbstractIterator<KeyValue<Bytes, byte[]>>
- implements ManagedKeyValueIterator<Bytes, byte[]> {
-
- // RocksDB's JNI interface does not expose getters/setters that allow
the
- // comparator to be pluggable, and the default is lexicographic, so
it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
-
- private final String storeName;
- private final RocksIterator iterWithTimestamp;
- private final RocksIterator iterNoTimestamp;
- private final boolean forward;
-
- private volatile boolean open = true;
-
- private byte[] nextWithTimestamp;
- private byte[] nextNoTimestamp;
- private KeyValue<Bytes, byte[]> next;
- private Runnable closeCallback = null;
-
- RocksDBDualCFIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final boolean forward) {
- this.iterWithTimestamp = iterWithTimestamp;
- this.iterNoTimestamp = iterNoTimestamp;
- this.storeName = storeName;
- this.forward = forward;
- }
-
- @Override
- public synchronized boolean hasNext() {
- if (!open) {
- throw new InvalidStateStoreException(String.format("RocksDB
iterator for store %s has closed", storeName));
- }
- return super.hasNext();
- }
-
- @Override
- public synchronized KeyValue<Bytes, byte[]> next() {
- return super.next();
- }
-
- @Override
- protected KeyValue<Bytes, byte[]> makeNext() {
- if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
- nextNoTimestamp = iterNoTimestamp.key();
- }
-
- if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
- nextWithTimestamp = iterWithTimestamp.key();
- }
-
- if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
- if (nextWithTimestamp == null && !iterWithTimestamp.isValid())
{
- return allDone();
- } else {
- next = KeyValue.pair(new Bytes(nextWithTimestamp),
iterWithTimestamp.value());
- nextWithTimestamp = null;
- if (forward) {
- iterWithTimestamp.next();
- } else {
- iterWithTimestamp.prev();
- }
- }
- } else {
- if (nextWithTimestamp == null) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp),
convertToTimestampedFormat(iterNoTimestamp.value()));
- nextNoTimestamp = null;
- if (forward) {
- iterNoTimestamp.next();
- } else {
- iterNoTimestamp.prev();
- }
- } else {
- if (forward) {
- if (comparator.compare(nextNoTimestamp,
nextWithTimestamp) <= 0) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp),
convertToTimestampedFormat(iterNoTimestamp.value()));
- nextNoTimestamp = null;
- iterNoTimestamp.next();
- } else {
- next = KeyValue.pair(new Bytes(nextWithTimestamp),
iterWithTimestamp.value());
- nextWithTimestamp = null;
- iterWithTimestamp.next();
- }
- } else {
- if (comparator.compare(nextNoTimestamp,
nextWithTimestamp) >= 0) {
- next = KeyValue.pair(new Bytes(nextNoTimestamp),
convertToTimestampedFormat(iterNoTimestamp.value()));
- nextNoTimestamp = null;
- iterNoTimestamp.prev();
- } else {
- next = KeyValue.pair(new Bytes(nextWithTimestamp),
iterWithTimestamp.value());
- nextWithTimestamp = null;
- iterWithTimestamp.prev();
- }
- }
- }
- }
- return next;
- }
-
- @Override
- public synchronized void close() {
- if (closeCallback == null) {
- throw new IllegalStateException("RocksDBDualCFIterator expects
close callback to be set immediately upon creation");
- }
- closeCallback.run();
-
- iterNoTimestamp.close();
- iterWithTimestamp.close();
- open = false;
- }
-
- @Override
- public Bytes peekNextKey() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return next.key;
- }
-
- @Override
- public void onClose(final Runnable closeCallback) {
- this.closeCallback = closeCallback;
- }
- }
-
- private static class RocksDBDualCFRangeIterator extends
RocksDBDualCFIterator {
- // RocksDB's JNI interface does not expose getters/setters that allow
the
- // comparator to be pluggable, and the default is lexicographic, so
it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator =
Bytes.BYTES_LEXICO_COMPARATOR;
- private final byte[] rawLastKey;
- private final boolean forward;
- private final boolean toInclusive;
-
- RocksDBDualCFRangeIterator(final String storeName,
- final RocksIterator iterWithTimestamp,
- final RocksIterator iterNoTimestamp,
- final Bytes from,
- final Bytes to,
- final boolean forward,
- final boolean toInclusive) {
- super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
- this.forward = forward;
- this.toInclusive = toInclusive;
- if (forward) {
- if (from == null) {
- iterWithTimestamp.seekToFirst();
- iterNoTimestamp.seekToFirst();
- } else {
- iterWithTimestamp.seek(from.get());
- iterNoTimestamp.seek(from.get());
- }
- rawLastKey = to == null ? null : to.get();
- } else {
- if (to == null) {
- iterWithTimestamp.seekToLast();
- iterNoTimestamp.seekToLast();
- } else {
- iterWithTimestamp.seekForPrev(to.get());
- iterNoTimestamp.seekForPrev(to.get());
- }
- rawLastKey = from == null ? null : from.get();
- }
- }
-
- @Override
- protected KeyValue<Bytes, byte[]> makeNext() {
- final KeyValue<Bytes, byte[]> next = super.makeNext();
-
- if (next == null) {
- return allDone();
- } else if (rawLastKey == null) {
- //null means range endpoint is open
- return next;
- } else {
- if (forward) {
- if (comparator.compare(next.key.get(), rawLastKey) < 0) {
- return next;
- } else if (comparator.compare(next.key.get(), rawLastKey)
== 0) {
- return toInclusive ? next : allDone();
- } else {
- return allDone();
- }
- } else {
- if (comparator.compare(next.key.get(), rawLastKey) >= 0) {
- return next;
- } else {
- return allDone();
- }
- }
- }
- }
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
new file mode 100644
index 00000000000..6fee5cb5718
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class DualColumnFamilyAccessorTest {
+
+ @Mock
+ private ColumnFamilyHandle oldCF;
+
+ @Mock
+ private ColumnFamilyHandle newCF;
+
+ @Mock
+ private DBAccessor dbAccessor;
+
+ private Function<byte[], byte[]> valueConverter;
+ private DualColumnFamilyAccessor accessor;
+
+ private static final String STORE_NAME = "test-store";
+ private static final byte[] KEY = "key".getBytes();
+ private static final byte[] OLD_VALUE = "old-value".getBytes();
+ private static final byte[] NEW_VALUE = "new-value".getBytes();
+
+ @BeforeEach
+ public void setUp() {
+ // Create a real Position object
+ final Position position = Position.emptyPosition();
+
+ // Create a mock store with real position field
+ final RocksDBStore store = mock(RocksDBStore.class);
+ store.position = position;
+ store.context = mock(StateStoreContext.class);
+ lenient().when(store.name()).thenReturn(STORE_NAME);
+
+ // Value converter that adds a prefix to indicate conversion
+ valueConverter = oldValue -> {
+ if (oldValue == null) {
+ return null;
+ }
+            return ByteBuffer.allocate(oldValue.length + 10).put("converted:".getBytes()).put(oldValue).array();
+ };
+
+        accessor = new DualColumnFamilyAccessor(oldCF, newCF, valueConverter, store);
+ }
+
+ @Test
+ public void shouldPutValueToNewColumnFamilyAndDeleteFromOld() throws
RocksDBException {
+ accessor.put(dbAccessor, KEY, NEW_VALUE);
+
+ verify(dbAccessor).delete(oldCF, KEY);
+ verify(dbAccessor).put(newCF, KEY, NEW_VALUE);
+ verify(dbAccessor, never()).delete(newCF, KEY);
+ }
+
+ @Test
+ public void shouldDeleteFromBothColumnFamiliesWhenValueIsNull() throws
RocksDBException {
+ accessor.put(dbAccessor, KEY, null);
+
+ verify(dbAccessor).delete(oldCF, KEY);
+ verify(dbAccessor).delete(newCF, KEY);
+ verify(dbAccessor, never()).put(any(), any(), any());
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionWhenPutFailsOnDeleteColumnFamily() throws
RocksDBException {
+ doThrow(new RocksDBException("Delete
failed")).when(dbAccessor).delete(oldCF, KEY);
+
+ ProcessorStateException exception =
assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY,
NEW_VALUE));
+
+ assertEquals("Error while removing key from store " + STORE_NAME,
exception.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionWhenPutFailsOnNewColumnFamily() throws
RocksDBException {
+ doThrow(new RocksDBException("Put
failed")).when(dbAccessor).put(newCF, KEY, NEW_VALUE);
+
+ ProcessorStateException exception =
assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY,
NEW_VALUE));
+
+ assertEquals("Error while putting key/value into store " + STORE_NAME,
exception.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionWhenDeleteFailsOnOldColumnFamilyWithNullValue()
throws RocksDBException {
+ doThrow(new RocksDBException("Delete
failed")).when(dbAccessor).delete(eq(oldCF), any(byte[].class));
+
+ ProcessorStateException exception =
assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY,
null));
+
+ assertEquals("Error while removing key from store " + STORE_NAME,
exception.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionWhenDeleteFailsOnNewColumnFamilyWithNullValue()
throws RocksDBException {
+ lenient().doNothing().when(dbAccessor).delete(eq(oldCF),
any(byte[].class));
+ doThrow(new RocksDBException("Delete
failed")).when(dbAccessor).delete(eq(newCF), any(byte[].class));
+
+ final ProcessorStateException exception =
assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY,
null));
+
+ assertEquals("Error while removing key from store " + STORE_NAME,
exception.getMessage());
+ verify(dbAccessor).delete(oldCF, KEY); // Should have tried to delete
from old first
+ }
+
+ @Test
+ public void shouldGetValueFromNewColumnFamily() throws RocksDBException {
+ when(dbAccessor.get(newCF, KEY)).thenReturn(NEW_VALUE);
+
+ final byte[] result = accessor.get(dbAccessor, KEY);
+
+ assertArrayEquals(NEW_VALUE, result);
+ verify(dbAccessor).get(newCF, KEY);
+ verify(dbAccessor, never()).get(oldCF, KEY);
+ }
+
+ @Test
+ public void shouldGetValueFromOldColumnFamilyAndConvert() throws
RocksDBException {
+ when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+ when(dbAccessor.get(oldCF, KEY)).thenReturn(OLD_VALUE);
+
+ final byte[] result = accessor.get(dbAccessor, KEY);
+
+ assertNotNull(result);
+ verify(dbAccessor).get(newCF, KEY);
+ verify(dbAccessor).get(oldCF, KEY);
+ // Should be converted value
+ assertArrayEquals(valueConverter.apply(OLD_VALUE), result);
+ // Should migrate the value to new CF
+ verify(dbAccessor).delete(oldCF, KEY);
+ verify(dbAccessor).put(eq(newCF), eq(KEY), any());
+ }
+
+ @Test
+ public void shouldReturnNullWhenKeyNotFoundInEitherColumnFamily() throws
RocksDBException {
+ when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+ when(dbAccessor.get(oldCF, KEY)).thenReturn(null);
+
+ final byte[] result = accessor.get(dbAccessor, KEY);
+
+ assertNull(result);
+ verify(dbAccessor).get(newCF, KEY);
+ verify(dbAccessor).get(oldCF, KEY);
+ }
+
+ @Test
+ public void shouldGetValueWithReadOptions() throws RocksDBException {
+ ReadOptions readOptions = mock(ReadOptions.class);
+ when(dbAccessor.get(newCF, readOptions, KEY)).thenReturn(NEW_VALUE);
+
+ final byte[] result = accessor.get(dbAccessor, KEY, readOptions);
+
+ assertArrayEquals(NEW_VALUE, result);
+ verify(dbAccessor).get(newCF, readOptions, KEY);
+ verify(dbAccessor, never()).get(eq(oldCF), eq(readOptions), eq(KEY));
+ }
+
+ @Test
+ public void shouldGetFromOldColumnFamilyWithReadOptionsAndConvert() throws
RocksDBException {
+ ReadOptions readOptions = mock(ReadOptions.class);
+ when(dbAccessor.get(newCF, readOptions, KEY)).thenReturn(null);
+ when(dbAccessor.get(oldCF, readOptions, KEY)).thenReturn(OLD_VALUE);
+
+ final byte[] result = accessor.get(dbAccessor, KEY, readOptions);
+
+ assertNotNull(result);
+ verify(dbAccessor).get(newCF, readOptions, KEY);
+ verify(dbAccessor).get(oldCF, readOptions, KEY);
+ // Should be converted value
+ assertArrayEquals(valueConverter.apply(OLD_VALUE), result);
+ // Should migrate the value to new CF
+ verify(dbAccessor).delete(oldCF, KEY);
+ verify(dbAccessor).put(eq(newCF), eq(KEY), any());
+ }
+
+ @Test
+ public void shouldGetOnlyFromOldColumnFamilyAndConvertWithoutMigration()
throws RocksDBException {
+ when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+ when(dbAccessor.get(oldCF, KEY)).thenReturn(OLD_VALUE);
+
+ final byte[] result = accessor.getOnly(dbAccessor, KEY);
+
+ assertNotNull(result);
+ verify(dbAccessor).get(newCF, KEY);
+ verify(dbAccessor).get(oldCF, KEY);
+ // Should be converted value
+ assertArrayEquals(valueConverter.apply(OLD_VALUE), result);
+ // getOnly should NOT migrate (no put/delete calls)
+ verify(dbAccessor, never()).put(any(), any(), any());
+ verify(dbAccessor, never()).delete(any(), any());
+ }
+
+ @Test
+ public void shouldGetOnlyReturnNullWhenNotFound() throws RocksDBException {
+ when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+ when(dbAccessor.get(oldCF, KEY)).thenReturn(null);
+
+ final byte[] result = accessor.getOnly(dbAccessor, KEY);
+
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldPrepareBatchWithMultipleEntries() throws
RocksDBException {
+ final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+ final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+ entries.add(new KeyValue<>(new Bytes("key1".getBytes()),
"value1".getBytes()));
+ entries.add(new KeyValue<>(new Bytes("key2".getBytes()),
"value2".getBytes()));
+ entries.add(new KeyValue<>(new Bytes("key3".getBytes()), null)); //
Delete
+
+ accessor.prepareBatch(entries, batch);
+
+ verify(batch).delete(oldCF, "key1".getBytes());
+ verify(batch).put(newCF, "key1".getBytes(), "value1".getBytes());
+ verify(batch).delete(oldCF, "key2".getBytes());
+ verify(batch).put(newCF, "key2".getBytes(), "value2".getBytes());
+ verify(batch).delete(oldCF, "key3".getBytes());
+ verify(batch).delete(newCF, "key3".getBytes());
+ }
+
+ @Test
+ public void shouldThrowNPEWhenBatchEntryHasNullKey() {
+ final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+ final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+ entries.add(new KeyValue<>(null, "value".getBytes()));
+
+ assertThrows(NullPointerException.class, () ->
accessor.prepareBatch(entries, batch));
+ }
+
+ @Test
+ public void shouldAddToBatchDeletesFromOldAndPutsToNew() throws
RocksDBException {
+ final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+
+ accessor.addToBatch(KEY, NEW_VALUE, batch);
+
+ verify(batch).delete(oldCF, KEY);
+ verify(batch).put(newCF, KEY, NEW_VALUE);
+ }
+
+ @Test
+ public void shouldAddToBatchDeletesFromBothWhenValueIsNull() throws
RocksDBException {
+ final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+
+ accessor.addToBatch(KEY, null, batch);
+
+ verify(batch).delete(oldCF, KEY);
+ verify(batch).delete(newCF, KEY);
+ verify(batch, never()).put(any(ColumnFamilyHandle.class),
any(byte[].class), any(byte[].class));
+ }
+
+ @Test
+ public void shouldDeleteRangeFromBothColumnFamilies() throws
RocksDBException {
+ final byte[] from = "a".getBytes();
+ final byte[] to = "z".getBytes();
+
+ accessor.deleteRange(dbAccessor, from, to);
+
+ verify(dbAccessor).deleteRange(oldCF, from, to);
+ verify(dbAccessor).deleteRange(newCF, from, to);
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionWhenDeleteRangeFailsOnOldColumnFamily()
throws RocksDBException {
+ final byte[] from = "a".getBytes();
+ final byte[] to = "z".getBytes();
+
+ doThrow(new RocksDBException("Delete range
failed")).when(dbAccessor).deleteRange(eq(oldCF), any(byte[].class),
any(byte[].class));
+
+ ProcessorStateException exception =
assertThrows(ProcessorStateException.class, () ->
accessor.deleteRange(dbAccessor, from, to));
+
+ assertEquals("Error while removing key from store " + STORE_NAME,
exception.getMessage());
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionWhenDeleteRangeFailsOnNewColumnFamily()
throws RocksDBException {
+ final byte[] from = "a".getBytes();
+ final byte[] to = "z".getBytes();
+
+ lenient().doNothing().when(dbAccessor).deleteRange(eq(oldCF),
any(byte[].class), any(byte[].class));
+ doThrow(new RocksDBException("Delete range
failed")).when(dbAccessor).deleteRange(eq(newCF), any(byte[].class),
any(byte[].class));
+
+ ProcessorStateException exception =
assertThrows(ProcessorStateException.class, () ->
accessor.deleteRange(dbAccessor, from, to));
+
+ assertEquals("Error while removing key from store " + STORE_NAME,
exception.getMessage());
+ verify(dbAccessor).deleteRange(oldCF, from, to);
+ }
+
+ @Test
+ public void shouldReturnSumOfEntriesFromBothColumnFamilies() throws
RocksDBException {
+ when(dbAccessor.approximateNumEntries(oldCF)).thenReturn(100L);
+ when(dbAccessor.approximateNumEntries(newCF)).thenReturn(50L);
+
+ final long result = accessor.approximateNumEntries(dbAccessor);
+
+ assertEquals(150L, result);
+ }
+
+ @Test
+ public void shouldFlushBothColumnFamiliesOnCommit() throws
RocksDBException {
+ final Map<TopicPartition, Long> offsets = new HashMap<>();
+ offsets.put(new TopicPartition("topic", 0), 100L);
+
+ accessor.commit(dbAccessor, offsets);
+
+ verify(dbAccessor).flush(oldCF, newCF);
+ }
+
+ @Test
+ public void shouldCreateRangeIterator() {
+ final RocksIterator iterNewFormat = mock(RocksIterator.class);
+ final RocksIterator oldIterFormat = mock(RocksIterator.class);
+ when(dbAccessor.newIterator(newCF)).thenReturn(iterNewFormat);
+ when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+ final Bytes from = new Bytes("a".getBytes());
+ final Bytes to = new Bytes("z".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.range(dbAccessor, from, to, true);
+
+ assertNotNull(iterator);
+ verify(dbAccessor).newIterator(newCF);
+ verify(dbAccessor).newIterator(oldCF);
+ }
+
+ @Test
+ public void shouldCreateAllIteratorForward() {
+ final RocksIterator newIterFormat = mock(RocksIterator.class);
+ final RocksIterator oldIterFormat = mock(RocksIterator.class);
+ when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
+ when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+        final ManagedKeyValueIterator<Bytes, byte[]> iterator = accessor.all(dbAccessor, true);
+
+        assertNotNull(iterator);
+        verify(newIterFormat).seekToFirst();
+        verify(oldIterFormat).seekToFirst();
+ }
+
+ @Test
+ public void shouldCreateAllIteratorReverse() {
+ final RocksIterator newIterFormat = mock(RocksIterator.class);
+ final RocksIterator oldIterFormat = mock(RocksIterator.class);
+ when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
+ when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+ final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.all(dbAccessor, false);
+
+ assertNotNull(iterator);
+ verify(newIterFormat).seekToLast();
+ verify(oldIterFormat).seekToLast();
+ }
+
+ @Test
+ public void shouldCreatePrefixScanIterator() {
+ final RocksIterator newIterFormat = mock(RocksIterator.class);
+ final RocksIterator oldIterFormat = mock(RocksIterator.class);
+ when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
+ when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+ final Bytes prefix = new Bytes("prefix".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.prefixScan(dbAccessor, prefix);
+
+ assertNotNull(iterator);
+ verify(dbAccessor).newIterator(newCF);
+ verify(dbAccessor).newIterator(oldCF);
+ }
+
+ @Test
+ public void shouldCloseBothColumnFamilies() {
+ accessor.close();
+
+ verify(oldCF).close();
+ verify(newCF).close();
+ }
+}
\ No newline at end of file