Repository: kafka
Updated Branches:
  refs/heads/trunk d2a267b11 -> 9edc71230


KAFKA-4223; RocksDBStore should close all open iterators on close

Keep track of open Rocks DB iterators. When a store is closed, close all open 
iterators.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>

Closes #1917 from dguy/kafka-4223


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9edc7123
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9edc7123
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9edc7123

Branch: refs/heads/trunk
Commit: 9edc71230ee7b77fd512f6d923e76469508c3280
Parents: d2a267b
Author: Damian Guy <damian....@gmail.com>
Authored: Fri Sep 30 09:41:28 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Fri Sep 30 09:41:28 2016 +0100

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   | 49 ++++++++++++++------
 .../internals/RocksDBKeyValueStoreTest.java     | 49 ++++++++++++++++++++
 .../state/internals/RocksDBWindowStoreTest.java | 41 ++++++++++++++++
 3 files changed, 126 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9edc7123/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6a34ef9..7bd1020 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -41,12 +41,16 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * A persistent key-value store based on RocksDB.
@@ -62,6 +66,7 @@ import java.util.NoSuchElementException;
  */
 public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
+    private static final Logger log = 
LoggerFactory.getLogger(RocksDBStore.class);
     private static final int TTL_NOT_USED = -1;
 
     // TODO: these values should be configurable
@@ -76,8 +81,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
 
     private final String name;
     private final String parentDir;
+    private final Set<KeyValueIterator> openIterators = new HashSet<>();
 
-    protected File dbDir;
+    File dbDir;
     private StateSerdes<K, V> serdes;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
@@ -313,7 +319,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
     public synchronized KeyValueIterator<K, V> range(K from, K to) {
         validateStoreOpen();
         // query rocksdb
-        return new RocksDBRangeIterator<>(db.newIterator(), serdes, from, to);
+        final RocksDBRangeIterator rocksDBRangeIterator = new 
RocksDBRangeIterator(db.newIterator(), serdes, from, to);
+        openIterators.add(rocksDBRangeIterator);
+        return rocksDBRangeIterator;
     }
 
     @Override
@@ -322,7 +330,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
         // query rocksdb
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
-        return new RocksDbIterator<>(innerIter, serdes);
+        final RocksDbIterator rocksDbIterator = new RocksDbIterator(innerIter, 
serdes);
+        openIterators.add(rocksDbIterator);
+        return rocksDbIterator;
     }
 
     /**
@@ -384,6 +394,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
             return;
         }
         open = false;
+        closeOpenIterators();
         flush();
         options.close();
         wOptions.close();
@@ -396,27 +407,37 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
         db = null;
     }
 
+    private void closeOpenIterators() {
+        for (KeyValueIterator iterator : new HashSet<>(openIterators)) {
+            iterator.close();
+        }
+        openIterators.clear();
+    }
 
 
-    public static class RocksDbIterator<K, V> implements KeyValueIterator<K, 
V> {
+    class RocksDbIterator implements KeyValueIterator<K, V> {
         private final RocksIterator iter;
         private final StateSerdes<K, V> serdes;
+        private boolean open = true;
 
-        public RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) {
+        RocksDbIterator(RocksIterator iter, StateSerdes<K, V> serdes) {
             this.iter = iter;
             this.serdes = serdes;
         }
 
-        public byte[] peekRawKey() {
+        byte[] peekRawKey() {
             return iter.key();
         }
 
-        protected KeyValue<K, V> getKeyValue() {
+        private KeyValue<K, V> getKeyValue() {
             return new KeyValue<>(serdes.keyFrom(iter.key()), 
serdes.valueFrom(iter.value()));
         }
 
         @Override
-        public boolean hasNext() {
+        public synchronized boolean hasNext() {
+            if (!open) {
+                throw new InvalidStateStoreException("store %s has closed");
+            }
             return iter.isValid();
         }
 
@@ -424,7 +445,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
          * @throws NoSuchElementException if no next element exist
          */
         @Override
-        public KeyValue<K, V> next() {
+        public synchronized KeyValue<K, V> next() {
             if (!hasNext())
                 throw new NoSuchElementException();
 
@@ -442,27 +463,29 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
         }
 
         @Override
-        public void close() {
+        public synchronized void close() {
+            open = false;
+            openIterators.remove(this);
             iter.close();
         }
 
     }
 
-    private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, 
V> {
+    private class RocksDBRangeIterator extends RocksDbIterator {
         // RocksDB's JNI interface does not expose getters/setters that allow 
the
         // comparator to be pluggable, and the default is lexicographic, so 
it's
         // safe to just force lexicographic comparator here for now.
         private final Comparator<byte[]> comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
         private byte[] rawToKey;
 
-        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> 
serdes, K from, K to) {
+        RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K 
from, K to) {
             super(iter, serdes);
             iter.seek(serdes.rawKey(from));
             this.rawToKey = serdes.rawKey(to);
         }
 
         @Override
-        public boolean hasNext() {
+        public synchronized boolean hasNext() {
             return super.hasNext() && comparator.compare(super.peekRawKey(), 
this.rawToKey) <= 0;
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9edc7123/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 5e41143..25e0620 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -17,12 +17,14 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 import org.rocksdb.Options;
 
@@ -31,6 +33,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -115,4 +118,50 @@ public class RocksDBKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         assertFalse(range.hasNext());
     }
 
+    @Test
+    public void 
shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext()
 throws Exception {
+        final KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
+        final MockProcessorContext context = (MockProcessorContext) 
driver.context();
+        context.setTime(1L);
+        final KeyValueStore<Integer, String> store = createStore(context, 
Integer.class, String.class, false, false);
+        store.put(1, "hi");
+        store.put(2, "goodbye");
+        final KeyValueIterator<Integer, String> iteratorOne = store.range(1, 
5);
+        final KeyValueIterator<Integer, String> iteratorTwo = store.range(1, 
4);
+
+        assertTrue(iteratorOne.hasNext());
+        assertTrue(iteratorTwo.hasNext());
+
+        store.close();
+
+        try {
+            iteratorOne.hasNext();
+            fail("should have thrown InvalidStateStoreException on closed 
store");
+        } catch (InvalidStateStoreException e) {
+            // ok
+        }
+
+        try {
+            iteratorOne.next();
+            fail("should have thrown InvalidStateStoreException on closed 
store");
+        } catch (InvalidStateStoreException e) {
+            // ok
+        }
+
+        try {
+            iteratorTwo.hasNext();
+            fail("should have thrown InvalidStateStoreException on closed 
store");
+        } catch (InvalidStateStoreException e) {
+            // ok
+        }
+
+        try {
+            iteratorTwo.next();
+            fail("should have thrown InvalidStateStoreException on closed 
store");
+        } catch (InvalidStateStoreException e) {
+            // ok
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9edc7123/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 8389dd6..f47bc24 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -51,6 +52,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class RocksDBWindowStoreTest {
 
@@ -839,6 +841,45 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+    @Test
+    public void 
shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext()
 throws Exception {
+        final File baseDir = TestUtils.tempDirectory();
+        Producer<byte[], byte[]> producer = new MockProducer<>(true, 
byteArraySerde.serializer(), byteArraySerde.serializer());
+        RecordCollector recordCollector = new RecordCollector(producer, 
"RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+            @Override
+            public <K1, V1> void send(ProducerRecord<K1, V1> record, 
Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+            }
+        };
+
+        MockProcessorContext context = new MockProcessorContext(
+                null, baseDir,
+                byteArraySerde, byteArraySerde,
+                recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+
+        final WindowStore<Integer, String> windowStore = 
createWindowStore(context, false, true);
+        context.setRecordContext(createRecordContext(0));
+        windowStore.put(1, "one", 1L);
+        windowStore.put(1, "two", 2L);
+        windowStore.put(1, "three", 3L);
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch(1, 1L, 
3L);
+        assertTrue(iterator.hasNext());
+        windowStore.close();
+        try {
+            iterator.hasNext();
+            fail("should have thrown InvalidStateStoreException on closed 
store");
+        } catch (InvalidStateStoreException e) {
+            // ok
+        }
+
+        try {
+            iterator.next();
+            fail("should have thrown InvalidStateStoreException on closed 
store");
+        } catch (InvalidStateStoreException e) {
+            // ok
+        }
+    }
+
     private <E> List<E> toList(WindowStoreIterator<E> iterator) {
         ArrayList<E> list = new ArrayList<>();
         while (iterator.hasNext()) {

Reply via email to