Repository: kafka
Updated Branches:
  refs/heads/trunk f7ad3d1b1 -> feda3f68e


HOTFIX: open window segments on init

guozhangwang

A window store should open all existing segments. This is important for segment 
cleanup, and it also ensures that the first fetch() call returns the hits, the 
values in the search range. (previously, it missed the hits in fetch() 
immediately after initialization).

Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #886 from ymatsuda/hotfix3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/feda3f68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/feda3f68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/feda3f68

Branch: refs/heads/trunk
Commit: feda3f68e98b5269431db9f2a5f131c03a16f651
Parents: f7ad3d1
Author: Yasuhiro Matsuda <[email protected]>
Authored: Mon Feb 8 16:48:06 2016 -0800
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Feb 8 16:48:06 2016 -0800

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   |  8 +-
 .../state/internals/RocksDBWindowStore.java     | 79 +++++++++++++++-----
 .../state/internals/RocksDBWindowStoreTest.java | 21 ++----
 3 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6176767..11bf96e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -60,6 +60,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
     private static final String DB_FILE_DIR = "rocksdb";
 
     private final String name;
+    private final String parentDir;
 
     private final Options options;
     private final WriteOptions wOptions;
@@ -91,7 +92,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
     }
 
     public RocksDBStore(String name, Serdes<K, V> serdes) {
+        this(name, DB_FILE_DIR, serdes);
+    }
+
+    public RocksDBStore(String name, String parentDir, Serdes<K, V> serdes) {
         this.name = name;
+        this.parentDir = parentDir;
         this.serdes = serdes;
 
         // initialize the rocksdb options
@@ -131,7 +137,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
 
     public void openDB(ProcessorContext context) {
         this.context = context;
-        this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), 
this.name);
+        this.dbDir = new File(new File(this.context.stateDir(), parentDir), 
this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 581b742..825c70d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
 
 
+import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -47,8 +48,8 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
     private static class Segment extends RocksDBStore<byte[], byte[]> {
         public final long id;
 
-        Segment(String name, long id) {
-            super(name, WindowStoreUtils.INNER_SERDES);
+        Segment(String segmentName, String windowName, long id) {
+            super(segmentName, windowName, WindowStoreUtils.INNER_SERDES);
             this.id = id;
         }
 
@@ -159,6 +160,8 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
     public void init(ProcessorContext context) {
         this.context = context;
 
+        openExistingSegments();
+
         this.changeLogger = this.loggingEnabled ?
                 new RawStoreChangeLogger(name, context) : null;
 
@@ -169,6 +172,26 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
                 putInternal(key, value);
             }
         });
+
+        flush();
+    }
+
+    private void openExistingSegments() {
+        try {
+            File dir = new File(context.stateDir(), name);
+
+            if (dir.exists()) {
+                for (String segmentName : dir.list()) {
+                    long segmentId = segmentIdFromSegmentName(segmentName);
+                    if (segmentId >= 0)
+                        getSegment(segmentId);
+                }
+            } else {
+                dir.mkdir();
+            }
+        } catch (Exception ex) {
+
+        }
     }
 
     @Override
@@ -228,11 +251,12 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        if (segmentId > currentSegmentId - segments.length) {
+        Segment segment = getSegment(segmentId);
+        if (segment != null) {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 
seqnum, serdes);
-            getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+            segment.put(binaryKey, serdes.rawValue(value));
             return binaryKey;
         } else {
             return null;
@@ -249,16 +273,16 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        if (segmentId > currentSegmentId - segments.length)
-            getSegment(segmentId).put(binaryKey, binaryValue);
+        Segment segment = getSegment(segmentId);
+        if (segment != null)
+            segment.put(binaryKey, binaryValue);
     }
 
     private byte[] getInternal(byte[] binaryKey) {
         long segmentId = 
segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
 
-        Segment segment = segments[(int) (segmentId % segments.length)];
-
-        if (segment != null && segment.id == segmentId) {
+        Segment segment = getSegment(segmentId);
+        if (segment != null) {
             return segment.get(binaryKey);
         } else {
             return null;
@@ -277,9 +301,8 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
         ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new 
ArrayList<>();
 
         for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
-            Segment segment = segments[(int) (segmentId % segments.length)];
-
-            if (segment != null && segment.id == segmentId)
+            Segment segment = getSegment(segmentId);
+            if (segment != null)
                 iterators.add(segment.range(binaryFrom, binaryTo));
         }
 
@@ -291,14 +314,23 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
     }
 
     private Segment getSegment(long segmentId) {
-        int index = (int) (segmentId % segments.length);
+        if (segmentId > currentSegmentId - segments.length) {
+            int index = (int) (segmentId % segments.length);
 
-        if (segments[index] == null) {
-            segments[index] = new Segment(name + "-" + 
directorySuffix(segmentId), segmentId);
-            segments[index].openDB(context);
-        }
+            if (segments[index] != null && segments[index].id != segmentId) {
+                cleanup();
+            }
+
+            if (segments[index] == null) {
+                segments[index] = new Segment(segmentName(segmentId), name, 
segmentId);
+                segments[index].openDB(context);
+            }
+
+            return segments[index];
 
-        return segments[index];
+        } else {
+            return null;
+        }
     }
 
     private void cleanup() {
@@ -316,10 +348,19 @@ public class RocksDBWindowStore<K, V> implements 
WindowStore<K, V> {
     }
 
     // this method is defined public since it is used for unit tests
-    public String directorySuffix(long segmentId) {
+    public String segmentName(long segmentId) {
         return formatter.format(new Date(segmentId * segmentInterval));
     }
 
+    public long segmentIdFromSegmentName(String segmentName) {
+        try {
+            Date date = formatter.parse(segmentName);
+            return date.getTime() / segmentInterval;
+        } catch (Exception ex) {
+            return -1L;
+        }
+    }
+
     // this method is defined public since it is used for unit tests
     public Set<Long> segmentIds() {
         HashSet<Long> segmentIds = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 94385c8..3eda1be 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -39,6 +39,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,6 +53,7 @@ public class RocksDBWindowStoreTest {
 
     private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
     private final ByteArrayDeserializer byteArrayDeserializer = new 
ByteArrayDeserializer();
+    private final String windowName = "window";
     private final int numSegments = 3;
     private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
@@ -60,7 +62,7 @@ public class RocksDBWindowStoreTest {
 
     @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext 
context, Serdes<K, V> serdes) {
-        StateStoreSupplier supplier = new 
RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, 
serdes, null);
+        StateStoreSupplier supplier = new 
RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, 
serdes, null);
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context);
@@ -516,7 +518,7 @@ public class RocksDBWindowStoreTest {
                 // check segment directories
                 store.flush();
                 assertEquals(
-                        Utils.mkSet(inner.directorySuffix(4L), 
inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        Utils.mkSet(inner.segmentName(4L), 
inner.segmentName(5L), inner.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
             } finally {
@@ -606,7 +608,7 @@ public class RocksDBWindowStoreTest {
                     (RocksDBWindowStore<Integer, String>) 
((MeteredWindowStore<Integer, String>) store).inner();
 
             try {
-                context.restore("window", changeLog);
+                context.restore(windowName, changeLog);
 
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
@@ -623,7 +625,7 @@ public class RocksDBWindowStoreTest {
                 // check segment directories
                 store.flush();
                 assertEquals(
-                        Utils.mkSet(inner.directorySuffix(4L), 
inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        Utils.mkSet(inner.segmentName(4L), 
inner.segmentName(5L), inner.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
             } finally {
@@ -645,16 +647,9 @@ public class RocksDBWindowStoreTest {
     }
 
     private Set<String> segmentDirs(File baseDir) {
-        File rocksDbDir = new File(baseDir, "rocksdb");
-        String[] subdirs = rocksDbDir.list();
+        File windowDir = new File(baseDir, windowName);
 
-        HashSet<String> set = new HashSet<>();
-
-        for (String subdir : subdirs) {
-            if (subdir.startsWith("window-"))
-            set.add(subdir.substring(7));
-        }
-        return set;
+        return new HashSet<>(Arrays.asList(windowDir.list()));
     }
 
     private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], 
byte[]>> changeLog, long startTime) {

Reply via email to