Repository: kafka Updated Branches: refs/heads/trunk e472ee7b6 -> 257ad524d
KAFKA-5174: Have at least 2 threads for compaction and flushing in RocksDB This fix needs to be backported to 0.10.2 as well. Author: Eno Thereska <[email protected]> Reviewers: Damian Guy, Ismael Juma, Guozhang Wang Closes #2982 from enothereska/KAFKA-5174-1-core Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/257ad524 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/257ad524 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/257ad524 Branch: refs/heads/trunk Commit: 257ad524d305bd052feb8bf81d6736f74a19983f Parents: e472ee7 Author: Eno Thereska <[email protected]> Authored: Mon May 8 09:40:57 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon May 8 09:40:57 2017 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/internals/RocksDBStore.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/257ad524/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index c879b91..da582b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -126,9 +126,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { options.setErrorIfExists(false); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); // this is the recommended way to increase parallelism in RocksDb - // note that the current implementation increases the number of compaction threads - // but not flush threads. - options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); + // note that the current implementation of setIncreaseParallelism affects the number + // of compaction threads but not flush threads (the latter remains one). Also + // the parallelism value needs to be at least two because of the code in + // https://github.com/facebook/rocksdb/blob/62ad0a9b19f0be4cefa70b6b32876e764b7f3c11/util/options.cc#L580 + // subtracts one from the value passed to determine the number of compaction threads + // (this could be a bug in the RocksDB code and their devs have been contacted). + options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2)); wOptions = new WriteOptions(); wOptions.setDisableWAL(true);
