[
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888217#comment-15888217
]
ASF GitHub Bot commented on FLINK-5768:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3423#discussion_r103467528
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
@@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
override def open(config: Configuration) {
Preconditions.checkNotNull(aggregates)
Preconditions.checkNotNull(groupKeysMapping)
- aggregateBuffer = new Row(intermediateRowArity)
+ aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
output = new Row(finalRowArity)
if (!groupingSetsMapping.isEmpty) {
intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
}
}
/**
- * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
- * calculate aggregated values output by aggregate buffer, and set them into output
- * Row based on the mapping relation between intermediate aggregate data and output data.
- *
- * @param records Grouped intermediate aggregate Rows iterator.
- * @param out The collector to hand results to.
- *
- */
+ * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+ * calculate aggregated values output by aggregate buffer, and set them into output
+ * Row based on the mapping relation between intermediate aggregate data and output data.
+ *
+ * @param records Grouped intermediate aggregate Rows iterator.
+ * @param out The collector to hand results to.
+ *
+ */
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
- // Initiate intermediate aggregate value.
- aggregates.foreach(_.initiate(aggregateBuffer))
-
- // Merge intermediate aggregate value to buffer.
+ // merge intermediate aggregate value to buffer.
var last: Row = null
- records.foreach((record) => {
- aggregates.foreach(_.merge(record, aggregateBuffer))
+ val iterator = records.iterator()
+ val accumulatorList = Array.fill(aggregates.length) {
+ new JArrayList[Accumulator]()
+ }
+
+ while (iterator.hasNext) {
+ val record = iterator.next()
+ for (i <- aggregates.indices) {
+ accumulatorList(i).add(
--- End diff ---
This materializes the whole group in Lists and will fail for large groups
(or in case of a non-grouped aggregate).
We need to change this to pairwise merges. The `List` should be reused.
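A minimal Scala sketch of the pairwise-merge idea suggested above: instead of materializing one accumulator per record in a growing list, keep a single running accumulator and merge each incoming one into it through a reused two-element list. Note that `Accumulator`, `AggFn`, `SumFn`, and `reduceGroup` below are simplified stand-ins for illustration, not Flink's actual aggregation API.

```scala
import java.util.{ArrayList => JArrayList}

// Hypothetical stand-ins for the Flink aggregate-function interfaces.
trait Accumulator
case class SumAcc(var sum: Long) extends Accumulator

trait AggFn {
  def createAccumulator(): Accumulator
  def merge(accs: JArrayList[Accumulator]): Accumulator
}

object SumFn extends AggFn {
  def createAccumulator(): Accumulator = SumAcc(0L)
  def merge(accs: JArrayList[Accumulator]): Accumulator = {
    var total = 0L
    val it = accs.iterator()
    while (it.hasNext) total += it.next().asInstanceOf[SumAcc].sum
    SumAcc(total)
  }
}

// Pairwise merge: memory stays constant in the group size because the
// merge list always holds exactly two accumulators and is reused.
def reduceGroup(records: Iterator[Accumulator], fn: AggFn): Accumulator = {
  val mergeList = new JArrayList[Accumulator](2) // reused across records
  var acc = fn.createAccumulator()
  while (records.hasNext) {
    mergeList.clear()
    mergeList.add(acc)
    mergeList.add(records.next())
    acc = fn.merge(mergeList) // merge one pair at a time
  }
  acc
}
```

This avoids the failure mode named in the review: a large (or non-grouped) aggregate no longer accumulates one list entry per input record.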
> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
> Key: FLINK-5768
> URL: https://issues.apache.org/jira/browse/FLINK-5768
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to
> use new aggregation functions and the aggregate dataStream API.
> 2. DataStream will always run in incremental mode, as explained on
> 06/Feb/2017 in FLINK-5564.
> 3. Change the implementation of the Dataset aggregation runtime code to use
> new aggregation functions.
> 4. Clean up unused classes and methods.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)