[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973428#comment-15973428 ]
ASF GitHub Bot commented on FLINK-6242: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112002707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala --- @@ -18,111 +18,77 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import java.util.{ArrayList => JArrayList} import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.configuration.Configuration -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row -import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory /** * It wraps the aggregate logic inside of * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. 
- * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceGroupFunction( - aggregates: Array[AggregateFunction[_ <: Any]], - groupKeysMapping: Array[(Int, Int)], - aggregateMapping: Array[(Int, Int)], - finalRowArity: Int, + genAggregations: GeneratedAggregationsFunction, + keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) - extends RichGroupReduceFunction[Row, Row] { - - Preconditions.checkNotNull(aggregates) - Preconditions.checkNotNull(groupKeysMapping) + extends RichGroupReduceFunction[Row, Row] + with Compiler[GeneratedAggregations] { private var collector: TimeWindowPropertyCollector = _ + protected val windowStartPos: Int = keysAndAggregatesArity + private var output: Row = _ - private val accumulatorStartPos: Int = groupKeysMapping.length - protected val windowStartPos: Int = accumulatorStartPos + aggregates.length + protected var accumulators: Row = _ - val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) { - new JArrayList[Accumulator](2) - } + val LOG = LoggerFactory.getLogger(this.getClass) + protected var function: GeneratedAggregations = _ override def open(config: Configuration) { - output = new Row(finalRowArity) + LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + + s"Code:\n$genAggregations.code") + val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) + LOG.debug("Instantiating AggregateHelper.") + function = clazz.newInstance() + + output = function.createOutputRow() 
+ accumulators = function.createAccumulators() collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) - - // init lists with two empty accumulators - var i = 0 - while (i < aggregates.length) { - val accumulator = aggregates(i).createAccumulator() - accumulatorList(i).add(accumulator) - accumulatorList(i).add(accumulator) - i += 1 - } } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { - // reset first accumulator - var i = 0 - while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 - } + // reset accumulator + function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() - // accumulate - i = 0 - while (i < aggregates.length) { - // insert received accumulator into acc list - val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator] - accumulatorList(i).set(1, newAcc) - // merge acc list - val retAcc = aggregates(i).merge(accumulatorList(i)) - // insert result into acc list - accumulatorList(i).set(0, retAcc) - i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // check if this record is the last record if (!iterator.hasNext) { --- End diff -- move this last-record check after the while loop > codeGen DataSet Grouping Window Aggregates > ---------------------------------------- > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)