[ https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973418#comment-15973418 ]
ASF GitHub Bot commented on FLINK-6242: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r112001966 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala --- @@ -30,78 +30,46 @@ import org.apache.flink.types.Row * * It is used for sliding on batch for both time and count-windows. * - * @param aggregates aggregate functions. - * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row - * and output Row. - * @param aggregateMapping index mapping between aggregate function list and aggregated value - * index in output Row. - * @param finalRowArity output row field count + * @param genAggregations Code-generated [[GeneratedAggregations]] + * @param keysAndAggregatesArity The total arity of keys and aggregates * @param finalRowWindowStartPos relative window-start position to last field of output row * @param finalRowWindowEndPos relative window-end position to last field of output row * @param windowSize size of the window, used to determine window-end for output row */ class DataSetSlideWindowAggReduceCombineFunction( - aggregates: Array[AggregateFunction[_ <: Any]], - groupKeysMapping: Array[(Int, Int)], - aggregateMapping: Array[(Int, Int)], - finalRowArity: Int, + genAggregations: GeneratedAggregationsFunction, + keysAndAggregatesArity: Int, finalRowWindowStartPos: Option[Int], finalRowWindowEndPos: Option[Int], windowSize: Long) extends DataSetSlideWindowAggReduceGroupFunction( - aggregates, - groupKeysMapping, - aggregateMapping, - finalRowArity, + genAggregations, + keysAndAggregatesArity, finalRowWindowStartPos, finalRowWindowEndPos, windowSize) with CombineFunction[Row, Row] { - private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1 - private val intermediateRow: Row = new Row(intermediateRowArity) + private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1) override def combine(records: Iterable[Row]): Row = { - // reset first accumulator - var i = 0 - while (i < aggregates.length) { - aggregates(i).resetAccumulator(accumulatorList(i).get(0)) - i += 1 - } + // reset accumulator + function.resetAccumulator(accumulators) val iterator = records.iterator() while (iterator.hasNext) { val record = iterator.next() - // accumulate - i = 0 - while (i < aggregates.length) { - // insert received accumulator into acc list - val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator] - accumulatorList(i).set(1, newAcc) - // merge acc list - val retAcc = aggregates(i).merge(accumulatorList(i)) - // insert result into acc list - accumulatorList(i).set(0, retAcc) - i += 1 - } + function.mergeAccumulatorsPairWithKeyOffset(accumulators, record) // check if this record is the last record if (!iterator.hasNext) { --- End diff -- move this behind the loop to save the check of the condition in the loop body. > codeGen DataSet Goupingwindow Aggregates > ---------------------------------------- > > Key: FLINK-6242 > URL: https://issues.apache.org/jira/browse/FLINK-6242 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)