Repository: kafka Updated Branches: refs/heads/trunk 794e6dbd1 -> da0b5b859
KAFKA-5216: Fix peekNextKey in cached window/session store iterators guozhangwang mjsax dguy Author: Xavier Léauté <[email protected]> Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang Closes #3016 from xvrl/kafka-5216 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da0b5b85 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da0b5b85 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da0b5b85 Branch: refs/heads/trunk Commit: da0b5b8596fa20836b8c80c473b3f37af96c9b96 Parents: 794e6db Author: Xavier Léauté <[email protected]> Authored: Fri May 12 15:27:03 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 12 15:27:03 2017 -0700 ---------------------------------------------------------------------- .../AbstractMergedSortedCacheStoreIterator.java | 2 +- ...SortedCacheWrappedWindowStoreIteratorTest.java | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/da0b5b85/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index 438c5b2..c5c1a2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -134,7 +134,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa } if (nextStoreKey == null) { - return serdes.keyFrom(nextCacheKey.get()); + return deserializeCacheKey(nextCacheKey); } final int comparison = compare(nextCacheKey, nextStoreKey); http://git-wip-us.apache.org/repos/asf/kafka/blob/da0b5b85/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java index 5bc4e88..2048688 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java @@ -72,7 +72,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { } @Test - public void shouldPeekNextKey() throws Exception { + public void shouldPeekNextStoreKey() throws Exception { windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes())); cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); @@ -85,4 +85,18 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { assertThat(iterator.peekNextKey(), equalTo(10L)); } -} \ No newline at end of file + @Test + public void shouldPeekNextCacheKey() throws Exception { + windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes())); + cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); + Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); + Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); + final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes); + final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())); + assertThat(iterator.peekNextKey(), equalTo(0L)); + iterator.next(); + assertThat(iterator.peekNextKey(), equalTo(10L)); + } + +}
