This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6c98544a964 KAFKA-14491: [2/N] Refactor RocksDB store open iterator management (#13142)
6c98544a964 is described below

commit 6c98544a964b40ede6bbe1b3440f8e5db96a4ad6
Author: Victoria Xia <[email protected]>
AuthorDate: Tue Jan 31 00:05:43 2023 -0800

    KAFKA-14491: [2/N] Refactor RocksDB store open iterator management (#13142)
    
    This PR refactors how the list of open iterators for RocksDB stores is
    managed. Prior to this PR, the `openIterators` list was passed into the
    constructor for the iterators themselves, allowing
    `RocksDbIterator.close()` to remove the iterator from the `openIterators`
    list. After this PR, the iterators themselves will not know about lists
    of open iterators. Instead, a generic close callback is exposed, and it's
    the responsibility of the store that creates a new iterator to set the
    callback, so that closing the iterator removes it from the list of open
    iterators.
    
    This refactor is desirable because it enables more flexible iterator
    lifecycle management. Building on top of this, RocksDBStore is updated
    with an option to allow the user (i.e., the caller of methods such as
    `range()` and `prefixScan()` which return iterators) to pass a custom
    `openIterators` list for the new iterator to be stored in. This will
    allow for a new Segments implementation where multiple Segments can share
    the same RocksDBStore instance, while having each Segment manage its own
    open iterators.
    
    Part of KIP-889.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../state/internals/ManagedKeyValueIterator.java   |  31 ++++++
 .../state/internals/RocksDBRangeIterator.java      |   5 +-
 .../streams/state/internals/RocksDBStore.java      | 117 +++++++++++++++++----
 .../state/internals/RocksDBTimestampedStore.java   |  21 ++--
 .../streams/state/internals/RocksDbIterator.java   |  19 ++--
 .../state/internals/RocksDBRangeIteratorTest.java  |  32 +++---
 .../streams/state/internals/RocksDBStoreTest.java  |  67 ++++++++++++
 7 files changed, 242 insertions(+), 50 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ManagedKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ManagedKeyValueIterator.java
new file mode 100644
index 00000000000..25181482c90
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ManagedKeyValueIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+/**
+ * A {@link KeyValueIterator} with life-cycle management.
+ */
+public interface ManagedKeyValueIterator<K, V> extends KeyValueIterator<K, V> {
+
+    /**
+     * Sets a close callback to be called when the iterator is closed.
+     */
+    void onClose(Runnable closeCallback);
+
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
index 21e22012618..f16ed084423 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
@@ -18,11 +18,9 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.rocksdb.RocksIterator;
 
 import java.util.Comparator;
-import java.util.Set;
 
 class RocksDBRangeIterator extends RocksDbIterator {
     // RocksDB's JNI interface does not expose getters/setters that allow the
@@ -35,12 +33,11 @@ class RocksDBRangeIterator extends RocksDbIterator {
 
     RocksDBRangeIterator(final String storeName,
                          final RocksIterator iter,
-                         final Set<KeyValueIterator<Bytes, byte[]>> 
openIterators,
                          final Bytes from,
                          final Bytes to,
                          final boolean forward,
                          final boolean toInclusive) {
-        super(storeName, iter, openIterators, forward);
+        super(storeName, iter, forward);
         this.forward = forward;
         this.toInclusive = toInclusive;
         if (forward) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index edba07acb8c..c796940992b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -114,6 +114,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     private boolean userSpecifiedStatistics = false;
 
     private final RocksDBMetricsRecorder metricsRecorder;
+    // if true, then open iterators (for range, prefix scan, and other 
operations) will be
+    // managed automatically (by this store instance). if false, then these 
iterators must be
+    // managed elsewhere (by the caller of those methods).
+    private final boolean autoManagedIterators;
 
     protected volatile boolean open = false;
     protected StateStoreContext context;
@@ -129,9 +133,17 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     RocksDBStore(final String name,
                  final String parentDir,
                  final RocksDBMetricsRecorder metricsRecorder) {
+        this(name, parentDir, metricsRecorder, true);
+    }
+
+    RocksDBStore(final String name,
+                 final String parentDir,
+                 final RocksDBMetricsRecorder metricsRecorder,
+                 final boolean autoManagedIterators) {
         this.name = name;
         this.parentDir = parentDir;
         this.metricsRecorder = metricsRecorder;
+        this.autoManagedIterators = autoManagedIterators;
     }
 
     @Deprecated
@@ -351,13 +363,32 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
                                                                                
     final PS prefixKeySerializer) {
+        if (!autoManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in 
call to prefixScan()");
+        }
+        return doPrefixScan(prefix, prefixKeySerializer, openIterators);
+    }
+
+    <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                             
final PS prefixKeySerializer,
+                                                                             
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+        if (autoManagedIterators) {
+            throw new IllegalStateException("Cannot specify openIterators when 
using auto-managed iterators");
+        }
+        return doPrefixScan(prefix, prefixKeySerializer, openIterators);
+    }
+
+    <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
doPrefixScan(final P prefix,
+                                                                               
final PS prefixKeySerializer,
+                                                                               
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
         validateStoreOpen();
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
         final Bytes prefixBytes = 
Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
 
-        final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = 
dbAccessor.prefixScan(prefixBytes);
+        final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator 
= dbAccessor.prefixScan(prefixBytes);
         openIterators.add(rocksDbPrefixSeekIterator);
+        rocksDbPrefixSeekIterator.onClose(() -> 
openIterators.remove(rocksDbPrefixSeekIterator));
 
         return rocksDbPrefixSeekIterator;
     }
@@ -400,18 +431,43 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                               final Bytes to) {
-        return range(from, to, true);
+        if (!autoManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in 
call to range()");
+        }
+        return range(from, to, true, openIterators);
+    }
+
+    synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                       final Bytes to,
+                                                       final 
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+        if (autoManagedIterators) {
+            throw new IllegalStateException("Cannot specify openIterators when 
using auto-managed iterators");
+        }
+        return range(from, to, true, openIterators);
     }
 
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> reverseRange(final 
Bytes from,
                                                                      final 
Bytes to) {
-        return range(from, to, false);
+        if (!autoManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in 
call to reverseRange()");
+        }
+        return range(from, to, false, openIterators);
     }
 
-    KeyValueIterator<Bytes, byte[]> range(final Bytes from,
-                                          final Bytes to,
-                                          final boolean forward) {
+    synchronized KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                              final Bytes to,
+                                                              final 
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+        if (autoManagedIterators) {
+            throw new IllegalStateException("Cannot specify openIterators when 
using auto-managed iterators");
+        }
+        return range(from, to, false, openIterators);
+    }
+
+    private KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                  final Bytes to,
+                                                  final boolean forward,
+                                                  final 
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
         if (Objects.nonNull(from) && Objects.nonNull(to) && from.compareTo(to) 
> 0) {
             log.warn("Returning empty iterator for fetch with invalid key 
range: from > to. "
                     + "This may be due to range arguments set in the wrong 
order, " +
@@ -422,26 +478,49 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
 
         validateStoreOpen();
 
-        final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = 
dbAccessor.range(from, to, forward);
+        final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = 
dbAccessor.range(from, to, forward);
         openIterators.add(rocksDBRangeIterator);
+        rocksDBRangeIterator.onClose(() -> 
openIterators.remove(rocksDBRangeIterator));
 
         return rocksDBRangeIterator;
     }
 
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> all() {
-        return all(true);
+        if (!autoManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in 
call to all()");
+        }
+        return all(true, openIterators);
+    }
+
+    synchronized KeyValueIterator<Bytes, byte[]> all(final 
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+        if (autoManagedIterators) {
+            throw new IllegalStateException("Cannot specify openIterators when 
using auto-managed iterators");
+        }
+        return all(true, openIterators);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> reverseAll() {
-        return all(false);
+        if (!autoManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in 
call to reverseAll()");
+        }
+        return all(false, openIterators);
+    }
+
+    KeyValueIterator<Bytes, byte[]> reverseAll(final 
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+        if (autoManagedIterators) {
+            throw new IllegalStateException("Cannot specify openIterators when 
using auto-managed iterators");
+        }
+        return all(false, openIterators);
     }
 
-    KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
+    private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
+                                                final 
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
         validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> rocksDbIterator = 
dbAccessor.all(forward);
+        final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = 
dbAccessor.all(forward);
         openIterators.add(rocksDbIterator);
+        rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
         return rocksDbIterator;
     }
 
@@ -569,7 +648,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
          */
         byte[] getOnly(final byte[] key) throws RocksDBException;
 
-        KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+        ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                               final Bytes to,
                                               final boolean forward);
 
@@ -579,9 +658,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         void deleteRange(final byte[] from,
                          final byte[] to);
 
-        KeyValueIterator<Bytes, byte[]> all(final boolean forward);
+        ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward);
 
-        KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
+        ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
 
         long approximateNumEntries() throws RocksDBException;
 
@@ -641,13 +720,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+        public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                      final Bytes to,
                                                      final boolean forward) {
             return new RocksDBRangeIterator(
                     name,
                     db.newIterator(columnFamily),
-                    openIterators,
                     from,
                     to,
                     forward,
@@ -666,23 +744,22 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
+        public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean 
forward) {
             final RocksIterator innerIterWithTimestamp = 
db.newIterator(columnFamily);
             if (forward) {
                 innerIterWithTimestamp.seekToFirst();
             } else {
                 innerIterWithTimestamp.seekToLast();
             }
-            return new RocksDbIterator(name, innerIterWithTimestamp, 
openIterators, forward);
+            return new RocksDbIterator(name, innerIterWithTimestamp, forward);
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+        public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes 
prefix) {
             final Bytes to = Bytes.increment(prefix);
             return new RocksDBRangeIterator(
                     name,
                     db.newIterator(columnFamily),
-                    openIterators,
                     prefix,
                     to,
                     true,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 0fd81ee0e8c..be971feefd2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -192,7 +191,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+        public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                      final Bytes to,
                                                      final boolean forward) {
             return new RocksDBDualCFRangeIterator(
@@ -222,7 +221,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
+        public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean 
forward) {
             final RocksIterator innerIterWithTimestamp = 
db.newIterator(newColumnFamily);
             final RocksIterator innerIterNoTimestamp = 
db.newIterator(oldColumnFamily);
             if (forward) {
@@ -236,7 +235,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
         }
 
         @Override
-        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+        public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes 
prefix) {
             final Bytes to = Bytes.increment(prefix);
             return new RocksDBDualCFRangeIterator(
                 name,
@@ -282,7 +281,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
     }
 
     private class RocksDBDualCFIterator extends 
AbstractIterator<KeyValue<Bytes, byte[]>>
-        implements KeyValueIterator<Bytes, byte[]> {
+        implements ManagedKeyValueIterator<Bytes, byte[]> {
 
         // RocksDB's JNI interface does not expose getters/setters that allow 
the
         // comparator to be pluggable, and the default is lexicographic, so 
it's
@@ -299,6 +298,7 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
         private byte[] nextWithTimestamp;
         private byte[] nextNoTimestamp;
         private KeyValue<Bytes, byte[]> next;
+        private Runnable closeCallback = null;
 
         RocksDBDualCFIterator(final String storeName,
                               final RocksIterator iterWithTimestamp,
@@ -383,7 +383,11 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
 
         @Override
         public synchronized void close() {
-            openIterators.remove(this);
+            if (closeCallback == null) {
+                throw new IllegalStateException("RocksDBDualCFIterator expects 
close callback to be set immediately upon creation");
+            }
+            closeCallback.run();
+
             iterNoTimestamp.close();
             iterWithTimestamp.close();
             open = false;
@@ -396,6 +400,11 @@ public class RocksDBTimestampedStore extends RocksDBStore 
implements Timestamped
             }
             return next.key;
         }
+
+        @Override
+        public void onClose(final Runnable closeCallback) {
+            this.closeCallback = closeCallback;
+        }
     }
 
     private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
index 388195a81b8..d3fe2aa7df2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
@@ -20,31 +20,27 @@ import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.rocksdb.RocksIterator;
 
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.function.Consumer;
 
-class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> 
implements KeyValueIterator<Bytes, byte[]> {
+class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> 
implements ManagedKeyValueIterator<Bytes, byte[]> {
 
     private final String storeName;
     private final RocksIterator iter;
-    private final Set<KeyValueIterator<Bytes, byte[]>> openIterators;
     private final Consumer<RocksIterator> advanceIterator;
 
     private volatile boolean open = true;
 
     private KeyValue<Bytes, byte[]> next;
+    private Runnable closeCallback = null;
 
     RocksDbIterator(final String storeName,
                     final RocksIterator iter,
-                    final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                     final boolean forward) {
         this.storeName = storeName;
         this.iter = iter;
-        this.openIterators = openIterators;
         this.advanceIterator = forward ? RocksIterator::next : 
RocksIterator::prev;
     }
 
@@ -73,7 +69,11 @@ class RocksDbIterator extends 
AbstractIterator<KeyValue<Bytes, byte[]>> implemen
 
     @Override
     public synchronized void close() {
-        openIterators.remove(this);
+        if (closeCallback == null) {
+            throw new IllegalStateException("RocksDbIterator expects close 
callback to be set immediately upon creation");
+        }
+        closeCallback.run();
+
         iter.close();
         open = false;
     }
@@ -85,4 +85,9 @@ class RocksDbIterator extends 
AbstractIterator<KeyValue<Bytes, byte[]>> implemen
         }
         return next.key;
     }
+
+    @Override
+    public synchronized void onClose(final Runnable closeCallback) {
+        this.closeCallback = closeCallback;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
index b4c7d79ac7c..16badb50b97 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.Test;
 import org.rocksdb.RocksIterator;
 
-import java.util.Collections;
 import java.util.NoSuchElementException;
 
 import static org.easymock.EasyMock.mock;
@@ -67,7 +67,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key3Bytes,
             true,
@@ -103,7 +102,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key3Bytes,
             false,
@@ -142,7 +140,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             toBytes,
             true,
@@ -183,7 +180,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key4Bytes,
             false,
@@ -212,7 +208,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key2Bytes,
             true,
@@ -237,7 +232,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             fromBytes,
             toBytes,
             false,
@@ -265,7 +259,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key3Bytes,
             true,
@@ -299,7 +292,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key3Bytes,
             toBytes,
             false,
@@ -331,7 +323,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key3Bytes,
             true,
@@ -369,7 +360,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key3Bytes,
             toBytes,
             false,
@@ -398,16 +388,33 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key2Bytes,
             true,
             true
         );
+        rocksDBRangeIterator.onClose(() -> { });
         rocksDBRangeIterator.close();
         verify(rocksIterator);
     }
 
+    @Test
+    public void shouldCallCloseCallbackOnClose() {
+        final RocksIterator rocksIterator = mock(RocksIterator.class);
+        final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
+            storeName,
+            rocksIterator,
+            key1Bytes,
+            key2Bytes,
+            true,
+            true
+        );
+        final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+        rocksDBRangeIterator.onClose(() -> callbackCalled.set(true));
+        rocksDBRangeIterator.close();
+        assertThat(callbackCalled.get(), is(true));
+    }
+
     @Test
     public void shouldExcludeEndOfRange() {
         final RocksIterator rocksIterator = mock(RocksIterator.class);
@@ -425,7 +432,6 @@ public class RocksDBRangeIteratorTest {
         final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(
             storeName,
             rocksIterator,
-            Collections.emptySet(),
             key1Bytes,
             key2Bytes,
             true,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 00b08d68ca1..092f695be76 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -171,6 +171,10 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         return new RocksDBStore(DB_NAME, METRICS_SCOPE, metricsRecorder);
     }
 
+    private RocksDBStore getRocksDBStoreWithCustomManagedIterators() {
+        return new RocksDBStore(DB_NAME, METRICS_SCOPE, metricsRecorder, 
false);
+    }
+
     private InternalMockProcessorContext getProcessorContext(final Properties 
streamsProps) {
         return new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
@@ -527,6 +531,69 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         }
     }
 
+    @Test
+    public void shouldAllowCustomManagedIterators() {
+        rocksDBStore = getRocksDBStoreWithCustomManagedIterators();
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        final Set<KeyValueIterator<Bytes, byte[]>> openIterators = new 
HashSet<>();
+
+        final KeyValueIterator<Bytes, byte[]> prefixScanIterator = 
rocksDBStore.prefixScan("abcd", stringSerializer, openIterators);
+        assertThat(openIterators.size(), is(1));
+        prefixScanIterator.close();
+        assertThat(openIterators.size(), is(0));
+
+        final KeyValueIterator<Bytes, byte[]> rangeIterator = 
rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "1")), 
openIterators);
+        assertThat(openIterators.size(), is(1));
+        rangeIterator.close();
+        assertThat(openIterators.size(), is(0));
+
+        final KeyValueIterator<Bytes, byte[]> reverseRangeIterator = 
rocksDBStore.reverseRange(null, new Bytes(stringSerializer.serialize(null, 
"1")), openIterators);
+        assertThat(openIterators.size(), is(1));
+        reverseRangeIterator.close();
+        assertThat(openIterators.size(), is(0));
+
+        final KeyValueIterator<Bytes, byte[]> allIterator = 
rocksDBStore.all(openIterators);
+        assertThat(openIterators.size(), is(1));
+        allIterator.close();
+        assertThat(openIterators.size(), is(0));
+
+        final KeyValueIterator<Bytes, byte[]> reverseAllIterator = 
rocksDBStore.reverseAll(openIterators);
+        assertThat(openIterators.size(), is(1));
+        reverseAllIterator.close();
+        assertThat(openIterators.size(), is(0));
+    }
+
+    @Test
+    public void shouldRequireOpenIteratorsWhenUsingCustomManagedIterators() {
+        rocksDBStore = getRocksDBStoreWithCustomManagedIterators();
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+
+        assertThrows(IllegalStateException.class,
+            () -> rocksDBStore.prefixScan("abcd", stringSerializer));
+        assertThrows(IllegalStateException.class,
+            () -> rocksDBStore.range(null, new 
Bytes(stringSerializer.serialize(null, "1"))));
+        assertThrows(IllegalStateException.class,
+            () -> rocksDBStore.reverseRange(null, new 
Bytes(stringSerializer.serialize(null, "1"))));
+        assertThrows(IllegalStateException.class, () -> rocksDBStore.all());
+        assertThrows(IllegalStateException.class, () -> 
rocksDBStore.reverseAll());
+    }
+
+    @Test
+    public void shouldNotAllowOpenIteratorsWhenUsingAutoManagedIterators() {
+        rocksDBStore = getRocksDBStore();
+        rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+        final Set<KeyValueIterator<Bytes, byte[]>> openIterators = new 
HashSet<>();
+
+        assertThrows(IllegalStateException.class,
+            () -> rocksDBStore.prefixScan("abcd", stringSerializer, 
openIterators));
+        assertThrows(IllegalStateException.class,
+            () -> rocksDBStore.range(null, new 
Bytes(stringSerializer.serialize(null, "1")), openIterators));
+        assertThrows(IllegalStateException.class,
+            () -> rocksDBStore.reverseRange(null, new 
Bytes(stringSerializer.serialize(null, "1")), openIterators));
+        assertThrows(IllegalStateException.class, () -> 
rocksDBStore.all(openIterators));
+        assertThrows(IllegalStateException.class, () -> 
rocksDBStore.reverseAll(openIterators));
+    }
+
     @Test
     public void shouldReturnUUIDsWithStringPrefix() {
         final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();

Reply via email to