Repository: kafka Updated Branches: refs/heads/0.10.2 29214d336 -> 05c13552f
KAFKA-5216: Fix peekNextKey in cached window/session store iterators guozhangwang mjsax dguy Author: Xavier Léauté <[email protected]> Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang Closes #3016 from xvrl/kafka-5216 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05c13552 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05c13552 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05c13552 Branch: refs/heads/0.10.2 Commit: 05c13552f910b20aa45e221f92614cffe76c08fc Parents: 29214d3 Author: Xavier Léauté <[email protected]> Authored: Fri May 12 15:27:03 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 12 16:51:21 2017 -0700 ---------------------------------------------------------------------- .../AbstractMergedSortedCacheStoreIterator.java | 2 +- .../MergedSortedCacheWindowStoreIteratorTest.java | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/05c13552/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index 009dad0..344ca5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -137,7 +137,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa } if (nextStoreKey == null) { - return serdes.keyFrom(nextCacheKey.get()); + return deserializeCacheKey(nextCacheKey); } final int comparison = compare(nextCacheKey, nextStoreKey); http://git-wip-us.apache.org/repos/asf/kafka/blob/05c13552/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java index 376fca8..f209632 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java @@ -72,7 +72,7 @@ public class MergedSortedCacheWindowStoreIteratorTest { } @Test - public void shouldPeekNextKey() throws Exception { + public void shouldPeekNextStoreKey() throws Exception { windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes())); cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); @@ -85,4 +85,18 @@ public class MergedSortedCacheWindowStoreIteratorTest { assertThat(iterator.peekNextKey(), equalTo(10L)); } -} \ No newline at end of file + @Test + public void shouldPeekNextCacheKey() throws Exception { + windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes())); + cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes())); + Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes); + Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes); + final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes); + final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray())); + assertThat(iterator.peekNextKey(), equalTo(0L)); + iterator.next(); + assertThat(iterator.peekNextKey(), equalTo(10L)); + } + +}
