Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112001966
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
    @@ -30,78 +30,46 @@ import org.apache.flink.types.Row
       *
       * It is used for sliding on batch for both time and count-windows.
       *
    -  * @param aggregates aggregate functions.
    -  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
    -  *                         and output Row.
    -  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
    -  *                         index in output Row.
    -  * @param finalRowArity output row field count
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param finalRowWindowStartPos relative window-start position to last 
field of output row
       * @param finalRowWindowEndPos relative window-end position to last field 
of output row
       * @param windowSize size of the window, used to determine window-end for 
output row
       */
     class DataSetSlideWindowAggReduceCombineFunction(
    -    aggregates: Array[AggregateFunction[_ <: Any]],
    -    groupKeysMapping: Array[(Int, Int)],
    -    aggregateMapping: Array[(Int, Int)],
    -    finalRowArity: Int,
    +    genAggregations: GeneratedAggregationsFunction,
    +    keysAndAggregatesArity: Int,
         finalRowWindowStartPos: Option[Int],
         finalRowWindowEndPos: Option[Int],
         windowSize: Long)
       extends DataSetSlideWindowAggReduceGroupFunction(
    -    aggregates,
    -    groupKeysMapping,
    -    aggregateMapping,
    -    finalRowArity,
    +    genAggregations,
    +    keysAndAggregatesArity,
         finalRowWindowStartPos,
         finalRowWindowEndPos,
         windowSize)
       with CombineFunction[Row, Row] {
     
    -  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
    -  private val intermediateRow: Row = new Row(intermediateRowArity)
    +  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
     
       override def combine(records: Iterable[Row]): Row = {
     
    -    // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    // reset accumulator
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
         while (iterator.hasNext) {
           val record = iterator.next()
     
    -      // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(groupKeysMapping.length + 
i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    move this behind the loop to save the check of the condition in the loop 
body.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to