KAFKA-3735: Dispose all RocksObejcts upon completeness Author: Guozhang Wang <[email protected]>
Reviewers: Roger Hoover, Eno Thereska, Ismael Juma Closes #1411 from guozhangwang/K3735-dispose-rocksobject Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bef359ef Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bef359ef Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bef359ef Branch: refs/heads/0.10.0 Commit: bef359ef2e53920d91dfac037c0fceefb51954d1 Parents: 73949c2 Author: Guozhang Wang <[email protected]> Authored: Fri May 20 11:52:36 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Sat Jun 18 11:46:41 2016 -0700 ---------------------------------------------------------------------- .../wordcount/WordCountProcessorDemo.java | 16 +++--- .../kstream/internals/KStreamKStreamJoin.java | 19 ++++--- .../internals/KStreamWindowAggregate.java | 49 ++++++++--------- .../kstream/internals/KStreamWindowReduce.java | 56 +++++++++----------- .../kafka/streams/state/KeyValueIterator.java | 3 ++ .../streams/state/WindowStoreIterator.java | 8 ++- .../streams/state/internals/RocksDBStore.java | 31 +++++++---- .../internals/ProcessorTopologyTest.java | 8 +-- .../streams/state/KeyValueStoreTestDriver.java | 8 +-- .../state/internals/RocksDBWindowStoreTest.java | 7 +-- 10 files changed, 110 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 34c35b7..1ee6928 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -81,19 +81,17 @@ public class WordCountProcessorDemo { @Override public void punctuate(long timestamp) { - KeyValueIterator<String, Integer> iter = this.kvStore.all(); + try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) { + System.out.println("----------- " + timestamp + " ----------- "); - System.out.println("----------- " + timestamp + " ----------- "); + while (iter.hasNext()) { + KeyValue<String, Integer> entry = iter.next(); - while (iter.hasNext()) { - KeyValue<String, Integer> entry = iter.next(); + System.out.println("[" + entry.key + ", " + entry.value + "]"); - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value.toString()); + context.forward(entry.key, entry.value.toString()); + } } - - iter.close(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index d13d112..72029a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -25,8 +24,8 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { @@ -76,15 +75,15 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); - Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo); - while (iter.hasNext()) { - needOuterJoin = false; - context().forward(key, joiner.apply(value, iter.next().value)); - } + try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { + while (iter.hasNext()) { + needOuterJoin = false; + context().forward(key, joiner.apply(value, iter.next().value)); + } - if (needOuterJoin) - context().forward(key, joiner.apply(value, null)); + if (needOuterJoin) + context().forward(key, joiner.apply(value, null)); + } } } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index b4272f8..125c7fc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; import java.util.Map; public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> { @@ -90,38 +89,37 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; } - WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo); + try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) { - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue<Long, T> entry = iter.next(); - W window = matchedWindows.get(entry.key); + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue<Long, T> entry = iter.next(); + W window = matchedWindows.get(entry.key); - if (window != null) { + if (window != null) { - T oldAgg = entry.value; + T oldAgg = entry.value; - if (oldAgg == null) - oldAgg = initializer.apply(); + if (oldAgg == null) + oldAgg = initializer.apply(); - // try to add the new new value (there will never be old value) - T newAgg = aggregator.apply(key, value, oldAgg); + // try to add the new new value (there will never be old value) + T newAgg = aggregator.apply(key, value, oldAgg); - // update the store with the new value - windowStore.put(key, newAgg, window.start()); + // update the store with the new value + windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); + matchedWindows.remove(entry.key); + } } } - iter.close(); - // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { T oldAgg = initializer.apply(); @@ -167,10 +165,9 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea W window = (W) windowedKey.window(); // this iterator should contain at most one element - Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.hasNext() ? iter.next().value : null; + try (WindowStoreIterator<T> iter = windowStore.fetch(key, window.start(), window.start())) { + return iter.hasNext() ? iter.next().value : null; + } } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index 3ed1499..a526506 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; import java.util.Map; public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> { @@ -88,40 +87,38 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; } - WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo); + try (WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo)) { + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue<Long, V> entry = iter.next(); + W window = matchedWindows.get(entry.key); - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue<Long, V> entry = iter.next(); - W window = matchedWindows.get(entry.key); + if (window != null) { - if (window != null) { + V oldAgg = entry.value; + V newAgg = oldAgg; - V oldAgg = entry.value; - V newAgg = oldAgg; + // try to add the new new value (there will never be old value) + if (newAgg == null) { + newAgg = value; + } else { + newAgg = reducer.apply(newAgg, value); + } - // try to add the new new value (there will never be old value) - if (newAgg == null) { - newAgg = value; - } else { - newAgg = reducer.apply(newAgg, value); - } - - // update the store with the new value - windowStore.put(key, newAgg, window.start()); + // update the store with the new value + windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); + matchedWindows.remove(entry.key); + } } } - iter.close(); - // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { windowStore.put(key, value, windowStartMs); @@ -161,10 +158,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr W window = (W) windowedKey.window(); // this iterator should only contain one element - Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.next().value; + try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) { + return iter.next().value; + } } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index cdb3de5..ddbc7b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -27,6 +27,9 @@ import java.util.Iterator; /** * Iterator interface of {@link KeyValue}. * + * Users need to call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * * @param <K> Type of keys * @param <V> Type of values */ http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index 7c474dd..b6e6d0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -21,13 +21,19 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.KeyValue; +import java.io.Closeable; import java.util.Iterator; /** * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}. * + * Users need to call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * * @param <E> Type of values */ -public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> { +public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>>, Closeable { + + @Override void close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 37609a0..a00de19 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -77,17 +77,18 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private final String name; private final String parentDir; - private final Options options; - private final WriteOptions wOptions; - private final FlushOptions fOptions; - + protected File dbDir; + private StateSerdes<K, V> serdes; private final Serde<K> keySerde; private final Serde<V> valueSerde; - private StateSerdes<K, V> serdes; - protected File dbDir; private RocksDB db; + // the following option objects will be created at constructor and disposed at close() + private Options options; + private WriteOptions wOptions; + private FlushOptions fOptions; + private boolean loggingEnabled = false; private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE; @@ -313,14 +314,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) { WriteBatch batch = new WriteBatch(); - for (KeyValue<byte[], byte[]> entry : entries) { - batch.put(entry.key, entry.value); - } - try { + for (KeyValue<byte[], byte[]> entry : entries) { + batch.put(entry.key, entry.value); + } + db.write(wOptions, batch); } catch (RocksDBException e) { throw new ProcessorStateException("Error while batch writing to store " + this.name, e); + } finally { + batch.dispose(); } } @@ -425,7 +428,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @Override public void close() { flush(); + options.dispose(); + wOptions.dispose(); + fOptions.dispose(); db.close(); + + options = null; + wOptions = null; + fOptions = null; + db = null; } private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 1095fcf..62b283a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -351,9 +351,11 @@ public class ProcessorTopologyTest { @Override public void punctuate(long streamTime) { int count = 0; - for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) { - iter.next(); - ++count; + try (KeyValueIterator<String, String> iter = store.all()) { + while (iter.hasNext()) { + iter.next(); + ++count; + } } context().forward(Long.toString(streamTime), count); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 3a35d75..be5596d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -362,9 +362,11 @@ public class KeyValueStoreTestDriver<K, V> { */ public int sizeOf(KeyValueStore<K, V> store) { int size = 0; - for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) { - iterator.next(); - ++size; + try (KeyValueIterator<K, V> iterator = store.all()) { + while (iterator.hasNext()) { + iterator.next(); + ++size; + } } return size; } http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index e9888ad..d889e7b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -785,9 +785,10 @@ public class RocksDBWindowStoreTest { segmentDirs(baseDir) ); - WindowStoreIterator iter = store.fetch(0, 0L, 1000000L); - while (iter.hasNext()) { - iter.next(); + try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) { + while (iter.hasNext()) { + iter.next(); + } } assertEquals(
