[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973418#comment-15973418
 ] 

ASF GitHub Bot commented on FLINK-6242:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112001966
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
    @@ -30,78 +30,46 @@ import org.apache.flink.types.Row
       *
       * It is used for sliding on batch for both time and count-windows.
       *
    -  * @param aggregates aggregate functions.
    -  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
    -  *                         and output Row.
    -  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
    -  *                         index in output Row.
    -  * @param finalRowArity output row field count
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param finalRowWindowStartPos relative window-start position to last 
field of output row
       * @param finalRowWindowEndPos relative window-end position to last field 
of output row
       * @param windowSize size of the window, used to determine window-end for 
output row
       */
     class DataSetSlideWindowAggReduceCombineFunction(
    -    aggregates: Array[AggregateFunction[_ <: Any]],
    -    groupKeysMapping: Array[(Int, Int)],
    -    aggregateMapping: Array[(Int, Int)],
    -    finalRowArity: Int,
    +    genAggregations: GeneratedAggregationsFunction,
    +    keysAndAggregatesArity: Int,
         finalRowWindowStartPos: Option[Int],
         finalRowWindowEndPos: Option[Int],
         windowSize: Long)
       extends DataSetSlideWindowAggReduceGroupFunction(
    -    aggregates,
    -    groupKeysMapping,
    -    aggregateMapping,
    -    finalRowArity,
    +    genAggregations,
    +    keysAndAggregatesArity,
         finalRowWindowStartPos,
         finalRowWindowEndPos,
         windowSize)
       with CombineFunction[Row, Row] {
     
    -  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
    -  private val intermediateRow: Row = new Row(intermediateRowArity)
    +  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
     
       override def combine(records: Iterable[Row]): Row = {
     
    -    // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    // reset accumulator
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
         while (iterator.hasNext) {
           val record = iterator.next()
     
    -      // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(groupKeysMapping.length + 
i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    Move this block after the loop so that the `!iterator.hasNext` condition 
does not have to be checked on every iteration of the loop body.


> codeGen DataSet Groupingwindow Aggregates
> -----------------------------------------
>
>                 Key: FLINK-6242
>                 URL: https://issues.apache.org/jira/browse/FLINK-6242
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to