Repository: kafka Updated Branches: refs/heads/trunk 7258a5fdd -> b1cd3afc1
KAFKA-5198: Synchronize on RocksDbStore#openIterators …it is accessed from multiple threads Author: Colin P. Mccabe <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #3000 from cmccabe/KAFKA-5198 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1cd3afc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1cd3afc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1cd3afc Branch: refs/heads/trunk Commit: b1cd3afc13070c4c50ba963520e7266f97ac52b4 Parents: 7258a5f Author: Colin P. Mccabe <[email protected]> Authored: Fri May 12 17:01:26 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 12 17:01:26 2017 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/internals/RocksDBStore.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd3afc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index da582b4..a01de77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -47,6 +47,7 @@ import org.rocksdb.WriteOptions; import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -82,7 +83,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private final String name; private final String parentDir; - private final 
Set<KeyValueIterator> openIterators = new HashSet<>(); + private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>()); File dbDir; private StateSerdes<K, V> serdes; @@ -386,10 +387,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } private void closeOpenIterators() { - for (KeyValueIterator iterator : new HashSet<>(openIterators)) { + HashSet<KeyValueIterator> iterators = null; + synchronized (openIterators) { + iterators = new HashSet<>(openIterators); + } + for (KeyValueIterator iterator : iterators) { iterator.close(); } - openIterators.clear(); } private class RocksDbIterator implements KeyValueIterator<K, V> {
