[ 
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888227#comment-15888227
 ] 

ASF GitHub Bot commented on FLINK-5768:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103469278
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala
 ---
    @@ -19,61 +19,71 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    +import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
     import org.apache.flink.types.Row
     
    -import scala.collection.JavaConversions._
    -
     /**
    - * It wraps the aggregate logic inside of
    - * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    - * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    - *
    - * @param aggregates          The aggregate functions.
    - * @param groupKeysMapping    The index mapping of group keys between 
intermediate aggregate Row
    - *                            and output Row.
    - * @param aggregateMapping    The index mapping between aggregate function 
list and aggregated value
    - *                            index in output Row.
    - * @param groupingSetsMapping The index mapping of keys in grouping sets 
between intermediate
    - *                            Row and output Row.
    - */
    +  * It wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    +  *
    +  * @param aggregates          The aggregate functions.
    +  * @param groupKeysMapping    The index mapping of group keys between 
intermediate aggregate Row
    +  *                            and output Row.
    +  * @param aggregateMapping    The index mapping between aggregate 
function list and aggregated
    +  *                            value
    +  *                            index in output Row.
    +  * @param groupingSetsMapping The index mapping of keys in grouping sets 
between intermediate
    +  *                            Row and output Row.
    +  * @param finalRowArity       the arity of the final resulting row
    +  */
     class AggregateReduceCombineFunction(
    -    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val aggregates: Array[AggregateFunction[_ <: Any]],
         private val groupKeysMapping: Array[(Int, Int)],
         private val aggregateMapping: Array[(Int, Int)],
         private val groupingSetsMapping: Array[(Int, Int)],
    -    private val intermediateRowArity: Int,
         private val finalRowArity: Int)
       extends AggregateReduceGroupFunction(
         aggregates,
         groupKeysMapping,
         aggregateMapping,
         groupingSetsMapping,
    -    intermediateRowArity,
    -    finalRowArity)
    -  with CombineFunction[Row, Row] {
    +    finalRowArity) with CombineFunction[Row, Row] {
     
       /**
    -   * For sub-grouped intermediate aggregate Rows, merge all of them into 
aggregate buffer,
    -   *
    -   * @param records  Sub-grouped intermediate aggregate Rows iterator.
    -   * @return Combined intermediate aggregate Row.
    -   *
    -   */
    +    * For sub-grouped intermediate aggregate Rows, merge all of them into 
aggregate buffer,
    +    *
    +    * @param records Sub-grouped intermediate aggregate Rows iterator.
    +    * @return Combined intermediate aggregate Row.
    +    *
    +    */
       override def combine(records: Iterable[Row]): Row = {
     
    -    // Initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // Merge intermediate aggregate value to buffer.
    +    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -    records.foreach((record) => {
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +    val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(
    --- End diff --
    
    Same as for `reduce()`. We cannot materialize the whole group but must 
merge the accumulators pairwise.


> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
>                 Key: FLINK-5768
>                 URL: https://issues.apache.org/jira/browse/FLINK-5768
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to 
> use the new aggregation functions and the aggregate DataStream API.
> 2. DataStream aggregations will always run in incremental mode, as explained 
> on 06/Feb/2017 in FLINK-5564.
> 3. Change the implementation of the DataSet aggregation runtime code to use 
> the new aggregation functions.
> 4. Clean up unused classes and methods.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to