[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973424#comment-15973424
 ] 

ASF GitHub Bot commented on FLINK-6242:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112002408
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
    @@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
       * it does no final aggregate evaluation. It also includes the logic of
       * [[DataSetSlideTimeWindowAggFlatMapFunction]].
       *
    -  * @param aggregates aggregate functions
    -  * @param groupingKeysLength number of grouping keys
    -  * @param timeFieldPos position of aligned time field
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param windowSize window size of the sliding window
       * @param windowSlide window slide of the sliding window
       * @param returnType return type of this function
       */
     class DataSetSlideTimeWindowAggReduceGroupFunction(
    -    private val aggregates: Array[AggregateFunction[_ <: Any]],
    -    private val groupingKeysLength: Int,
    -    private val timeFieldPos: Int,
    +    private val genAggregations: GeneratedAggregationsFunction,
    +    private val keysAndAggregatesArity: Int,
         private val windowSize: Long,
         private val windowSlide: Long,
         @transient private val returnType: TypeInformation[Row])
       extends RichGroupReduceFunction[Row, Row]
       with CombineFunction[Row, Row]
    -  with ResultTypeQueryable[Row] {
    +  with ResultTypeQueryable[Row]
    +  with Compiler[GeneratedAggregations] {
     
    -  Preconditions.checkNotNull(aggregates)
    +  private val timeFieldPos = returnType.getArity - 1
    +  private val intermediateWindowStartPos = keysAndAggregatesArity
     
       protected var intermediateRow: Row = _
    -  // add one field to store window start
    -  protected val intermediateRowArity: Int = groupingKeysLength + 
aggregates.length + 1
    -  protected val accumulatorList: Array[JArrayList[Accumulator]] = 
Array.fill(aggregates.length) {
    -    new JArrayList[Accumulator](2)
    -  }
    -  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
    +  private var accumulators: Row = _
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +  private var function: GeneratedAggregations = _
     
       override def open(config: Configuration) {
    -    intermediateRow = new Row(intermediateRowArity)
    -
    -    // init lists with two empty accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulatorList(i).add(accumulator)
    -      accumulatorList(i).add(accumulator)
    -      i += 1
    -    }
    +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
    +                s"Code:\n$genAggregations.code")
    +    val clazz = compile(
    +      getClass.getClassLoader,
    +      genAggregations.name,
    +      genAggregations.code)
    +    LOG.debug("Instantiating AggregateHelper.")
    +    function = clazz.newInstance()
    +
    +    accumulators = function.createAccumulators()
    +    intermediateRow = function.createOutputRow()
       }
     
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
     
         // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulatorList(i).set(0, accumulator)
    -      i += 1
    -    }
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
     
         while (iterator.hasNext) {
           val record = iterator.next()
     
           // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // trigger tumbling evaluation
           if (!iterator.hasNext) {
    --- End diff --
    
    move this behind the loop


> codeGen DataSet Goupingwindow Aggregates
> ----------------------------------------
>
>                 Key: FLINK-6242
>                 URL: https://issues.apache.org/jira/browse/FLINK-6242
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to