[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686706#comment-15686706
 ] 

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r89088166
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -39,66 +45,69 @@ object AggregateUtil {
       type JavaList[T] = java.util.List[T]
     
       /**
    -   * Create Flink operator functions for aggregates. It includes 2 
implementations of Flink 
    -   * operator functions:
    -   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    -   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's 
partial aggregate,
    -   * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    -   * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the 
    -   * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
    -   * format:
    -   *
    -   * {{{
    -   *                   avg(x) aggOffsetInRow = 2          count(z) 
aggOffsetInRow = 5
    -   *                             |                          |
    -   *                             v                          v
    -   *        +---------+---------+--------+--------+--------+--------+
    -   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    -   *        +---------+---------+--------+--------+--------+--------+
    -   *                                              ^
    -   *                                              |
    -   *                               sum(y) aggOffsetInRow = 4
    -   * }}}
    -   *
    -   */
    -  def createOperatorFunctionsForAggregates(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
    -
    -    val aggregateFunctionsAndFieldIndexes =
    -      transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
    -    // store the aggregate fields of each aggregate function, by the same 
order of aggregates.
    -    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    -    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +    * Create prepare MapFunction for aggregates.
    +    * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
    +    * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
    +    * format:
    +    *
    +    * {{{
    +    *                   avg(x) aggOffsetInRow = 2          count(z) 
aggOffsetInRow = 5
    +    *                             |                          |
    +    *                             v                          v
    +    *        +---------+---------+--------+--------+--------+--------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +    *        +---------+---------+--------+--------+--------+--------+
    +    *                                              ^
    +    *                                              |
    +    *                               sum(y) aggOffsetInRow = 4
    +    * }}}
    +    *
    +    */
    +  private[flink] def createPrepareMapFunction(
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    aggFieldIndexes: Array[Int],
    +    groupings: Array[Int],
    +    inputType:
    +    RelDataType): MapFunction[Any, Row] = {
     
         val mapReturnType: RowTypeInfo =
           createAggregateBufferDataType(groupings, aggregates, inputType)
     
         val mapFunction = new AggregateMapFunction[Row, Row](
    -        aggregates, aggFieldIndexes, groupings,
    -        
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
    -
    -    // the mapping relation between field index of intermediate aggregate 
Row and output Row.
    -    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
    -
    -    // the mapping relation between aggregate function index in list and 
its corresponding
    -    // field index in output Row.
    -    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    -
    -    if (groupingOffsetMapping.length != groupings.length ||
    -        aggOffsetMapping.length != namedAggregates.length) {
    -      throw new TableException("Could not find output field in input data 
type " +
    -          "or aggregate functions.")
    -    }
    -
    -    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => 
x)
    +      aggregates,
    +      aggFieldIndexes,
    +      groupings,
    +      
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
     
    -    val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
    +    mapFunction
    +  }
     
    -    val reduceGroupFunction =
    +  /**
    +    * Create AggregateGroupReduceFunction for aggregates. It implement
    +    * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if 
it's partial aggregate,
    +    * should also implement 
[[org.apache.flink.api.common.functions.CombineFunction]] as well).
    +    *
    +    */
    +  def createAggregateGroupReduceFunction(
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    groupings: Array[Int],
    +    aggregates: Array[Aggregate[_ <: Any]]): RichGroupReduceFunction[Row, 
Row] = {
    --- End diff --
    
    I think we can remove the `aggregates` parameter and compute it from 
`namedAggregates`.


> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-4937
>                 URL: https://issues.apache.org/jira/browse/FLINK-4937
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API, 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to