[ https://issues.apache.org/jira/browse/FLINK-3474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176085#comment-15176085 ]
ASF GitHub Bot commented on FLINK-3474:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54761986

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --

Yes, this would be aggregation-specific. For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is basically the same as what `initiate` would do. However, it is also OK to do it in `prepare` directly.

> Partial aggregate interface design and sort-based implementation
> ----------------------------------------------------------------
>
>                 Key: FLINK-3474
>                 URL: https://issues.apache.org/jira/browse/FLINK-3474
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Chengxiang Li
>
> The scope of this sub-task includes:
> # Partial aggregate interface.
> # Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
> # DataSetAggregateRule, which translates the logical Calcite aggregate node into Flink user functions. As a hash-based combiner is not available yet (see PR #1517), we would use a sort-based combine as the default.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
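The initiate/prepare/merge/evaluate contract in the diff, and the reviewer's point that a `SUM` `prepare` can write the same `0` that `initiate` writes, can be illustrated outside Flink. The following is a minimal, self-contained Scala sketch under stated assumptions, not the PR's implementation: `Row` is a hypothetical stand-in for `org.apache.flink.api.table.Row`; the `merge`/`evaluate` signatures, `setAggOffsetInRow`, and `intermediateArity` are assumptions (the quoted hunk stops after `prepare`); and `LongSumAggregate`, `LongAvgAggregate`, and `AggregateDemo` are hypothetical names. It shows SUM (one field) and AVG (two fields) sharing one intermediate Row through `aggOffsetInRow`, with a Map, Combine, and GroupReduce phase.

```scala
// Hypothetical stand-in for Flink's Row: a fixed-arity array of untyped fields.
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, value: Any): Unit = fields(i) = value
  def productElement(i: Int): Any = fields(i)
}

// Sketch of the interface; merge/evaluate signatures, setAggOffsetInRow and
// intermediateArity are assumptions, not taken from the PR.
trait Aggregate[T] extends Serializable {
  protected var aggOffsetInRow: Int = 0 // first field index of this aggregate in the shared Row
  def setAggOffsetInRow(offset: Int): Unit = { aggOffsetInRow = offset }
  def intermediateArity: Int // number of Row fields this aggregate occupies
  def initiate(intermediate: Row): Unit
  def prepare(value: Any, intermediate: Row): Unit
  def merge(intermediate: Row, buffer: Row): Unit
  def evaluate(buffer: Row): T
}

// SUM: a single Long field.
class LongSumAggregate extends Aggregate[Long] {
  def intermediateArity: Int = 1
  def initiate(intermediate: Row): Unit = intermediate.setField(aggOffsetInRow, 0L)
  def prepare(value: Any, intermediate: Row): Unit = value match {
    case null                => intermediate.setField(aggOffsetInRow, 0L) // same effect as initiate()
    case v: java.lang.Number => intermediate.setField(aggOffsetInRow, v.longValue())
  }
  def merge(intermediate: Row, buffer: Row): Unit = {
    val acc = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    val part = intermediate.productElement(aggOffsetInRow).asInstanceOf[Long]
    buffer.setField(aggOffsetInRow, acc + part)
  }
  def evaluate(buffer: Row): Long = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
}

// AVG: two Long fields (running sum at offset, running count at offset + 1).
class LongAvgAggregate extends Aggregate[Double] {
  def intermediateArity: Int = 2
  def initiate(intermediate: Row): Unit = {
    intermediate.setField(aggOffsetInRow, 0L)
    intermediate.setField(aggOffsetInRow + 1, 0L)
  }
  def prepare(value: Any, intermediate: Row): Unit = value match {
    case null => initiate(intermediate) // a null input contributes neither sum nor count
    case v: java.lang.Number =>
      intermediate.setField(aggOffsetInRow, v.longValue())
      intermediate.setField(aggOffsetInRow + 1, 1L)
  }
  def merge(intermediate: Row, buffer: Row): Unit =
    for (i <- aggOffsetInRow to aggOffsetInRow + 1) {
      val acc = buffer.productElement(i).asInstanceOf[Long]
      buffer.setField(i, acc + intermediate.productElement(i).asInstanceOf[Long])
    }
  def evaluate(buffer: Row): Double = {
    val count = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    // SQL would return NULL for an empty group; NaN keeps the sketch simple.
    if (count == 0L) Double.NaN
    else buffer.productElement(aggOffsetInRow).asInstanceOf[Long].toDouble / count
  }
}

object AggregateDemo {
  val sum = new LongSumAggregate
  val avg = new LongAvgAggregate
  sum.setAggOffsetInRow(0) // SUM uses field 0
  avg.setAggOffsetInRow(1) // AVG uses fields 1 and 2, no conflict with SUM
  val aggs: Seq[Aggregate[_]] = Seq(sum, avg)
  val arity: Int = aggs.map(_.intermediateArity).sum

  private def newBuffer(): Row = { val b = new Row(arity); aggs.foreach(_.initiate(b)); b }

  // Map phase: one intermediate Row per input value.
  private def mapPhase(value: Any): Row = { val r = new Row(arity); aggs.foreach(_.prepare(value, r)); r }

  // Combine and GroupReduce both merge intermediate Rows into a fresh buffer.
  private def mergeAll(rows: Seq[Row]): Row = {
    val buffer = newBuffer()
    for (r <- rows; a <- aggs) a.merge(r, buffer)
    buffer
  }

  // Each inner Seq is one partition's slice of a single group.
  def sumAndAvg(partitions: Seq[Seq[Any]]): (Long, Double) = {
    val partials = partitions.map(p => mergeAll(p.map(mapPhase))) // Combine phase
    val finalBuffer = mergeAll(partials)                          // GroupReduce phase
    (sum.evaluate(finalBuffer), avg.evaluate(finalBuffer))
  }

  def main(args: Array[String]): Unit = println(sumAndAvg(Seq(Seq(1, 2, null), Seq(3, 4))))
}
```

Running `AggregateDemo.main` should print `(10,2.5)`. Because the intermediate Row and the aggregate buffer share one layout in this sketch, the same `merge` serves both the Combine and GroupReduce phases, which is what lets the Combine phase be skipped or added freely for associative decomposable functions such as SUM and AVG.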