[ 
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891306#comment-15891306
 ] 

ASF GitHub Bot commented on FLINK-5768:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103801735
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -363,199 +342,112 @@ object AggregateUtil {
               groupingOffsetMapping,
               aggOffsetMapping,
               groupingSetsMapping,
    -          intermediateRowArity,
               outputType.getFieldCount)
           }
         groupReduceFunction
       }
     
       /**
    -    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] 
for incremental window
    -    * aggregation.
    -    *
    +    * Create an [[AllWindowFunction]] for non-partitioned window 
aggregates.
         */
    -  private[flink] def createIncrementalAggregateReduceFunction(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : IncrementalAggregateReduceFunction = {
    -
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val groupingOffsetMapping =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)._1
    -
    -    val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
    -    val reduceFunction = new IncrementalAggregateReduceFunction(
    -      aggregates,
    -      groupingOffsetMapping,
    -      intermediateRowArity)
    -    reduceFunction
    -  }
    -
    -  /**
    -    * Create an [[AllWindowFunction]] to compute non-partitioned group 
window aggregates.
    -    */
    -  private[flink] def createAllWindowAggregationFunction(
    +  private[flink] def createAggregationAllWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, 
DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    +      new IncrementalAggregateAllTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
         } else {
    -      new AggregateAllWindowFunction(aggFunction)
    +      new IncrementalAggregateAllWindowFunction(
    +        finalRowArity)
         }
       }
     
       /**
    -    * Create a [[WindowFunction]] to compute partitioned group window 
aggregates.
    -    *
    +    * Create a [[WindowFunction]] for group window aggregates.
         */
    -  private[flink] def createWindowAggregationFunction(
    +  private[flink] def createAggregationGroupWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, 
Tuple, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    +      new IncrementalAggregateTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
         } else {
    -      new AggregateWindowFunction(aggFunction)
    +      new IncrementalAggregateWindowFunction(
    +        finalRowArity)
         }
       }
     
    -  /**
    -    * Create an [[AllWindowFunction]] to finalize incrementally 
pre-computed non-partitioned
    -    * window aggregates.
    -    */
    -  private[flink] def createAllWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    +  private[flink] def createDataStreamAggregateFunction(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           inputType: RelDataType,
           outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    +      groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], 
RowTypeInfo) = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupKeysIndex.length)
     
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -      namedAggregates,
    -      inputType,
    -      outputType,
    -      groupings)
    +    val groupKeysMapping = getGroupKeysMapping(inputType, outputType, 
groupKeysIndex)
     
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateAllTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateAllWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    +    if (groupKeysMapping.length != groupKeysIndex.length ||
    +      aggregateMapping.length != namedAggregates.length) {
    +      throw new TableException(
    +        "Could not find output field in input data type or aggregate 
functions.")
         }
    +
    +    val accumulatorRowType = createAccumulatorRowType(inputType, 
groupKeysIndex, aggregates)
    +    val aggFunction = new AggregateAggFunction(
    +      aggregates,
    +      aggFields,
    +      aggregateMapping,
    +      groupKeysIndex,
    +      groupKeysMapping,
    +      outputType.getFieldCount)
    +
    +    (aggFunction, accumulatorRowType)
       }
     
       /**
    -    * Create a [[WindowFunction]] to finalize incrementally pre-computed 
window aggregates.
    +    * Return true if all aggregates can be partially merged. False 
otherwise.
         */
    -  private[flink] def createWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateCalls: Seq[AggregateCall],
           inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    +      groupKeysCount: Int): Boolean = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)
    -
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateList = transformToAggregateFunctions(
    +      aggregateCalls,
    +      inputType,
    +      groupKeysCount)._2
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    -    }
    +    doAllSupportPartialMerge(aggregateList)
       }
     
       /**
    -    * Return true if all aggregates can be partially computed. False 
otherwise.
    +    * Return true if all aggregates can be partially merged. False 
otherwise.
         */
    -  private[flink] def doAllSupportPartialAggregation(
    -    aggregateCalls: Seq[AggregateCall],
    -    inputType: RelDataType,
    -    groupKeysCount: Int): Boolean = {
    -    transformToAggregateFunctions(
    -      aggregateCalls,
    -      inputType,
    -      groupKeysCount)._2.forall(_.supportPartial)
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
    +    var ret: Boolean = true
    --- End diff --
    
    can be simplified to 
    ```
    aggregateList.forall(ifMethodExitInFunction("merge", _))
    ```


> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
>                 Key: FLINK-5768
>                 URL: https://issues.apache.org/jira/browse/FLINK-5768
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to 
> use new aggregation functions and aggregate dataStream API.
> 2. DataStream will be always running in incremental mode, as explained in 
> 06/Feb/2017 in FLINK5564.
> 2. Change the implementation of the Dataset aggregation runtime code to use 
> new aggregation functions.
> 3. Clean up unused class and method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to