[
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888219#comment-15888219
]
ASF GitHub Bot commented on FLINK-5768:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3423#discussion_r103472747
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala
---
@@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
override def open(config: Configuration) {
Preconditions.checkNotNull(aggregates)
Preconditions.checkNotNull(groupKeysMapping)
- aggregateBuffer = new Row(intermediateRowArity)
+ aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
output = new Row(finalRowArity)
if (!groupingSetsMapping.isEmpty) {
intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
}
}
/**
- * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
- * calculate aggregated values output by aggregate buffer, and set them into output
- * Row based on the mapping relation between intermediate aggregate data and output data.
- *
- * @param records Grouped intermediate aggregate Rows iterator.
- * @param out The collector to hand results to.
- *
- */
+ * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+ * calculate aggregated values output by aggregate buffer, and set them into output
+ * Row based on the mapping relation between intermediate aggregate data and output data.
+ *
+ * @param records Grouped intermediate aggregate Rows iterator.
+ * @param out The collector to hand results to.
+ *
+ */
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
- // Initiate intermediate aggregate value.
- aggregates.foreach(_.initiate(aggregateBuffer))
-
- // Merge intermediate aggregate value to buffer.
+ // merge intermediate aggregate value to buffer.
var last: Row = null
- records.foreach((record) => {
- aggregates.foreach(_.merge(record, aggregateBuffer))
+ val iterator = records.iterator()
+ val accumulatorList = Array.fill(aggregates.length) {
+ new JArrayList[Accumulator]()
+ }
+
+ while (iterator.hasNext) {
+ val record = iterator.next()
+ for (i <- aggregates.indices) {
+ accumulatorList(i).add(
--- End diff --
I'm wondering whether it would be better to have no preparing mapper and instead have two separate paths for combinable and non-combinable aggregates:
- Combinable: `input -> groupCombine() -> groupReduce()` where `groupCombine` uses `accumulate()` and `groupReduce` pairwise merges.
- Non-combinable: `input -> groupReduce()` where `groupReduce` uses `accumulate()`.
The combinable `groupCombine` and the non-combinable `groupReduce` might even use the same code (except that `groupReduce` needs to call `getValue()` in the end).
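To make the proposed two-path design concrete, here is a minimal, self-contained sketch. The `Aggregate` trait below is a hypothetical stand-in for illustration only, not the actual flink-table interface; the method names `accumulate()`, `merge()`, and `getValue()` follow the comment above, and `LongSum` is an assumed example aggregate.

```scala
// Hypothetical simplified aggregate interface (not the real flink-table API).
trait Aggregate[ACC, OUT] {
  def createAccumulator(): ACC
  def accumulate(acc: ACC, value: Long): Unit // used by groupCombine and the non-combinable path
  def merge(a: ACC, b: ACC): ACC              // used by the combinable groupReduce (pairwise merge)
  def getValue(acc: ACC): OUT                 // called only at the very end of groupReduce
}

// Example aggregate: a sum over Longs, with a one-element array as mutable accumulator.
final class LongSum extends Aggregate[Array[Long], Long] {
  def createAccumulator(): Array[Long] = Array(0L)
  def accumulate(acc: Array[Long], value: Long): Unit = acc(0) += value
  def merge(a: Array[Long], b: Array[Long]): Array[Long] = { a(0) += b(0); a }
  def getValue(acc: Array[Long]): Long = acc(0)
}

object TwoPathSketch {
  // Combinable path, step 1: groupCombine folds raw inputs into a partial accumulator.
  def groupCombine(agg: Aggregate[Array[Long], Long], inputs: Seq[Long]): Array[Long] = {
    val acc = agg.createAccumulator()
    inputs.foreach(v => agg.accumulate(acc, v))
    acc
  }

  // Combinable path, step 2: groupReduce pairwise-merges partial accumulators
  // and calls getValue() only once at the end.
  def groupReduce(agg: Aggregate[Array[Long], Long], partials: Seq[Array[Long]]): Long = {
    val merged = partials.reduce((a, b) => agg.merge(a, b))
    agg.getValue(merged)
  }

  // Non-combinable path: a single groupReduce that uses accumulate() directly;
  // it shares the groupCombine shape plus the final getValue() call.
  def groupReduceDirect(agg: Aggregate[Array[Long], Long], inputs: Seq[Long]): Long =
    agg.getValue(groupCombine(agg, inputs))
}
```

As the comment suggests, both paths share the accumulate loop; the only difference is whether partial accumulators are merged pairwise before the final `getValue()`.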
> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
> Key: FLINK-5768
> URL: https://issues.apache.org/jira/browse/FLINK-5768
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables.
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to
> use the new aggregation functions and the aggregate DataStream API.
> 2. DataStream will always run in incremental mode, as explained on
> 06/Feb/2017 in FLINK-5564.
> 3. Change the implementation of the DataSet aggregation runtime code to use
> the new aggregation functions.
> 4. Clean up unused classes and methods.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)