[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973420#comment-15973420 ]
ASF GitHub Bot commented on FLINK-6242: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111995303 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala --- @@ -19,88 +19,71 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import java.util.{ArrayList => JArrayList} import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} +import org.slf4j.LoggerFactory /** * [[RichGroupReduceFunction]] to compute the final result of a pre-aggregated aggregation * for batch (DataSet) queries. * - * @param aggregates The aggregate functions. - * @param aggOutFields The positions of the aggregation results in the output + * @param genAggregations Code-generated [[GeneratedAggregations]] * @param gkeyOutFields The positions of the grouping keys in the output * @param groupingSetsMapping The mapping of grouping set keys between input and output positions. - * @param finalRowArity The arity of the final resulting row */ class DataSetFinalAggFunction( - private val aggregates: Array[AggregateFunction[_ <: Any]], - private val aggOutFields: Array[Int], + private val genAggregations: GeneratedAggregationsFunction, private val gkeyOutFields: Array[Int], - private val groupingSetsMapping: Array[(Int, Int)], - private val finalRowArity: Int) - extends RichGroupReduceFunction[Row, Row] { + private val groupingSetsMapping: Array[(Int, Int)]) + extends RichGroupReduceFunction[Row, Row] + with Compiler[GeneratedAggregations] { - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(aggOutFields) Preconditions.checkNotNull(gkeyOutFields) Preconditions.checkNotNull(groupingSetsMapping) private var output: Row = _ + private var accumulators: Row = _ + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ private val intermediateGKeys: Option[Array[Int]] = if (!groupingSetsMapping.isEmpty) { Some(gkeyOutFields) } else { None } - private val numAggs = aggregates.length - private val numGKeys = gkeyOutFields.length - - private val accumulators: Array[JArrayList[Accumulator]] = - Array.fill(numAggs)(new JArrayList[Accumulator](2)) - override def open(config: Configuration) { - output = new Row(finalRowArity) - - // init lists with two empty accumulators - for (i <- aggregates.indices) { - val accumulator = aggregates(i).createAccumulator() - accumulators(i).add(accumulator) - accumulators(i).add(accumulator) - } + LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + + s"Code:\n$genAggregations.code") + val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) + LOG.debug("Instantiating AggregateHelper.") + function = clazz.newInstance() + + output = function.createOutputRow() + accumulators = function.createAccumulators() } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { val iterator = records.iterator() // reset first accumulator - var i = 0 - while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulators(i).get(0)) - i += 1 - } + function.resetAccumulator(accumulators) + var i = 0 while (iterator.hasNext) { val record = iterator.next() --- End diff -- we can make `record` a `var` and move its definition outside of the loop. Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated. > codeGen DataSet Goupingwindow Aggregates > ---------------------------------------- > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)