[ 
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888239#comment-15888239
 ] 

ASF GitHub Bot commented on FLINK-5768:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103469070
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
 ---
    @@ -18,32 +18,33 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    +import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
     import org.apache.flink.types.Row
     import org.apache.flink.util.{Collector, Preconditions}
     
    -import scala.collection.JavaConversions._
    -
     /**
    - * It wraps the aggregate logic inside of
    - * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    - *
    - * @param aggregates          The aggregate functions.
    - * @param groupKeysMapping    The index mapping of group keys between 
intermediate aggregate Row
    - *                            and output Row.
    - * @param aggregateMapping    The index mapping between aggregate function 
list and aggregated value
    - *                            index in output Row.
    - * @param groupingSetsMapping The index mapping of keys in grouping sets 
between intermediate
    - *                            Row and output Row.
    - */
    +  * It wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    +  *
    +  * @param aggregates          The aggregate functions.
    +  * @param groupKeysMapping    The index mapping of group keys between 
intermediate aggregate Row
    +  *                            and output Row.
    +  * @param aggregateMapping    The index mapping between aggregate 
function list and aggregated
    +  *                            value
    +  *                            index in output Row.
    +  * @param groupingSetsMapping The index mapping of keys in grouping sets 
between intermediate
    +  *                            Row and output Row.
    +  * @param finalRowArity       The arity of the final resulting row
    +  */
     class AggregateReduceGroupFunction(
    -    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val aggregates: Array[AggregateFunction[_ <: Any]],
         private val groupKeysMapping: Array[(Int, Int)],
    --- End diff --
    
    The group key positions in the input are known, right?
    So we can also use an `Array[Int]` instead?


> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
>                 Key: FLINK-5768
>                 URL: https://issues.apache.org/jira/browse/FLINK-5768
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to 
> use new aggregation functions and aggregate dataStream API.
> 2. DataStream will be always running in incremental mode, as explained in 
> 06/Feb/2017 in FLINK5564.
> 2. Change the implementation of the Dataset aggregation runtime code to use 
> new aggregation functions.
> 3. Clean up unused class and method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to