Repository: kafka Updated Branches: refs/heads/trunk f7ad3d1b1 -> feda3f68e
HOTFIX: open window segments on init guozhangwang A window store should open all existing segments. This is important for segment cleanup, and it also ensures that the first fetch() call returns the hits, the values in the search range. (previously, it missed the hits in fetch() immediately after initialization). Author: Yasuhiro Matsuda <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #886 from ymatsuda/hotfix3 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/feda3f68 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/feda3f68 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/feda3f68 Branch: refs/heads/trunk Commit: feda3f68e98b5269431db9f2a5f131c03a16f651 Parents: f7ad3d1 Author: Yasuhiro Matsuda <[email protected]> Authored: Mon Feb 8 16:48:06 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Feb 8 16:48:06 2016 -0800 ---------------------------------------------------------------------- .../streams/state/internals/RocksDBStore.java | 8 +- .../state/internals/RocksDBWindowStore.java | 79 +++++++++++++++----- .../state/internals/RocksDBWindowStoreTest.java | 21 ++---- 3 files changed, 75 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 6176767..11bf96e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -60,6 +60,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { private static final String DB_FILE_DIR = "rocksdb"; private final String name; + private final String parentDir; private final Options options; private final WriteOptions wOptions; @@ -91,7 +92,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } public RocksDBStore(String name, Serdes<K, V> serdes) { + this(name, DB_FILE_DIR, serdes); + } + + public RocksDBStore(String name, String parentDir, Serdes<K, V> serdes) { this.name = name; + this.parentDir = parentDir; this.serdes = serdes; // initialize the rocksdb options @@ -131,7 +137,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { public void openDB(ProcessorContext context) { this.context = context; - this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name); + this.dbDir = new File(new File(this.context.stateDir(), parentDir), this.name); this.db = openDB(this.dbDir, this.options, TTL_SECONDS); } http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 581b742..825c70d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.WindowStoreUtils; +import java.io.File; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; @@ -47,8 +48,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { private static class Segment extends RocksDBStore<byte[], byte[]> { public final long id; - Segment(String name, long id) { - super(name, WindowStoreUtils.INNER_SERDES); + Segment(String segmentName, String windowName, long id) { + super(segmentName, windowName, WindowStoreUtils.INNER_SERDES); this.id = id; } @@ -159,6 +160,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { public void init(ProcessorContext context) { this.context = context; + openExistingSegments(); + this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null; @@ -169,6 +172,26 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { putInternal(key, value); } }); + + flush(); + } + + private void openExistingSegments() { + try { + File dir = new File(context.stateDir(), name); + + if (dir.exists()) { + for (String segmentName : dir.list()) { + long segmentId = segmentIdFromSegmentName(segmentName); + if (segmentId >= 0) + getSegment(segmentId); + } + } else { + dir.mkdir(); + } + } catch (Exception ex) { + + } } @Override @@ -228,11 +251,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } // If the record is within the retention period, put it in the store. - if (segmentId > currentSegmentId - segments.length) { + Segment segment = getSegment(segmentId); + if (segment != null) { if (retainDuplicates) seqnum = (seqnum + 1) & 0x7FFFFFFF; byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes); - getSegment(segmentId).put(binaryKey, serdes.rawValue(value)); + segment.put(binaryKey, serdes.rawValue(value)); return binaryKey; } else { return null; @@ -249,16 +273,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } // If the record is within the retention period, put it in the store. - if (segmentId > currentSegmentId - segments.length) - getSegment(segmentId).put(binaryKey, binaryValue); + Segment segment = getSegment(segmentId); + if (segment != null) + segment.put(binaryKey, binaryValue); } private byte[] getInternal(byte[] binaryKey) { long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey)); - Segment segment = segments[(int) (segmentId % segments.length)]; - - if (segment != null && segment.id == segmentId) { + Segment segment = getSegment(segmentId); + if (segment != null) { return segment.get(binaryKey); } else { return null; @@ -277,9 +301,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>(); for (long segmentId = segFrom; segmentId <= segTo; segmentId++) { - Segment segment = segments[(int) (segmentId % segments.length)]; - - if (segment != null && segment.id == segmentId) + Segment segment = getSegment(segmentId); + if (segment != null) iterators.add(segment.range(binaryFrom, binaryTo)); } @@ -291,14 +314,23 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } private Segment getSegment(long segmentId) { - int index = (int) (segmentId % segments.length); + if (segmentId > currentSegmentId - segments.length) { + int index = (int) (segmentId % segments.length); - if (segments[index] == null) { - segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId); - segments[index].openDB(context); - } + if (segments[index] != null && segments[index].id != segmentId) { + cleanup(); + } + + if (segments[index] == null) { + segments[index] = new Segment(segmentName(segmentId), name, segmentId); + segments[index].openDB(context); + } + + return segments[index]; - return segments[index]; + } else { + return null; + } } private void cleanup() { @@ -316,10 +348,19 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } // this method is defined public since it is used for unit tests - public String directorySuffix(long segmentId) { + public String segmentName(long segmentId) { return formatter.format(new Date(segmentId * segmentInterval)); } + public long segmentIdFromSegmentName(String segmentName) { + try { + Date date = formatter.parse(segmentName); + return date.getTime() / segmentInterval; + } catch (Exception ex) { + return -1L; + } + } + // this method is defined public since it is used for unit tests public Set<Long> segmentIds() { HashSet<Long> segmentIds = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 94385c8..3eda1be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -39,6 +39,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,6 +53,7 @@ public class RocksDBWindowStoreTest { private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer(); + private final String windowName = "window"; private final int numSegments = 3; private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL; private final long retentionPeriod = segmentSize * (numSegments - 1); @@ -60,7 +62,7 @@ public class RocksDBWindowStoreTest { @SuppressWarnings("unchecked") protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) { - StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null); + StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null); WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); store.init(context); @@ -516,7 +518,7 @@ public class RocksDBWindowStoreTest { // check segment directories store.flush(); assertEquals( - Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)), + Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), segmentDirs(baseDir) ); } finally { @@ -606,7 +608,7 @@ public class RocksDBWindowStoreTest { (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); try { - context.restore("window", changeLog); + context.restore(windowName, changeLog); assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds()); @@ -623,7 +625,7 @@ public class RocksDBWindowStoreTest { // check segment directories store.flush(); assertEquals( - Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)), + Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), segmentDirs(baseDir) ); } finally { @@ -645,16 +647,9 @@ public class RocksDBWindowStoreTest { } private Set<String> segmentDirs(File baseDir) { - File rocksDbDir = new File(baseDir, "rocksdb"); - String[] subdirs = rocksDbDir.list(); + File windowDir = new File(baseDir, windowName); - HashSet<String> set = new HashSet<>(); - - for (String subdir : subdirs) { - if (subdir.startsWith("window-")) - set.add(subdir.substring(7)); - } - return set; + return new HashSet<>(Arrays.asList(windowDir.list())); } private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
