Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r182254735

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -327,19 +392,41 @@ class AggregationCodeGenerator(
           for (i <- aggs.indices) yield
             if (partialResults) {
    -          j"""
    -             |      output.setField(
    -             |        ${aggMapping(i)},
    -             |        (${accTypes(i)}) accs.getField($i));""".stripMargin
    +          if (isDistinctAggs(i)) {
    +
    +            j"""
    +               |      $distinctAccType distinctAcc$i = ($distinctAccType) accs.getField($i);
    +               |      output.setField(
    +               |        ${aggMapping(i)},
    +               |        (${accTypes(i)}) distinctAcc$i.getRealAcc());""".stripMargin
    --- End diff --

    We need to forward the distinct maps as well. `partialResults` is used when an operator needs to emit partial aggregation results, such as a combine function in batch execution. So we don't need to distinguish the `isDistinctAggs(i)` case here.
---