Repository: kafka Updated Branches: refs/heads/0.10.0 73ec226d3 -> 175fbb559
HOTFIX: Check hasNext in KStreamWindowReduce Author: Guozhang Wang <[email protected]> Reviewers: Damian Guy, Matthias J. Sax Closes #1520 from guozhangwang/KHotfix-iter-hasNext-window-value-getter Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/175fbb55 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/175fbb55 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/175fbb55 Branch: refs/heads/0.10.0 Commit: 175fbb559073f21d9ae2b65be22fddbae4e152ea Parents: 73ec226 Author: Guozhang Wang <[email protected]> Authored: Sat Jun 18 11:25:33 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Sat Jun 18 11:56:31 2016 -0700 ---------------------------------------------------------------------- .../kafka/streams/kstream/internals/KStreamWindowReduce.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/175fbb55/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index a526506..510c138 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -135,13 +135,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr return new KTableValueGetterSupplier<Windowed<K>, V>() { public KTableValueGetter<Windowed<K>, V> get() { - return new KStreamAggregateValueGetter(); + return new KStreamWindowReduceValueGetter(); } }; } - private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> { + private class KStreamWindowReduceValueGetter implements KTableValueGetter<Windowed<K>, V> { private WindowStore<K, V> windowStore; @@ -159,7 +159,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr // this iterator should only contain one element try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) { - return iter.next().value; + return iter.hasNext() ? iter.next().value : null; } } }
