This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ceec1c2c8eb KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (1/N) (#21444)
ceec1c2c8eb is described below

commit ceec1c2c8eb5f5f9f7c9622145b516a736189c88
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 11 09:25:48 2026 +0100

    KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (1/N) (#21444)
    
    This PR extracts the `DualColumnFamilyAccessor` from
    `RocksDBTimestampedStore` and generalizes it so it can be used by both
    `RocksDBTimestampedStore` and headers-aware stores. It also adds
    corresponding unit tests.
    
    Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax <[email protected]>
    
    ---------
    
    Co-authored-by: TengYao Chi <[email protected]>
---
 .../state/internals/DualColumnFamilyAccessor.java  | 465 +++++++++++++++++++++
 .../state/internals/RocksDBTimestampedStore.java   | 416 +-----------------
 .../internals/DualColumnFamilyAccessorTest.java    | 427 +++++++++++++++++++
 3 files changed, 902 insertions(+), 406 deletions(-)
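
For context on the generalization: the extracted accessor receives the legacy-to-new value converter as a constructor argument, and RocksDBTimestampedStore now passes TimestampedBytesStore::convertToTimestampedFormat at its call site (see the second diff below). As a rough, self-contained sketch of the converter contract -- assuming only that the timestamped upgrade path prepends an 8-byte "unknown" timestamp sentinel to the plain legacy value; the class and constant names here are illustrative, not from this commit:

    import java.nio.ByteBuffer;
    import java.util.function.Function;

    public class ConverterSketch {
        // Hypothetical converter mirroring the shape of
        // TimestampedBytesStore::convertToTimestampedFormat: prepend an
        // 8-byte "unknown" timestamp (-1) to the plain legacy value.
        static final Function<byte[], byte[]> TO_TIMESTAMPED = plain ->
            ByteBuffer.allocate(8 + plain.length)
                    .putLong(-1L) // unknown-timestamp sentinel
                    .put(plain)
                    .array();

        public static void main(final String[] args) {
            final byte[] converted = TO_TIMESTAMPED.apply("value".getBytes());
            System.out.println(converted.length); // 8 + 5 = 13
        }
    }

A headers-aware store (the later PRs in this series) would supply its own Function<byte[], byte[]> in the same constructor slot.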

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
new file mode 100644
index 00000000000..11b4c11b511
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.kafka.streams.state.internals.RocksDBStore.incrementWithoutOverflow;
+
+/**
+ * A generic implementation of {@link RocksDBStore.ColumnFamilyAccessor} that supports dual-column-family
+ * upgrade scenarios. This class manages two column families:
+ * <ul>
+ *   <li>oldColumnFamily: contains legacy data in the old format</li>
+ *   <li>newColumnFamily: contains data in the new format</li>
+ * </ul>
+ *
+ * When reading, it first checks the new column family, then falls back to the old column family
+ * and converts values on-the-fly using the provided conversion function.
+ */
+class DualColumnFamilyAccessor implements RocksDBStore.ColumnFamilyAccessor {
+
+    private final ColumnFamilyHandle oldColumnFamily;
+    private final ColumnFamilyHandle newColumnFamily;
+    private final Function<byte[], byte[]> valueConverter;
+    private final RocksDBStore store;
+
+    /**
+     * Constructs a DualColumnFamilyAccessor.
+     *
+     * @param oldColumnFamily the column family containing legacy data
+     * @param newColumnFamily the column family for new format data
+     * @param valueConverter  function to convert old format values to new format
+     * @param store           the RocksDBStore instance (for accessing position, context, and name)
+     */
+    DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
+                             final ColumnFamilyHandle newColumnFamily,
+                             final Function<byte[], byte[]> valueConverter,
+                             final RocksDBStore store) {
+        this.oldColumnFamily = oldColumnFamily;
+        this.newColumnFamily = newColumnFamily;
+        this.valueConverter = valueConverter;
+        this.store = store;
+    }
+
+    @Override
+    public void put(final DBAccessor accessor,
+                    final byte[] key,
+                    final byte[] value) {
+        synchronized (store.position) {
+            if (value == null) {
+                try {
+                    accessor.delete(oldColumnFamily, key);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+                }
+                try {
+                    accessor.delete(newColumnFamily, key);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+                }
+            } else {
+                try {
+                    accessor.delete(oldColumnFamily, key);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+                }
+                try {
+                    accessor.put(newColumnFamily, key, value);
+                    StoreQueryUtils.updatePosition(store.position, store.context);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while putting key/value into store " + store.name(), e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                             final WriteBatchInterface batch) throws RocksDBException {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            Objects.requireNonNull(entry.key, "key cannot be null");
+            addToBatch(entry.key.get(), entry.value, batch);
+        }
+    }
+
+    @Override
+    public byte[] get(final DBAccessor accessor, final byte[] key)
+        throws RocksDBException {
+        return get(accessor, key, Optional.empty());
+    }
+
+    @Override
+    public byte[] get(final DBAccessor accessor, final byte[] key,
+                      final ReadOptions readOptions)
+        throws RocksDBException {
+        return get(accessor, key, Optional.of(readOptions));
+    }
+
+    private byte[] get(final DBAccessor accessor, final byte[] key,
+                       final Optional<ReadOptions> readOptions)
+        throws RocksDBException {
+        final byte[] valueInNewFormat = readOptions.isPresent()
+                ? accessor.get(newColumnFamily, readOptions.get(), key)
+                : accessor.get(newColumnFamily, key);
+        if (valueInNewFormat != null) {
+            return valueInNewFormat;
+        }
+
+        final byte[] valueInOldFormat = readOptions.isPresent()
+                ? accessor.get(oldColumnFamily, readOptions.get(), key)
+                : accessor.get(oldColumnFamily, key);
+        if (valueInOldFormat != null) {
+            final byte[] convertedValue = valueConverter.apply(valueInOldFormat);
+            // This only works because the changelog topic already contains correct data.
+            // For other format changes, we cannot take this shortcut and can only migrate data
+            // from old to new store on put().
+            put(accessor, key, convertedValue);
+            return convertedValue;
+        }
+
+        return null;
+    }
+
+    @Override
+    public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
+        final byte[] valueInNewFormat = accessor.get(newColumnFamily, key);
+        if (valueInNewFormat != null) {
+            return valueInNewFormat;
+        }
+
+        final byte[] valueInOldFormat = accessor.get(oldColumnFamily, key);
+        if (valueInOldFormat != null) {
+            return valueConverter.apply(valueInOldFormat);
+        }
+
+        return null;
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
+                                                        final Bytes from,
+                                                        final Bytes to,
+                                                        final boolean forward) {
+        return new RocksDBDualCFRangeIterator(
+            store.name(),
+            accessor.newIterator(newColumnFamily),
+            accessor.newIterator(oldColumnFamily),
+            from,
+            to,
+            forward,
+            true,
+            valueConverter);
+    }
+
+    @Override
+    public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
+        try {
+            accessor.deleteRange(oldColumnFamily, from, to);
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+        }
+        try {
+            accessor.deleteRange(newColumnFamily, from, to);
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+        }
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor,
+                                                      final boolean forward) {
+        final RocksIterator innerIterNew = accessor.newIterator(newColumnFamily);
+        final RocksIterator innerIterOld = accessor.newIterator(oldColumnFamily);
+        if (forward) {
+            innerIterNew.seekToFirst();
+            innerIterOld.seekToFirst();
+        } else {
+            innerIterNew.seekToLast();
+            innerIterOld.seekToLast();
+        }
+        return new RocksDBDualCFIterator(store.name(), innerIterNew, innerIterOld, forward, valueConverter);
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor,
+                                                             final Bytes prefix) {
+        final Bytes to = incrementWithoutOverflow(prefix);
+        return new RocksDBDualCFRangeIterator(
+            store.name(),
+            accessor.newIterator(newColumnFamily),
+            accessor.newIterator(oldColumnFamily),
+            prefix,
+            to,
+            true,
+            false,
+            valueConverter
+        );
+    }
+
+    @Override
+    public long approximateNumEntries(final DBAccessor accessor)
+        throws RocksDBException {
+        return accessor.approximateNumEntries(oldColumnFamily)
+                + accessor.approximateNumEntries(newColumnFamily);
+    }
+
+    @Override
+    public void commit(final DBAccessor accessor,
+                       final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
+        accessor.flush(oldColumnFamily, newColumnFamily);
+    }
+
+    @Override
+    public void addToBatch(final byte[] key,
+                           final byte[] value,
+                           final WriteBatchInterface batch) throws RocksDBException {
+        if (value == null) {
+            batch.delete(oldColumnFamily, key);
+            batch.delete(newColumnFamily, key);
+        } else {
+            batch.delete(oldColumnFamily, key);
+            batch.put(newColumnFamily, key, value);
+        }
+    }
+
+    @Override
+    public void close() {
+        oldColumnFamily.close();
+        newColumnFamily.close();
+    }
+
+    private static class RocksDBDualCFIterator
+        extends org.apache.kafka.common.utils.AbstractIterator<KeyValue<Bytes, byte[]>>
+        implements ManagedKeyValueIterator<Bytes, byte[]> {
+
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+
+        private final String storeName;
+        private final RocksIterator iterNewFormat;
+        private final RocksIterator iterOldFormat;
+        private final boolean forward;
+        private final Function<byte[], byte[]> valueConverter;
+
+        private volatile boolean open = true;
+
+        private byte[] nextNewFormat;
+        private byte[] nextOldFormat;
+        private KeyValue<Bytes, byte[]> next;
+        private Runnable closeCallback = null;
+
+        RocksDBDualCFIterator(final String storeName,
+                              final RocksIterator iterNewFormat,
+                              final RocksIterator iterOldFormat,
+                              final boolean forward,
+                              final Function<byte[], byte[]> valueConverter) {
+            this.iterNewFormat = iterNewFormat;
+            this.iterOldFormat = iterOldFormat;
+            this.storeName = storeName;
+            this.forward = forward;
+            this.valueConverter = valueConverter;
+        }
+
+        @Override
+        public synchronized boolean hasNext() {
+            if (!open) {
+                throw new org.apache.kafka.streams.errors.InvalidStateStoreException(
+                        String.format("RocksDB iterator for store %s has closed", storeName));
+            }
+            return super.hasNext();
+        }
+
+        @Override
+        public synchronized KeyValue<Bytes, byte[]> next() {
+            return super.next();
+        }
+
+        @Override
+        protected KeyValue<Bytes, byte[]> makeNext() {
+            if (nextOldFormat == null && iterOldFormat.isValid()) {
+                nextOldFormat = iterOldFormat.key();
+            }
+
+            if (nextNewFormat == null && iterNewFormat.isValid()) {
+                nextNewFormat = iterNewFormat.key();
+            }
+
+            if (nextOldFormat == null && !iterOldFormat.isValid()) {
+                if (nextNewFormat == null && !iterNewFormat.isValid()) {
+                    return allDone();
+                } else {
+                    next = KeyValue.pair(new Bytes(nextNewFormat), iterNewFormat.value());
+                    nextNewFormat = null;
+                    if (forward) {
+                        iterNewFormat.next();
+                    } else {
+                        iterNewFormat.prev();
+                    }
+                }
+            } else {
+                if (nextNewFormat == null) {
+                    next = KeyValue.pair(new Bytes(nextOldFormat), valueConverter.apply(iterOldFormat.value()));
+                    nextOldFormat = null;
+                    if (forward) {
+                        iterOldFormat.next();
+                    } else {
+                        iterOldFormat.prev();
+                    }
+                } else {
+                    if (forward) {
+                        if (comparator.compare(nextOldFormat, nextNewFormat) <= 0) {
+                            next = KeyValue.pair(new Bytes(nextOldFormat), valueConverter.apply(iterOldFormat.value()));
+                            nextOldFormat = null;
+                            iterOldFormat.next();
+                        } else {
+                            next = KeyValue.pair(new Bytes(nextNewFormat), iterNewFormat.value());
+                            nextNewFormat = null;
+                            iterNewFormat.next();
+                        }
+                    } else {
+                        if (comparator.compare(nextOldFormat, nextNewFormat) >= 0) {
+                            next = KeyValue.pair(new Bytes(nextOldFormat), valueConverter.apply(iterOldFormat.value()));
+                            nextOldFormat = null;
+                            iterOldFormat.prev();
+                        } else {
+                            next = KeyValue.pair(new Bytes(nextNewFormat), iterNewFormat.value());
+                            nextNewFormat = null;
+                            iterNewFormat.prev();
+                        }
+                    }
+                }
+            }
+            return next;
+        }
+
+        @Override
+        public synchronized void close() {
+            if (closeCallback == null) {
+                throw new IllegalStateException(
+                        "RocksDBDualCFIterator expects close callback to be set immediately upon creation");
+            }
+            closeCallback.run();
+
+            iterOldFormat.close();
+            iterNewFormat.close();
+            open = false;
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            return next.key;
+        }
+
+        @Override
+        public void onClose(final Runnable closeCallback) {
+            this.closeCallback = closeCallback;
+        }
+    }
+
+    private static class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
+        // RocksDB's JNI interface does not expose getters/setters that allow the
+        // comparator to be pluggable, and the default is lexicographic, so it's
+        // safe to just force lexicographic comparator here for now.
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+        private final byte[] rawLastKey;
+        private final boolean forward;
+        private final boolean toInclusive;
+
+        RocksDBDualCFRangeIterator(final String storeName,
+                                   final RocksIterator iterNewFormat,
+                                   final RocksIterator iterOldFormat,
+                                   final Bytes from,
+                                   final Bytes to,
+                                   final boolean forward,
+                                   final boolean toInclusive,
+                                   final Function<byte[], byte[]> valueConverter) {
+            super(storeName, iterNewFormat, iterOldFormat, forward, valueConverter);
+            this.forward = forward;
+            this.toInclusive = toInclusive;
+            if (forward) {
+                if (from == null) {
+                    iterNewFormat.seekToFirst();
+                    iterOldFormat.seekToFirst();
+                } else {
+                    iterNewFormat.seek(from.get());
+                    iterOldFormat.seek(from.get());
+                }
+                rawLastKey = to == null ? null : to.get();
+            } else {
+                if (to == null) {
+                    iterNewFormat.seekToLast();
+                    iterOldFormat.seekToLast();
+                } else {
+                    iterNewFormat.seekForPrev(to.get());
+                    iterOldFormat.seekForPrev(to.get());
+                }
+                rawLastKey = from == null ? null : from.get();
+            }
+        }
+
+        @Override
+        protected KeyValue<Bytes, byte[]> makeNext() {
+            final KeyValue<Bytes, byte[]> next = super.makeNext();
+
+            if (next == null) {
+                return allDone();
+            } else if (rawLastKey == null) {
+                // null means range endpoint is open
+                return next;
+            } else {
+                if (forward) {
+                    if (comparator.compare(next.key.get(), rawLastKey) < 0) {
+                        return next;
+                    } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
+                        return toInclusive ? next : allDone();
+                    } else {
+                        return allDone();
+                    }
+                } else {
+                    if (comparator.compare(next.key.get(), rawLastKey) >= 0) {
+                        return next;
+                    } else {
+                        return allDone();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 36262cb8e0d..441c6620079 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -16,12 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -29,23 +23,13 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteBatchInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Optional;
-
-import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
 
 /**
  * A persistent key-(value-timestamp) store based on RocksDB.
@@ -56,7 +40,7 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
     private static final byte[] TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME = "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
 
     public RocksDBTimestampedStore(final String name,
-                            final String metricsScope) {
+                                   final String metricsScope) {
         super(name, metricsScope);
     }
 
@@ -70,9 +54,9 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
         final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
-                dbOptions,
-                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
-                new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions)
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions)
         );
         final ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
         final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);
@@ -81,7 +65,12 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
         noTimestampsIter.seekToFirst();
         if (noTimestampsIter.isValid()) {
             log.info("Opening store {} in upgrade mode", name);
-            cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
+            cfAccessor = new DualColumnFamilyAccessor(
+                noTimestampColumnFamily,
+                withTimestampColumnFamily,
+                TimestampedBytesStore::convertToTimestampedFormat,
+                this
+            );
         } else {
             log.info("Opening store {} in regular mode", name);
             cfAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
@@ -90,389 +79,4 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
         noTimestampsIter.close();
     }
 
-    private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
-        private final ColumnFamilyHandle oldColumnFamily;
-        private final ColumnFamilyHandle newColumnFamily;
-
-        private DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
-                                         final ColumnFamilyHandle newColumnFamily) {
-            this.oldColumnFamily = oldColumnFamily;
-            this.newColumnFamily = newColumnFamily;
-        }
-
-        @Override
-        public void put(final DBAccessor accessor,
-                        final byte[] key,
-                        final byte[] valueWithTimestamp) {
-            synchronized (position) {
-                if (valueWithTimestamp == null) {
-                    try {
-                        accessor.delete(oldColumnFamily, key);
-                    } catch (final RocksDBException e) {
-                        // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                        throw new ProcessorStateException("Error while removing key from store " + name, e);
-                    }
-                    try {
-                        accessor.delete(newColumnFamily, key);
-                    } catch (final RocksDBException e) {
-                        // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                        throw new ProcessorStateException("Error while removing key from store " + name, e);
-                    }
-                } else {
-                    try {
-                        accessor.delete(oldColumnFamily, key);
-                    } catch (final RocksDBException e) {
-                        // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                        throw new ProcessorStateException("Error while removing key from store " + name, e);
-                    }
-                    try {
-                        accessor.put(newColumnFamily, key, valueWithTimestamp);
-                        StoreQueryUtils.updatePosition(position, context);
-                    } catch (final RocksDBException e) {
-                        // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                        throw new ProcessorStateException("Error while putting key/value into store " + name, e);
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
-                                 final WriteBatchInterface batch) throws RocksDBException {
-            for (final KeyValue<Bytes, byte[]> entry : entries) {
-                Objects.requireNonNull(entry.key, "key cannot be null");
-                addToBatch(entry.key.get(), entry.value, batch);
-            }
-        }
-
-        @Override
-        public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
-            return get(accessor, key, Optional.empty());
-        }
-
-        @Override
-        public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
-            return get(accessor, key, Optional.of(readOptions));
-        }
-
-        private byte[] get(final DBAccessor accessor, final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
-            final byte[] valueWithTimestamp = readOptions.isPresent() ? accessor.get(newColumnFamily, readOptions.get(), key) : accessor.get(newColumnFamily, key);
-            if (valueWithTimestamp != null) {
-                return valueWithTimestamp;
-            }
-
-            final byte[] plainValue = readOptions.isPresent() ? accessor.get(oldColumnFamily, readOptions.get(), key) : accessor.get(oldColumnFamily, key);
-            if (plainValue != null) {
-                final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
-                // this does only work, because the changelog topic contains correct data already
-                // for other format changes, we cannot take this short cut and can only migrate data
-                // from old to new store on put()
-                // from old to new store on put()
-                put(accessor, key, valueWithUnknownTimestamp);
-                return valueWithUnknownTimestamp;
-            }
-
-            return null;
-        }
-
-        @Override
-        public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
-            final byte[] valueWithTimestamp = accessor.get(newColumnFamily, key);
-            if (valueWithTimestamp != null) {
-                return valueWithTimestamp;
-            }
-
-            final byte[] plainValue = accessor.get(oldColumnFamily, key);
-            if (plainValue != null) {
-                return convertToTimestampedFormat(plainValue);
-            }
-
-            return null;
-        }
-
-        @Override
-        public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
-                                                            final Bytes from,
-                                                            final Bytes to,
-                                                            final boolean forward) {
-            return new RocksDBDualCFRangeIterator(
-                name,
-                accessor.newIterator(newColumnFamily),
-                accessor.newIterator(oldColumnFamily),
-                from,
-                to,
-                forward,
-                true);
-        }
-
-        @Override
-        public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
-            try {
-                accessor.deleteRange(oldColumnFamily, from, to);
-            } catch (final RocksDBException e) {
-                // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                throw new ProcessorStateException("Error while removing key from store " + name, e);
-            }
-            try {
-                accessor.deleteRange(newColumnFamily, from, to);
-            } catch (final RocksDBException e) {
-                // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
-                throw new ProcessorStateException("Error while removing key from store " + name, e);
-            }
-        }
-
-        @Override
-        public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
-            final RocksIterator innerIterWithTimestamp = accessor.newIterator(newColumnFamily);
-            final RocksIterator innerIterNoTimestamp = accessor.newIterator(oldColumnFamily);
-            if (forward) {
-                innerIterWithTimestamp.seekToFirst();
-                innerIterNoTimestamp.seekToFirst();
-            } else {
-                innerIterWithTimestamp.seekToLast();
-                innerIterNoTimestamp.seekToLast();
-            }
-            return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp, forward);
-        }
-
-        @Override
-        public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
-            final Bytes to = incrementWithoutOverflow(prefix);
-            return new RocksDBDualCFRangeIterator(
-                name,
-                accessor.newIterator(newColumnFamily),
-                accessor.newIterator(oldColumnFamily),
-                prefix,
-                to,
-                true,
-                false
-            );
-        }
-
-        @Override
-        public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
-            return accessor.approximateNumEntries(oldColumnFamily) +
-                    accessor.approximateNumEntries(newColumnFamily);
-        }
-
-        @Override
-        public void commit(final DBAccessor accessor,
-                           final Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
-            accessor.flush(oldColumnFamily, newColumnFamily);
-        }
-
-        @Override
-        public void addToBatch(final byte[] key,
-                               final byte[] value,
-                               final WriteBatchInterface batch) throws RocksDBException {
-            if (value == null) {
-                batch.delete(oldColumnFamily, key);
-                batch.delete(newColumnFamily, key);
-            } else {
-                batch.delete(oldColumnFamily, key);
-                batch.put(newColumnFamily, key, value);
-            }
-        }
-
-        @Override
-        public void close() {
-            oldColumnFamily.close();
-            newColumnFamily.close();
-        }
-    }
-
-    private static class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements ManagedKeyValueIterator<Bytes, byte[]> {
-
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
-        private final String storeName;
-        private final RocksIterator iterWithTimestamp;
-        private final RocksIterator iterNoTimestamp;
-        private final boolean forward;
-
-        private volatile boolean open = true;
-
-        private byte[] nextWithTimestamp;
-        private byte[] nextNoTimestamp;
-        private KeyValue<Bytes, byte[]> next;
-        private Runnable closeCallback = null;
-
-        RocksDBDualCFIterator(final String storeName,
-                              final RocksIterator iterWithTimestamp,
-                              final RocksIterator iterNoTimestamp,
-                              final boolean forward) {
-            this.iterWithTimestamp = iterWithTimestamp;
-            this.iterNoTimestamp = iterNoTimestamp;
-            this.storeName = storeName;
-            this.forward = forward;
-        }
-
-        @Override
-        public synchronized boolean hasNext() {
-            if (!open) {
-                throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
-            }
-            return super.hasNext();
-        }
-
-        @Override
-        public synchronized KeyValue<Bytes, byte[]> next() {
-            return super.next();
-        }
-
-        @Override
-        protected KeyValue<Bytes, byte[]> makeNext() {
-            if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
-                nextNoTimestamp = iterNoTimestamp.key();
-            }
-
-            if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
-                nextWithTimestamp = iterWithTimestamp.key();
-            }
-
-            if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
-                if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
-                    return allDone();
-                } else {
-                    next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
-                    nextWithTimestamp = null;
-                    if (forward) {
-                        iterWithTimestamp.next();
-                    } else {
-                        iterWithTimestamp.prev();
-                    }
-                }
-            } else {
-                if (nextWithTimestamp == null) {
-                    next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
-                    nextNoTimestamp = null;
-                    if (forward) {
-                        iterNoTimestamp.next();
-                    } else {
-                        iterNoTimestamp.prev();
-                    }
-                } else {
-                    if (forward) {
-                        if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
-                            next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
-                            nextNoTimestamp = null;
-                            iterNoTimestamp.next();
-                        } else {
-                            next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
-                            nextWithTimestamp = null;
-                            iterWithTimestamp.next();
-                        }
-                    } else {
-                        if (comparator.compare(nextNoTimestamp, nextWithTimestamp) >= 0) {
-                            next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
-                            nextNoTimestamp = null;
-                            iterNoTimestamp.prev();
-                        } else {
-                            next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
-                            nextWithTimestamp = null;
-                            iterWithTimestamp.prev();
-                        }
-                    }
-                }
-            }
-            return next;
-        }
-
-        @Override
-        public synchronized void close() {
-            if (closeCallback == null) {
-                throw new IllegalStateException("RocksDBDualCFIterator expects close callback to be set immediately upon creation");
-            }
-            closeCallback.run();
-
-            iterNoTimestamp.close();
-            iterWithTimestamp.close();
-            open = false;
-        }
-
-        @Override
-        public Bytes peekNextKey() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            return next.key;
-        }
-
-        @Override
-        public void onClose(final Runnable closeCallback) {
-            this.closeCallback = closeCallback;
-        }
-    }
-
-    private static class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-        private final byte[] rawLastKey;
-        private final boolean forward;
-        private final boolean toInclusive;
-
-        RocksDBDualCFRangeIterator(final String storeName,
-                                   final RocksIterator iterWithTimestamp,
-                                   final RocksIterator iterNoTimestamp,
-                                   final Bytes from,
-                                   final Bytes to,
-                                   final boolean forward,
-                                   final boolean toInclusive) {
-            super(storeName, iterWithTimestamp, iterNoTimestamp, forward);
-            this.forward = forward;
-            this.toInclusive = toInclusive;
-            if (forward) {
-                if (from == null) {
-                    iterWithTimestamp.seekToFirst();
-                    iterNoTimestamp.seekToFirst();
-                } else {
-                    iterWithTimestamp.seek(from.get());
-                    iterNoTimestamp.seek(from.get());
-                }
-                rawLastKey = to == null ? null : to.get();
-            } else {
-                if (to == null) {
-                    iterWithTimestamp.seekToLast();
-                    iterNoTimestamp.seekToLast();
-                } else {
-                    iterWithTimestamp.seekForPrev(to.get());
-                    iterNoTimestamp.seekForPrev(to.get());
-                }
-                rawLastKey = from == null ? null : from.get();
-            }
-        }
-
-        @Override
-        protected KeyValue<Bytes, byte[]> makeNext() {
-            final KeyValue<Bytes, byte[]> next = super.makeNext();
-
-            if (next == null) {
-                return allDone();
-            } else if (rawLastKey == null) {
-                //null means range endpoint is open
-                return next;
-            } else {
-                if (forward) {
-                    if (comparator.compare(next.key.get(), rawLastKey) < 0) {
-                        return next;
-                    } else if (comparator.compare(next.key.get(), rawLastKey) == 0) {
-                        return toInclusive ? next : allDone();
-                    } else {
-                        return allDone();
-                    }
-                } else {
-                    if (comparator.compare(next.key.get(), rawLastKey) >= 0) {
-                        return next;
-                    } else {
-                        return allDone();
-                    }
-                }
-            }
-        }
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
new file mode 100644
index 00000000000..6fee5cb5718
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class DualColumnFamilyAccessorTest {
+
+    @Mock
+    private ColumnFamilyHandle oldCF;
+
+    @Mock
+    private ColumnFamilyHandle newCF;
+
+    @Mock
+    private DBAccessor dbAccessor;
+
+    private Function<byte[], byte[]> valueConverter;
+    private DualColumnFamilyAccessor accessor;
+
+    private static final String STORE_NAME = "test-store";
+    private static final byte[] KEY = "key".getBytes();
+    private static final byte[] OLD_VALUE = "old-value".getBytes();
+    private static final byte[] NEW_VALUE = "new-value".getBytes();
+
+    @BeforeEach
+    public void setUp() {
+        // Create a real Position object
+        final Position position = Position.emptyPosition();
+
+        // Create a mock store with real position field
+        final RocksDBStore store = mock(RocksDBStore.class);
+        store.position = position;
+        store.context = mock(StateStoreContext.class);
+        lenient().when(store.name()).thenReturn(STORE_NAME);
+
+        // Value converter that adds a prefix to indicate conversion
+        valueConverter = oldValue -> {
+            if (oldValue == null) {
+                return null;
+            }
+            return ByteBuffer.allocate(oldValue.length + 10).put("converted:".getBytes()).put(oldValue).array();
+        };
+
+        accessor = new DualColumnFamilyAccessor(oldCF, newCF, valueConverter, store);
+    }
+
+    @Test
+    public void shouldPutValueToNewColumnFamilyAndDeleteFromOld() throws RocksDBException {
+        accessor.put(dbAccessor, KEY, NEW_VALUE);
+
+        verify(dbAccessor).delete(oldCF, KEY);
+        verify(dbAccessor).put(newCF, KEY, NEW_VALUE);
+        verify(dbAccessor, never()).delete(newCF, KEY);
+    }
+
+    @Test
+    public void shouldDeleteFromBothColumnFamiliesWhenValueIsNull() throws RocksDBException {
+        accessor.put(dbAccessor, KEY, null);
+
+        verify(dbAccessor).delete(oldCF, KEY);
+        verify(dbAccessor).delete(newCF, KEY);
+        verify(dbAccessor, never()).put(any(), any(), any());
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenPutFailsOnDeleteColumnFamily() throws RocksDBException {
+        doThrow(new RocksDBException("Delete failed")).when(dbAccessor).delete(oldCF, KEY);
+
+        ProcessorStateException exception = assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY, NEW_VALUE));
+
+        assertEquals("Error while removing key from store " + STORE_NAME, exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenPutFailsOnNewColumnFamily() throws RocksDBException {
+        doThrow(new RocksDBException("Put failed")).when(dbAccessor).put(newCF, KEY, NEW_VALUE);
+
+        ProcessorStateException exception = assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY, NEW_VALUE));
+
+        assertEquals("Error while putting key/value into store " + STORE_NAME, exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenDeleteFailsOnOldColumnFamilyWithNullValue() throws RocksDBException {
+        doThrow(new RocksDBException("Delete failed")).when(dbAccessor).delete(eq(oldCF), any(byte[].class));
+
+        ProcessorStateException exception = assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY, null));
+
+        assertEquals("Error while removing key from store " + STORE_NAME, exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenDeleteFailsOnNewColumnFamilyWithNullValue() throws RocksDBException {
+        lenient().doNothing().when(dbAccessor).delete(eq(oldCF), any(byte[].class));
+        doThrow(new RocksDBException("Delete failed")).when(dbAccessor).delete(eq(newCF), any(byte[].class));
+
+        final ProcessorStateException exception = assertThrows(ProcessorStateException.class, () -> accessor.put(dbAccessor, KEY, null));
+
+        assertEquals("Error while removing key from store " + STORE_NAME, exception.getMessage());
+        verify(dbAccessor).delete(oldCF, KEY); // Should have tried to delete from old first
+    }
+
+    @Test
+    public void shouldGetValueFromNewColumnFamily() throws RocksDBException {
+        when(dbAccessor.get(newCF, KEY)).thenReturn(NEW_VALUE);
+
+        final byte[] result = accessor.get(dbAccessor, KEY);
+
+        assertArrayEquals(NEW_VALUE, result);
+        verify(dbAccessor).get(newCF, KEY);
+        verify(dbAccessor, never()).get(oldCF, KEY);
+    }
+
+    @Test
+    public void shouldGetValueFromOldColumnFamilyAndConvert() throws RocksDBException {
+        when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+        when(dbAccessor.get(oldCF, KEY)).thenReturn(OLD_VALUE);
+
+        final byte[] result = accessor.get(dbAccessor, KEY);
+
+        assertNotNull(result);
+        verify(dbAccessor).get(newCF, KEY);
+        verify(dbAccessor).get(oldCF, KEY);
+        // Should be converted value
+        assertArrayEquals(valueConverter.apply(OLD_VALUE), result);
+        // Should migrate the value to new CF
+        verify(dbAccessor).delete(oldCF, KEY);
+        verify(dbAccessor).put(eq(newCF), eq(KEY), any());
+    }
+
+    @Test
+    public void shouldReturnNullWhenKeyNotFoundInEitherColumnFamily() throws RocksDBException {
+        when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+        when(dbAccessor.get(oldCF, KEY)).thenReturn(null);
+
+        final byte[] result = accessor.get(dbAccessor, KEY);
+
+        assertNull(result);
+        verify(dbAccessor).get(newCF, KEY);
+        verify(dbAccessor).get(oldCF, KEY);
+    }
+
+    @Test
+    public void shouldGetValueWithReadOptions() throws RocksDBException {
+        ReadOptions readOptions = mock(ReadOptions.class);
+        when(dbAccessor.get(newCF, readOptions, KEY)).thenReturn(NEW_VALUE);
+
+        final byte[] result = accessor.get(dbAccessor, KEY, readOptions);
+
+        assertArrayEquals(NEW_VALUE, result);
+        verify(dbAccessor).get(newCF, readOptions, KEY);
+        verify(dbAccessor, never()).get(eq(oldCF), eq(readOptions), eq(KEY));
+    }
+
+    @Test
+    public void shouldGetFromOldColumnFamilyWithReadOptionsAndConvert() throws RocksDBException {
+        ReadOptions readOptions = mock(ReadOptions.class);
+        when(dbAccessor.get(newCF, readOptions, KEY)).thenReturn(null);
+        when(dbAccessor.get(oldCF, readOptions, KEY)).thenReturn(OLD_VALUE);
+
+        final byte[] result = accessor.get(dbAccessor, KEY, readOptions);
+
+        assertNotNull(result);
+        verify(dbAccessor).get(newCF, readOptions, KEY);
+        verify(dbAccessor).get(oldCF, readOptions, KEY);
+        // Should be converted value
+        assertArrayEquals(valueConverter.apply(OLD_VALUE), result);
+        // Should migrate the value to new CF
+        verify(dbAccessor).delete(oldCF, KEY);
+        verify(dbAccessor).put(eq(newCF), eq(KEY), any());
+    }
+
+    @Test
+    public void shouldGetOnlyFromOldColumnFamilyAndConvertWithoutMigration() throws RocksDBException {
+        when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+        when(dbAccessor.get(oldCF, KEY)).thenReturn(OLD_VALUE);
+
+        final byte[] result = accessor.getOnly(dbAccessor, KEY);
+
+        assertNotNull(result);
+        verify(dbAccessor).get(newCF, KEY);
+        verify(dbAccessor).get(oldCF, KEY);
+        // Should be converted value
+        assertArrayEquals(valueConverter.apply(OLD_VALUE), result);
+        // getOnly should NOT migrate (no put/delete calls)
+        verify(dbAccessor, never()).put(any(), any(), any());
+        verify(dbAccessor, never()).delete(any(), any());
+    }
+
+    @Test
+    public void shouldGetOnlyReturnNullWhenNotFound() throws RocksDBException {
+        when(dbAccessor.get(newCF, KEY)).thenReturn(null);
+        when(dbAccessor.get(oldCF, KEY)).thenReturn(null);
+
+        final byte[] result = accessor.getOnly(dbAccessor, KEY);
+
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldPrepareBatchWithMultipleEntries() throws RocksDBException {
+        final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(new Bytes("key1".getBytes()), "value1".getBytes()));
+        entries.add(new KeyValue<>(new Bytes("key2".getBytes()), "value2".getBytes()));
+        entries.add(new KeyValue<>(new Bytes("key3".getBytes()), null)); // Delete
+
+        accessor.prepareBatch(entries, batch);
+
+        verify(batch).delete(oldCF, "key1".getBytes());
+        verify(batch).put(newCF, "key1".getBytes(), "value1".getBytes());
+        verify(batch).delete(oldCF, "key2".getBytes());
+        verify(batch).put(newCF, "key2".getBytes(), "value2".getBytes());
+        verify(batch).delete(oldCF, "key3".getBytes());
+        verify(batch).delete(newCF, "key3".getBytes());
+    }
+
+    @Test
+    public void shouldThrowNPEWhenBatchEntryHasNullKey() {
+        final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(null, "value".getBytes()));
+
+        assertThrows(NullPointerException.class, () -> accessor.prepareBatch(entries, batch));
+    }
+
+    @Test
+    public void shouldAddToBatchDeletesFromOldAndPutsToNew() throws RocksDBException {
+        final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+
+        accessor.addToBatch(KEY, NEW_VALUE, batch);
+
+        verify(batch).delete(oldCF, KEY);
+        verify(batch).put(newCF, KEY, NEW_VALUE);
+    }
+
+    @Test
+    public void shouldAddToBatchDeletesFromBothWhenValueIsNull() throws RocksDBException {
+        final WriteBatchInterface batch = mock(WriteBatchInterface.class);
+
+        accessor.addToBatch(KEY, null, batch);
+
+        verify(batch).delete(oldCF, KEY);
+        verify(batch).delete(newCF, KEY);
+        verify(batch, never()).put(any(ColumnFamilyHandle.class), any(byte[].class), any(byte[].class));
+    }
+
+    @Test
+    public void shouldDeleteRangeFromBothColumnFamilies() throws RocksDBException {
+        final byte[] from = "a".getBytes();
+        final byte[] to = "z".getBytes();
+
+        accessor.deleteRange(dbAccessor, from, to);
+
+        verify(dbAccessor).deleteRange(oldCF, from, to);
+        verify(dbAccessor).deleteRange(newCF, from, to);
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenDeleteRangeFailsOnOldColumnFamily() throws RocksDBException {
+        final byte[] from = "a".getBytes();
+        final byte[] to = "z".getBytes();
+
+        doThrow(new RocksDBException("Delete range failed")).when(dbAccessor).deleteRange(eq(oldCF), any(byte[].class), any(byte[].class));
+
+        ProcessorStateException exception = assertThrows(ProcessorStateException.class, () -> accessor.deleteRange(dbAccessor, from, to));
+
+        assertEquals("Error while removing key from store " + STORE_NAME, exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionWhenDeleteRangeFailsOnNewColumnFamily() throws RocksDBException {
+        final byte[] from = "a".getBytes();
+        final byte[] to = "z".getBytes();
+
+        lenient().doNothing().when(dbAccessor).deleteRange(eq(oldCF), any(byte[].class), any(byte[].class));
+        doThrow(new RocksDBException("Delete range failed")).when(dbAccessor).deleteRange(eq(newCF), any(byte[].class), any(byte[].class));
+
+        ProcessorStateException exception = assertThrows(ProcessorStateException.class, () -> accessor.deleteRange(dbAccessor, from, to));
+
+        assertEquals("Error while removing key from store " + STORE_NAME, exception.getMessage());
+        verify(dbAccessor).deleteRange(oldCF, from, to);
+    }
+
+    @Test
+    public void shouldReturnSumOfEntriesFromBothColumnFamilies() throws RocksDBException {
+        when(dbAccessor.approximateNumEntries(oldCF)).thenReturn(100L);
+        when(dbAccessor.approximateNumEntries(newCF)).thenReturn(50L);
+
+        final long result = accessor.approximateNumEntries(dbAccessor);
+
+        assertEquals(150L, result);
+    }
+
+    @Test
+    public void shouldFlushBothColumnFamiliesOnCommit() throws RocksDBException {
+        final Map<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("topic", 0), 100L);
+
+        accessor.commit(dbAccessor, offsets);
+
+        verify(dbAccessor).flush(oldCF, newCF);
+    }
+
+    @Test
+    public void shouldCreateRangeIterator() {
+        final RocksIterator iterNewFormat = mock(RocksIterator.class);
+        final RocksIterator oldIterFormat = mock(RocksIterator.class);
+        when(dbAccessor.newIterator(newCF)).thenReturn(iterNewFormat);
+        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+        final Bytes from = new Bytes("a".getBytes());
+        final Bytes to = new Bytes("z".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> iterator = accessor.range(dbAccessor, from, to, true);
+
+        assertNotNull(iterator);
+        verify(dbAccessor).newIterator(newCF);
+        verify(dbAccessor).newIterator(oldCF);
+    }
+
+    @Test
+    public void shouldCreateAllIteratorForward() {
+        final RocksIterator newIterFormat = mock(RocksIterator.class);
+        final RocksIterator oldIterFormat = mock(RocksIterator.class);
+        when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
+        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+        final ManagedKeyValueIterator<Bytes, byte[]> iterator = accessor.all(dbAccessor, true);
+
+        assertNotNull(iterator);
+        verify(newIterFormat).seekToFirst();
+        verify(oldIterFormat).seekToFirst();
+    }
+
+    @Test
+    public void shouldCreateAllIteratorReverse() {
+        final RocksIterator newIterFormat = mock(RocksIterator.class);
+        final RocksIterator oldIterFormat = mock(RocksIterator.class);
+        when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
+        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+        final ManagedKeyValueIterator<Bytes, byte[]> iterator = accessor.all(dbAccessor, false);
+
+        assertNotNull(iterator);
+        verify(newIterFormat).seekToLast();
+        verify(oldIterFormat).seekToLast();
+    }
+
+    @Test
+    public void shouldCreatePrefixScanIterator() {
+        final RocksIterator newIterFormat = mock(RocksIterator.class);
+        final RocksIterator oldIterFormat = mock(RocksIterator.class);
+        when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
+        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+
+        final Bytes prefix = new Bytes("prefix".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> iterator = accessor.prefixScan(dbAccessor, prefix);
+
+        assertNotNull(iterator);
+        verify(dbAccessor).newIterator(newCF);
+        verify(dbAccessor).newIterator(oldCF);
+    }
+
+    @Test
+    public void shouldCloseBothColumnFamilies() {
+        accessor.close();
+
+        verify(oldCF).close();
+        verify(newCF).close();
+    }
+}
\ No newline at end of file
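
To summarize the lazy-migration semantics the accessor (and its tests) implement, here is a toy, self-contained analogue that stands in two in-memory maps for the old and new column families; all names are illustrative and not part of the commit:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class DualFormatMapSketch {
        private final Map<String, byte[]> oldCf = new HashMap<>();
        private final Map<String, byte[]> newCf = new HashMap<>();
        private final Function<byte[], byte[]> converter;

        DualFormatMapSketch(final Function<byte[], byte[]> converter) {
            this.converter = converter;
        }

        // Mirrors put(): always remove from the old family, then write (or
        // delete) in the new one, so a key never lives in both formats.
        void put(final String key, final byte[] value) {
            oldCf.remove(key);
            if (value == null) {
                newCf.remove(key);
            } else {
                newCf.put(key, value);
            }
        }

        // Mirrors get(): check the new family first, fall back to the old
        // one, converting and migrating on the fly (as get() does via put()).
        byte[] get(final String key) {
            final byte[] inNew = newCf.get(key);
            if (inNew != null) {
                return inNew;
            }
            final byte[] inOld = oldCf.get(key);
            if (inOld != null) {
                final byte[] converted = converter.apply(inOld);
                put(key, converted); // lazy migration
                return converted;
            }
            return null;
        }

        public static void main(final String[] args) {
            final DualFormatMapSketch store =
                new DualFormatMapSketch(v -> ("converted:" + new String(v)).getBytes());
            store.oldCf.put("k", "legacy".getBytes()); // seed legacy-format data
            System.out.println(new String(store.get("k"))); // converted:legacy
            System.out.println(store.oldCf.containsKey("k")); // false -- migrated
        }
    }

Note that getOnly() in the commit deliberately skips the migration step: it converts the value for the caller but leaves the old column family untouched.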
