This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6c98544a964 KAFKA-14491: [2/N] Refactor RocksDB store open iterator
management (#13142)
6c98544a964 is described below
commit 6c98544a964b40ede6bbe1b3440f8e5db96a4ad6
Author: Victoria Xia <[email protected]>
AuthorDate: Tue Jan 31 00:05:43 2023 -0800
KAFKA-14491: [2/N] Refactor RocksDB store open iterator management (#13142)
This PR refactors how the list of open iterators for RocksDB stores is
managed. Prior to this PR, the `openIterators` list was passed into the
constructor for the iterators themselves, allowing `RocksDbIterator.close()` to
remove the iterator from the `openIterators` list. After this PR, the iterators
themselves will not know about lists of open iterators. Instead, a generic
close callback is exposed, and it's the responsibility of the store that
creates a new iterator to set the call [...]
This refactor is desirable because it enables more flexible iterator
lifecycle management. Building on top of this, RocksDBStore is updated with an
option to allow the user (i.e., the caller of methods such as `range()` and
`prefixScan()` which return iterators) to pass a custom `openIterators` list
for the new iterator to be stored in. This will allow for a new Segments
implementation where multiple Segments can share the same RocksDBStore
instance, while having each Segment manage i [...]
Part of KIP-889.
Reviewers: Matthias J. Sax <[email protected]>
---
.../state/internals/ManagedKeyValueIterator.java | 31 ++++++
.../state/internals/RocksDBRangeIterator.java | 5 +-
.../streams/state/internals/RocksDBStore.java | 117 +++++++++++++++++----
.../state/internals/RocksDBTimestampedStore.java | 21 ++--
.../streams/state/internals/RocksDbIterator.java | 19 ++--
.../state/internals/RocksDBRangeIteratorTest.java | 32 +++---
.../streams/state/internals/RocksDBStoreTest.java | 67 ++++++++++++
7 files changed, 242 insertions(+), 50 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ManagedKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ManagedKeyValueIterator.java
new file mode 100644
index 00000000000..25181482c90
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ManagedKeyValueIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+/**
+ * A {@link KeyValueIterator} with life-cycle management.
+ */
+public interface ManagedKeyValueIterator<K, V> extends KeyValueIterator<K, V> {
+
+ /**
+ * Sets a close callback to be called when the iterator is closed.
+ */
+ void onClose(Runnable closeCallback);
+
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
index 21e22012618..f16ed084423 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java
@@ -18,11 +18,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksIterator;
import java.util.Comparator;
-import java.util.Set;
class RocksDBRangeIterator extends RocksDbIterator {
// RocksDB's JNI interface does not expose getters/setters that allow the
@@ -35,12 +33,11 @@ class RocksDBRangeIterator extends RocksDbIterator {
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
- final Set<KeyValueIterator<Bytes, byte[]>>
openIterators,
final Bytes from,
final Bytes to,
final boolean forward,
final boolean toInclusive) {
- super(storeName, iter, openIterators, forward);
+ super(storeName, iter, forward);
this.forward = forward;
this.toInclusive = toInclusive;
if (forward) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index edba07acb8c..c796940992b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -114,6 +114,10 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
private boolean userSpecifiedStatistics = false;
private final RocksDBMetricsRecorder metricsRecorder;
+ // if true, then open iterators (for range, prefix scan, and other
operations) will be
+ // managed automatically (by this store instance). if false, then these
iterators must be
+ // managed elsewhere (by the caller of those methods).
+ private final boolean autoManagedIterators;
protected volatile boolean open = false;
protected StateStoreContext context;
@@ -129,9 +133,17 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
RocksDBStore(final String name,
final String parentDir,
final RocksDBMetricsRecorder metricsRecorder) {
+ this(name, parentDir, metricsRecorder, true);
+ }
+
+ RocksDBStore(final String name,
+ final String parentDir,
+ final RocksDBMetricsRecorder metricsRecorder,
+ final boolean autoManagedIterators) {
this.name = name;
this.parentDir = parentDir;
this.metricsRecorder = metricsRecorder;
+ this.autoManagedIterators = autoManagedIterators;
}
@Deprecated
@@ -351,13 +363,32 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
final PS prefixKeySerializer) {
+ if (!autoManagedIterators) {
+ throw new IllegalStateException("Must specify openIterators in
call to prefixScan()");
+ }
+ return doPrefixScan(prefix, prefixKeySerializer, openIterators);
+ }
+
+ <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
+
final PS prefixKeySerializer,
+
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+ if (autoManagedIterators) {
+ throw new IllegalStateException("Cannot specify openIterators when
using auto-managed iterators");
+ }
+ return doPrefixScan(prefix, prefixKeySerializer, openIterators);
+ }
+
+ <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
doPrefixScan(final P prefix,
+
final PS prefixKeySerializer,
+
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
validateStoreOpen();
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
final Bytes prefixBytes =
Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
- final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator =
dbAccessor.prefixScan(prefixBytes);
+ final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator
= dbAccessor.prefixScan(prefixBytes);
openIterators.add(rocksDbPrefixSeekIterator);
+ rocksDbPrefixSeekIterator.onClose(() ->
openIterators.remove(rocksDbPrefixSeekIterator));
return rocksDbPrefixSeekIterator;
}
@@ -400,18 +431,43 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
@Override
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
- return range(from, to, true);
+ if (!autoManagedIterators) {
+ throw new IllegalStateException("Must specify openIterators in
call to range()");
+ }
+ return range(from, to, true, openIterators);
+ }
+
+ synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to,
+ final
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+ if (autoManagedIterators) {
+ throw new IllegalStateException("Cannot specify openIterators when
using auto-managed iterators");
+ }
+ return range(from, to, true, openIterators);
}
@Override
public synchronized KeyValueIterator<Bytes, byte[]> reverseRange(final
Bytes from,
final
Bytes to) {
- return range(from, to, false);
+ if (!autoManagedIterators) {
+ throw new IllegalStateException("Must specify openIterators in
call to reverseRange()");
+ }
+ return range(from, to, false, openIterators);
}
- KeyValueIterator<Bytes, byte[]> range(final Bytes from,
- final Bytes to,
- final boolean forward) {
+ synchronized KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+ final Bytes to,
+ final
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+ if (autoManagedIterators) {
+ throw new IllegalStateException("Cannot specify openIterators when
using auto-managed iterators");
+ }
+ return range(from, to, false, openIterators);
+ }
+
+ private KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to,
+ final boolean forward,
+ final
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
if (Objects.nonNull(from) && Objects.nonNull(to) && from.compareTo(to)
> 0) {
log.warn("Returning empty iterator for fetch with invalid key
range: from > to. "
+ "This may be due to range arguments set in the wrong
order, " +
@@ -422,26 +478,49 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator =
dbAccessor.range(from, to, forward);
+ final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator =
dbAccessor.range(from, to, forward);
openIterators.add(rocksDBRangeIterator);
+ rocksDBRangeIterator.onClose(() ->
openIterators.remove(rocksDBRangeIterator));
return rocksDBRangeIterator;
}
@Override
public synchronized KeyValueIterator<Bytes, byte[]> all() {
- return all(true);
+ if (!autoManagedIterators) {
+ throw new IllegalStateException("Must specify openIterators in
call to all()");
+ }
+ return all(true, openIterators);
+ }
+
+ synchronized KeyValueIterator<Bytes, byte[]> all(final
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+ if (autoManagedIterators) {
+ throw new IllegalStateException("Cannot specify openIterators when
using auto-managed iterators");
+ }
+ return all(true, openIterators);
}
@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
- return all(false);
+ if (!autoManagedIterators) {
+ throw new IllegalStateException("Must specify openIterators in
call to reverseAll()");
+ }
+ return all(false, openIterators);
+ }
+
+ KeyValueIterator<Bytes, byte[]> reverseAll(final
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
+ if (autoManagedIterators) {
+ throw new IllegalStateException("Cannot specify openIterators when
using auto-managed iterators");
+ }
+ return all(false, openIterators);
}
- KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
+ private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
+ final
Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> rocksDbIterator =
dbAccessor.all(forward);
+ final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator =
dbAccessor.all(forward);
openIterators.add(rocksDbIterator);
+ rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
return rocksDbIterator;
}
@@ -569,7 +648,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
*/
byte[] getOnly(final byte[] key) throws RocksDBException;
- KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to,
final boolean forward);
@@ -579,9 +658,9 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
void deleteRange(final byte[] from,
final byte[] to);
- KeyValueIterator<Bytes, byte[]> all(final boolean forward);
+ ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward);
- KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
+ ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
long approximateNumEntries() throws RocksDBException;
@@ -641,13 +720,12 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
}
@Override
- public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to,
final boolean forward) {
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
- openIterators,
from,
to,
forward,
@@ -666,23 +744,22 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
}
@Override
- public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
+ public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean
forward) {
final RocksIterator innerIterWithTimestamp =
db.newIterator(columnFamily);
if (forward) {
innerIterWithTimestamp.seekToFirst();
} else {
innerIterWithTimestamp.seekToLast();
}
- return new RocksDbIterator(name, innerIterWithTimestamp,
openIterators, forward);
+ return new RocksDbIterator(name, innerIterWithTimestamp, forward);
}
@Override
- public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+ public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes
prefix) {
final Bytes to = Bytes.increment(prefix);
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
- openIterators,
prefix,
to,
true,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 0fd81ee0e8c..be971feefd2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -192,7 +191,7 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
}
@Override
- public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to,
final boolean forward) {
return new RocksDBDualCFRangeIterator(
@@ -222,7 +221,7 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
}
@Override
- public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
+ public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean
forward) {
final RocksIterator innerIterWithTimestamp =
db.newIterator(newColumnFamily);
final RocksIterator innerIterNoTimestamp =
db.newIterator(oldColumnFamily);
if (forward) {
@@ -236,7 +235,7 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
}
@Override
- public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
+ public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes
prefix) {
final Bytes to = Bytes.increment(prefix);
return new RocksDBDualCFRangeIterator(
name,
@@ -282,7 +281,7 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
}
private class RocksDBDualCFIterator extends
AbstractIterator<KeyValue<Bytes, byte[]>>
- implements KeyValueIterator<Bytes, byte[]> {
+ implements ManagedKeyValueIterator<Bytes, byte[]> {
// RocksDB's JNI interface does not expose getters/setters that allow
the
// comparator to be pluggable, and the default is lexicographic, so
it's
@@ -299,6 +298,7 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
private byte[] nextWithTimestamp;
private byte[] nextNoTimestamp;
private KeyValue<Bytes, byte[]> next;
+ private Runnable closeCallback = null;
RocksDBDualCFIterator(final String storeName,
final RocksIterator iterWithTimestamp,
@@ -383,7 +383,11 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
@Override
public synchronized void close() {
- openIterators.remove(this);
+ if (closeCallback == null) {
+ throw new IllegalStateException("RocksDBDualCFIterator expects
close callback to be set immediately upon creation");
+ }
+ closeCallback.run();
+
iterNoTimestamp.close();
iterWithTimestamp.close();
open = false;
@@ -396,6 +400,11 @@ public class RocksDBTimestampedStore extends RocksDBStore
implements Timestamped
}
return next.key;
}
+
+ @Override
+ public void onClose(final Runnable closeCallback) {
+ this.closeCallback = closeCallback;
+ }
}
private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
index 388195a81b8..d3fe2aa7df2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java
@@ -20,31 +20,27 @@ import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksIterator;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.function.Consumer;
-class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
implements KeyValueIterator<Bytes, byte[]> {
+class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
implements ManagedKeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;
- private final Set<KeyValueIterator<Bytes, byte[]>> openIterators;
private final Consumer<RocksIterator> advanceIterator;
private volatile boolean open = true;
private KeyValue<Bytes, byte[]> next;
+ private Runnable closeCallback = null;
RocksDbIterator(final String storeName,
final RocksIterator iter,
- final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final boolean forward) {
this.storeName = storeName;
this.iter = iter;
- this.openIterators = openIterators;
this.advanceIterator = forward ? RocksIterator::next :
RocksIterator::prev;
}
@@ -73,7 +69,11 @@ class RocksDbIterator extends
AbstractIterator<KeyValue<Bytes, byte[]>> implemen
@Override
public synchronized void close() {
- openIterators.remove(this);
+ if (closeCallback == null) {
+ throw new IllegalStateException("RocksDbIterator expects close
callback to be set immediately upon creation");
+ }
+ closeCallback.run();
+
iter.close();
open = false;
}
@@ -85,4 +85,9 @@ class RocksDbIterator extends
AbstractIterator<KeyValue<Bytes, byte[]>> implemen
}
return next.key;
}
+
+ @Override
+ public synchronized void onClose(final Runnable closeCallback) {
+ this.closeCallback = closeCallback;
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
index b4c7d79ac7c..16badb50b97 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBRangeIteratorTest.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.Bytes;
import org.junit.Test;
import org.rocksdb.RocksIterator;
-import java.util.Collections;
import java.util.NoSuchElementException;
import static org.easymock.EasyMock.mock;
@@ -67,7 +67,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key3Bytes,
true,
@@ -103,7 +102,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key3Bytes,
false,
@@ -142,7 +140,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
toBytes,
true,
@@ -183,7 +180,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key4Bytes,
false,
@@ -212,7 +208,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key2Bytes,
true,
@@ -237,7 +232,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
fromBytes,
toBytes,
false,
@@ -265,7 +259,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key3Bytes,
true,
@@ -299,7 +292,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key3Bytes,
toBytes,
false,
@@ -331,7 +323,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key3Bytes,
true,
@@ -369,7 +360,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key3Bytes,
toBytes,
false,
@@ -398,16 +388,33 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key2Bytes,
true,
true
);
+ rocksDBRangeIterator.onClose(() -> { });
rocksDBRangeIterator.close();
verify(rocksIterator);
}
+ @Test
+ public void shouldCallCloseCallbackOnClose() {
+ final RocksIterator rocksIterator = mock(RocksIterator.class);
+ final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
+ storeName,
+ rocksIterator,
+ key1Bytes,
+ key2Bytes,
+ true,
+ true
+ );
+ final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+ rocksDBRangeIterator.onClose(() -> callbackCalled.set(true));
+ rocksDBRangeIterator.close();
+ assertThat(callbackCalled.get(), is(true));
+ }
+
@Test
public void shouldExcludeEndOfRange() {
final RocksIterator rocksIterator = mock(RocksIterator.class);
@@ -425,7 +432,6 @@ public class RocksDBRangeIteratorTest {
final RocksDBRangeIterator rocksDBRangeIterator = new
RocksDBRangeIterator(
storeName,
rocksIterator,
- Collections.emptySet(),
key1Bytes,
key2Bytes,
true,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 00b08d68ca1..092f695be76 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -171,6 +171,10 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
return new RocksDBStore(DB_NAME, METRICS_SCOPE, metricsRecorder);
}
+ private RocksDBStore getRocksDBStoreWithCustomManagedIterators() {
+ return new RocksDBStore(DB_NAME, METRICS_SCOPE, metricsRecorder,
false);
+ }
+
private InternalMockProcessorContext getProcessorContext(final Properties
streamsProps) {
return new InternalMockProcessorContext(
TestUtils.tempDirectory(),
@@ -527,6 +531,69 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
}
}
+ @Test
+ public void shouldAllowCustomManagedIterators() {
+ rocksDBStore = getRocksDBStoreWithCustomManagedIterators();
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+ final Set<KeyValueIterator<Bytes, byte[]>> openIterators = new
HashSet<>();
+
+ final KeyValueIterator<Bytes, byte[]> prefixScanIterator =
rocksDBStore.prefixScan("abcd", stringSerializer, openIterators);
+ assertThat(openIterators.size(), is(1));
+ prefixScanIterator.close();
+ assertThat(openIterators.size(), is(0));
+
+ final KeyValueIterator<Bytes, byte[]> rangeIterator =
rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "1")),
openIterators);
+ assertThat(openIterators.size(), is(1));
+ rangeIterator.close();
+ assertThat(openIterators.size(), is(0));
+
+ final KeyValueIterator<Bytes, byte[]> reverseRangeIterator =
rocksDBStore.reverseRange(null, new Bytes(stringSerializer.serialize(null,
"1")), openIterators);
+ assertThat(openIterators.size(), is(1));
+ reverseRangeIterator.close();
+ assertThat(openIterators.size(), is(0));
+
+ final KeyValueIterator<Bytes, byte[]> allIterator =
rocksDBStore.all(openIterators);
+ assertThat(openIterators.size(), is(1));
+ allIterator.close();
+ assertThat(openIterators.size(), is(0));
+
+ final KeyValueIterator<Bytes, byte[]> reverseAllIterator =
rocksDBStore.reverseAll(openIterators);
+ assertThat(openIterators.size(), is(1));
+ reverseAllIterator.close();
+ assertThat(openIterators.size(), is(0));
+ }
+
+ @Test
+ public void shouldRequireOpenIteratorsWhenUsingCustomManagedIterators() {
+ rocksDBStore = getRocksDBStoreWithCustomManagedIterators();
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+
+ assertThrows(IllegalStateException.class,
+ () -> rocksDBStore.prefixScan("abcd", stringSerializer));
+ assertThrows(IllegalStateException.class,
+ () -> rocksDBStore.range(null, new
Bytes(stringSerializer.serialize(null, "1"))));
+ assertThrows(IllegalStateException.class,
+ () -> rocksDBStore.reverseRange(null, new
Bytes(stringSerializer.serialize(null, "1"))));
+ assertThrows(IllegalStateException.class, () -> rocksDBStore.all());
+ assertThrows(IllegalStateException.class, () ->
rocksDBStore.reverseAll());
+ }
+
+ @Test
+ public void shouldNotAllowOpenIteratorsWhenUsingAutoManagedIterators() {
+ rocksDBStore = getRocksDBStore();
+ rocksDBStore.init((StateStoreContext) context, rocksDBStore);
+ final Set<KeyValueIterator<Bytes, byte[]>> openIterators = new
HashSet<>();
+
+ assertThrows(IllegalStateException.class,
+ () -> rocksDBStore.prefixScan("abcd", stringSerializer,
openIterators));
+ assertThrows(IllegalStateException.class,
+ () -> rocksDBStore.range(null, new
Bytes(stringSerializer.serialize(null, "1")), openIterators));
+ assertThrows(IllegalStateException.class,
+ () -> rocksDBStore.reverseRange(null, new
Bytes(stringSerializer.serialize(null, "1")), openIterators));
+ assertThrows(IllegalStateException.class, () ->
rocksDBStore.all(openIterators));
+ assertThrows(IllegalStateException.class, () ->
rocksDBStore.reverseAll(openIterators));
+ }
+
@Test
public void shouldReturnUUIDsWithStringPrefix() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();