Repository: flink Updated Branches: refs/heads/release-1.3 b307081a3 -> 7893aba41
[FLINK-7971] [table] Fix potential NPE in non-windowed aggregation.

This closes #4941.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6ac86f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6ac86f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6ac86f4

Branch: refs/heads/release-1.3
Commit: f6ac86f4ee69207d3df21111b33c2b5c49121694
Parents: b307081
Author: Xpray <leonxp...@gmail.com>
Authored: Fri Nov 3 15:19:42 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Nov 8 18:52:29 2017 +0100

----------------------------------------------------------------------
 .../flink/table/runtime/aggregate/GroupAggProcessFunction.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f6ac86f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 385778c..794523c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -98,11 +98,14 @@ class GroupAggProcessFunction(
     if (null == accumulators) {
       firstRow = true
       accumulators = function.createAccumulators()
-      inputCnt = 0L
     } else {
       firstRow = false
     }
 
+    if (null == inputCnt) {
+      inputCnt = 0L
+    }
+
     // Set group keys value to the final output
     function.setForwardedFields(input, newRow.row)
     function.setForwardedFields(input, prevRow.row)