Repository: kafka
Updated Branches:
  refs/heads/trunk 4c6d7ed95 -> 0b0925a16
HOTFIX: Check hasNext in KStreamWindowReduce

Author: Guozhang Wang <[email protected]>

Reviewers: Damian Guy, Matthias J. Sax

Closes #1520 from guozhangwang/KHotfix-iter-hasNext-window-value-getter

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b0925a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b0925a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b0925a1

Branch: refs/heads/trunk
Commit: 0b0925a16f6cc94ad96fbc4dc2bcf48bf96557e6
Parents: 4c6d7ed
Author: Guozhang Wang <[email protected]>
Authored: Sat Jun 18 11:25:33 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Sat Jun 18 11:25:33 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/internals/KStreamWindowReduce.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b0925a1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 46d99a8..763ccdd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -135,13 +135,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
         return new KTableValueGetterSupplier<Windowed<K>, V>() {
 
             public KTableValueGetter<Windowed<K>, V> get() {
-                return new KStreamAggregateValueGetter();
+                return new KStreamWindowReduceValueGetter();
             }
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+    private class KStreamWindowReduceValueGetter implements KTableValueGetter<Windowed<K>, V> {
 
         private WindowStore<K, V> windowStore;
 
@@ -159,7 +159,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
             // this iterator should only contain one element
             try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) {
-                return iter.next().value;
+                return iter.hasNext() ? iter.next().value : null;
             }
         }
     }
