Repository: kafka Updated Branches: refs/heads/trunk 4ee68b43c -> d2fc6f36c
MINOR: fix RocksDBStore range search. The range is inclusive according to KeyValueStore's Javadoc. Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #883 from ymatsuda/minor Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2fc6f36 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2fc6f36 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2fc6f36 Branch: refs/heads/trunk Commit: d2fc6f36cc0b98d4d0acaa62ce3f2283c4e60581 Parents: 4ee68b4 Author: Yasuhiro Matsuda <[email protected]> Authored: Tue Feb 9 05:00:38 2016 +0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Feb 9 05:00:38 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 2 +- .../kafka/streams/state/internals/RocksDBWindowStore.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d2fc6f36/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 5c57854..6176767 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -456,7 +456,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @Override public boolean hasNext() { - return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0; + return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0; } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2fc6f36/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index d6baf30..581b742 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -189,6 +189,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { @Override public void close() { + flush(); for (Segment segment : segments) { if (segment != null) segment.close(); @@ -271,7 +272,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { long segTo = segmentId(Math.max(0L, timeTo)); byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); - byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes); + byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes); ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>(); @@ -279,7 +280,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { Segment segment = segments[(int) (segmentId % segments.length)]; if (segment != null && segment.id == segmentId) - iterators.add(segment.range(binaryFrom, binaryUntil)); + iterators.add(segment.range(binaryFrom, binaryTo)); } if (iterators.size() > 0) {
