Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r170577231

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -105,11 +106,32 @@ class AggregationCodeGenerator(
         // get unique function name
         val funcName = newName(name)
    +
    +    // get distinct filter of acc fields for each aggregate functions
    +    val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}"
    +    val distinctAggType = s"${classOf[DistinctAggDelegateFunction[_, _]].getName}"
    +    val isDistinctAcc = aggregates.map(_.isInstanceOf[DistinctAggDelegateFunction[_, _]])
    +
         // register UDAGGs
         val aggs = aggregates.map(a => addReusableFunction(a, contextTerm))

    +    // register real aggregate functions without distinct delegate
    +    val realAggregates: Array[AggregateFunction[_ <: Any, _ <: Any]] = aggregates.map {
    +      case distinctAggDelegate: DistinctAggDelegateFunction[_, _] =>
    +        distinctAggDelegate.realAgg
    +      case agg: AggregateFunction[_, _] =>
    +        agg
    +    }
    +
    +    val realAggTypes = aggregates.map {
    --- End diff --

    Can be replaced by `realAggregates.map(_.getClass.getName)`
---