Repository: kafka Updated Branches: refs/heads/0.10.2 767cc31a8 -> b9fc41f04
KAFKA-5198: Synchronize on RocksDbStore#openIterators since it is accessed from multiple threads

Author: Colin P. Mccabe <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3000 from cmccabe/KAFKA-5198

(cherry picked from commit b1cd3afc13070c4c50ba963520e7266f97ac52b4)
Signed-off-by: Guozhang Wang <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9fc41f0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9fc41f0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9fc41f0

Branch: refs/heads/0.10.2
Commit: b9fc41f04b0dc964896f95e41fcf882d734e0326
Parents: 767cc31
Author: Colin P. Mccabe <[email protected]>
Authored: Fri May 12 17:01:26 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri May 12 17:03:11 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/RocksDBStore.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9fc41f0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a778cd8..71b5cdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -45,6 +45,7 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -80,7 +81,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private final String name;
     private final String parentDir;
 
-    private final Set<KeyValueIterator> openIterators = new HashSet<>();
+    private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
 
     File dbDir;
     private StateSerdes<K, V> serdes;
@@ -379,10 +380,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     private void closeOpenIterators() {
-        for (KeyValueIterator iterator : new HashSet<>(openIterators)) {
+        HashSet<KeyValueIterator> iterators = null;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (KeyValueIterator iterator : iterators) {
             iterator.close();
         }
-        openIterators.clear();
     }
