Repository: kafka Updated Branches: refs/heads/trunk c97a75d98 -> 86a9036a7
MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda Closes #849 from guozhangwang/KRemoveInitializedCheck Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86a9036a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86a9036a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86a9036a Branch: refs/heads/trunk Commit: 86a9036a7b03c8ae07d014c25a5eedc315544139 Parents: c97a75d Author: Guozhang Wang <[email protected]> Authored: Tue Feb 2 10:14:13 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Feb 2 10:14:13 2016 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/state/internals/RocksDBStore.java | 9 +++++++-- .../kafka/streams/state/internals/RocksDBWindowStore.java | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index d7e229d..556e7cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -129,11 +129,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } } - @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { + public void openDB(ProcessorContext context) { this.context = context; this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name); this.db = openDB(this.dbDir, this.options, TTL_SECONDS); + } + + @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + // first open the DB dir + openDB(context); this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null; http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index afb0f09..d6baf30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -294,7 +294,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { if (segments[index] == null) { segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId); - segments[index].init(context); + segments[index].openDB(context); } return segments[index];
