Repository: kafka Updated Branches: refs/heads/0.10.2 d7850a400 -> 0e8b08477
KAFKA-5174: Have at least 2 threads for compaction and flushing in RocksDB This fix needs to be backported to 0.10.2 as well. Author: Eno Thereska <[email protected]> Reviewers: Damian Guy, Ismael Juma, Guozhang Wang Closes #2982 from enothereska/KAFKA-5174-1-core Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e8b0847 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e8b0847 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e8b0847 Branch: refs/heads/0.10.2 Commit: 0e8b0847749d78815c1ec3a066a82a1f5e2e4165 Parents: d7850a4 Author: Eno Thereska <[email protected]> Authored: Mon May 8 09:40:57 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon May 8 09:56:24 2017 -0700 ---------------------------------------------------------------------- .../kafka/streams/state/internals/RocksDBStore.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0e8b0847/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index acfb5b1..a778cd8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -124,9 +124,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { options.setErrorIfExists(false); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); // this is the recommended way to increase parallelism in RocksDb - // note that the current implementation increases the number of compaction threads - // but not flush threads. - options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); + // note that the current implementation of setIncreaseParallelism affects the number + // of compaction threads but not flush threads (the latter remains one). Also + // the parallelism value needs to be at least two because of the code in + // https://github.com/facebook/rocksdb/blob/62ad0a9b19f0be4cefa70b6b32876e764b7f3c11/util/options.cc#L580 + // subtracts one from the value passed to determine the number of compaction threads + // (this could be a bug in the RocksDB code and their devs have been contacted). + options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2)); wOptions = new WriteOptions(); wOptions.setDisableWAL(true);
