Repository: kafka
Updated Branches:
  refs/heads/trunk c97a75d98 -> 86a9036a7


MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments

Author: Guozhang Wang <[email protected]>

Reviewers: Yasuhiro Matsuda

Closes #849 from guozhangwang/KRemoveInitializedCheck


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86a9036a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86a9036a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86a9036a

Branch: refs/heads/trunk
Commit: 86a9036a7b03c8ae07d014c25a5eedc315544139
Parents: c97a75d
Author: Guozhang Wang <[email protected]>
Authored: Tue Feb 2 10:14:13 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Feb 2 10:14:13 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/state/internals/RocksDBStore.java  | 9 +++++++--
 .../kafka/streams/state/internals/RocksDBWindowStore.java   | 2 +-
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d7e229d..556e7cd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -129,11 +129,16 @@ public class RocksDBStore<K, V> implements 
KeyValueStore<K, V> {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context) {
+    public void openDB(ProcessorContext context) {
         this.context = context;
         this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), 
this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context) {
+        // first open the DB dir
+        openDB(context);
 
         this.changeLogger = this.loggingEnabled ? new 
RawStoreChangeLogger(name, context) : null;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index afb0f09..d6baf30 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -294,7 +294,7 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
 
         if (segments[index] == null) {
             segments[index] = new Segment(name + "-" + 
directorySuffix(segmentId), segmentId);
-            segments[index].init(context);
+            segments[index].openDB(context);
         }
 
         return segments[index];

Reply via email to