Repository: kafka Updated Branches: refs/heads/trunk d2a267b11 -> 9edc71230
KAFKA-4223; RocksDBStore should close all open iterators on close. Keep track of open RocksDB iterators. When a store is closed, close all open iterators. Author: Damian Guy <damian....@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com> Closes #1917 from dguy/kafka-4223 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9edc7123 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9edc7123 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9edc7123 Branch: refs/heads/trunk Commit: 9edc71230ee7b77fd512f6d923e76469508c3280 Parents: d2a267b Author: Damian Guy <damian....@gmail.com> Authored: Fri Sep 30 09:41:28 2016 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Fri Sep 30 09:41:28 2016 +0100 ---------------------------------------------------------------------- .../streams/state/internals/RocksDBStore.java | 49 ++++++++++++++------ .../internals/RocksDBKeyValueStoreTest.java | 49 ++++++++++++++++++++ .../state/internals/RocksDBWindowStoreTest.java | 41 ++++++++++++++++ 3 files changed, 126 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9edc7123/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 6a34ef9..7bd1020 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -41,12 +41,16 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import
org.slf4j.LoggerFactory; import java.io.File; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; /** * A persistent key-value store based on RocksDB. @@ -62,6 +66,7 @@ import java.util.NoSuchElementException; */ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { + private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class); private static final int TTL_NOT_USED = -1; // TODO: these values should be configurable @@ -76,8 +81,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private final String name; private final String parentDir; + private final Set<KeyValueIterator> openIterators = new HashSet<>(); - protected File dbDir; + File dbDir; private StateSerdes<K, V> serdes; private final Serde<K> keySerde; private final Serde<V> valueSerde; @@ -313,7 +319,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { public synchronized KeyValueIterator<K, V> range(K from, K to) { validateStoreOpen(); // query rocksdb - return new RocksDBRangeIterator<>(db.newIterator(), serdes, from, to); + final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(db.newIterator(), serdes, from, to); + openIterators.add(rocksDBRangeIterator); + return rocksDBRangeIterator; } @Override @@ -322,7 +330,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { // query rocksdb RocksIterator innerIter = db.newIterator(); innerIter.seekToFirst(); - return new RocksDbIterator<>(innerIter, serdes); + final RocksDbIterator rocksDbIterator = new RocksDbIterator(innerIter, serdes); + openIterators.add(rocksDbIterator); + return rocksDbIterator; } /** @@ -384,6 +394,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { return; } open = false; + closeOpenIterators(); flush(); options.close(); wOptions.close(); @@ -396,27 +407,37 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> { db = null; } + private void closeOpenIterators() { + for (KeyValueIterator iterator : new HashSet<>(openIterators)) { + iterator.close(); + } + openIterators.clear(); + } - public static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> { + class RocksDbIterator implements KeyValueIterator<K, V> { private final RocksIterator iter; private final StateSerdes<K, V> serdes; + private boolean open = true; - public RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) { + RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) { this.iter = iter; this.serdes = serdes; } - public byte[] peekRawKey() { + byte[] peekRawKey() { return iter.key(); } - protected KeyValue<K, V> getKeyValue() { + private KeyValue<K, V> getKeyValue() { return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value())); } @Override - public boolean hasNext() { + public synchronized boolean hasNext() { + if (!open) { + throw new InvalidStateStoreException("store %s has closed"); + } return iter.isValid(); } @@ -424,7 +445,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { * @throws NoSuchElementException if no next element exist */ @Override - public KeyValue<K, V> next() { + public synchronized KeyValue<K, V> next() { if (!hasNext()) throw new NoSuchElementException(); @@ -442,27 +463,29 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } @Override - public void close() { + public synchronized void close() { + open = false; + openIterators.remove(this); iter.close(); } } - private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> { + private class RocksDBRangeIterator extends RocksDbIterator { // RocksDB's JNI interface does not expose getters/setters that allow the // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. 
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR; private byte[] rawToKey; - public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) { + RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) { super(iter, serdes); iter.seek(serdes.rawKey(from)); this.rawToKey = serdes.rawKey(to); } @Override - public boolean hasNext() { + public synchronized boolean hasNext() { return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9edc7123/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 5e41143..25e0620 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -17,12 +17,14 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.MockProcessorContext; import org.junit.Test; import org.rocksdb.Options; @@ -31,6 +33,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import 
static org.junit.Assert.fail; public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -115,4 +118,50 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { assertFalse(range.hasNext()); } + @Test + public void shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() throws Exception { + final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + final MockProcessorContext context = (MockProcessorContext) driver.context(); + context.setTime(1L); + final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false); + store.put(1, "hi"); + store.put(2, "goodbye"); + final KeyValueIterator<Integer, String> iteratorOne = store.range(1, 5); + final KeyValueIterator<Integer, String> iteratorTwo = store.range(1, 4); + + assertTrue(iteratorOne.hasNext()); + assertTrue(iteratorTwo.hasNext()); + + store.close(); + + try { + iteratorOne.hasNext(); + fail("should have thrown InvalidStateStoreException on closed store"); + } catch (InvalidStateStoreException e) { + // ok + } + + try { + iteratorOne.next(); + fail("should have thrown InvalidStateStoreException on closed store"); + } catch (InvalidStateStoreException e) { + // ok + } + + try { + iteratorTwo.hasNext(); + fail("should have thrown InvalidStateStoreException on closed store"); + } catch (InvalidStateStoreException e) { + // ok + } + + try { + iteratorTwo.next(); + fail("should have thrown InvalidStateStoreException on closed store"); + } catch (InvalidStateStoreException e) { + // ok + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/9edc7123/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 8389dd6..f47bc24 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -51,6 +52,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class RocksDBWindowStoreTest { @@ -839,6 +841,45 @@ public class RocksDBWindowStoreTest { } } + @Test + public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception { + final File baseDir = TestUtils.tempDirectory(); + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerde, byteArraySerde, + recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + + final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true); + 
context.setRecordContext(createRecordContext(0)); + windowStore.put(1, "one", 1L); + windowStore.put(1, "two", 2L); + windowStore.put(1, "three", 3L); + + final WindowStoreIterator<String> iterator = windowStore.fetch(1, 1L, 3L); + assertTrue(iterator.hasNext()); + windowStore.close(); + try { + iterator.hasNext(); + fail("should have thrown InvalidStateStoreException on closed store"); + } catch (InvalidStateStoreException e) { + // ok + } + + try { + iterator.next(); + fail("should have thrown InvalidStateStoreException on closed store"); + } catch (InvalidStateStoreException e) { + // ok + } + } + private <E> List<E> toList(WindowStoreIterator<E> iterator) { ArrayList<E> list = new ArrayList<>(); while (iterator.hasNext()) {