[
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973420#comment-15973420
]
ASF GitHub Bot commented on FLINK-6242:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r111995303
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
---
@@ -19,88 +19,71 @@
package org.apache.flink.table.runtime.aggregate
import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler,
GeneratedAggregationsFunction}
import org.apache.flink.types.Row
import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
/**
* [[RichGroupReduceFunction]] to compute the final result of a
pre-aggregated aggregation
* for batch (DataSet) queries.
*
- * @param aggregates The aggregate functions.
- * @param aggOutFields The positions of the aggregation results in the
output
+ * @param genAggregations Code-generated [[GeneratedAggregations]]
* @param gkeyOutFields The positions of the grouping keys in the output
* @param groupingSetsMapping The mapping of grouping set keys between
input and output positions.
- * @param finalRowArity The arity of the final resulting row
*/
class DataSetFinalAggFunction(
- private val aggregates: Array[AggregateFunction[_ <: Any]],
- private val aggOutFields: Array[Int],
+ private val genAggregations: GeneratedAggregationsFunction,
private val gkeyOutFields: Array[Int],
- private val groupingSetsMapping: Array[(Int, Int)],
- private val finalRowArity: Int)
- extends RichGroupReduceFunction[Row, Row] {
+ private val groupingSetsMapping: Array[(Int, Int)])
+ extends RichGroupReduceFunction[Row, Row]
+ with Compiler[GeneratedAggregations] {
- Preconditions.checkNotNull(aggregates)
- Preconditions.checkNotNull(aggOutFields)
Preconditions.checkNotNull(gkeyOutFields)
Preconditions.checkNotNull(groupingSetsMapping)
private var output: Row = _
+ private var accumulators: Row = _
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ private var function: GeneratedAggregations = _
private val intermediateGKeys: Option[Array[Int]] = if
(!groupingSetsMapping.isEmpty) {
Some(gkeyOutFields)
} else {
None
}
- private val numAggs = aggregates.length
- private val numGKeys = gkeyOutFields.length
-
- private val accumulators: Array[JArrayList[Accumulator]] =
- Array.fill(numAggs)(new JArrayList[Accumulator](2))
-
override def open(config: Configuration) {
- output = new Row(finalRowArity)
-
- // init lists with two empty accumulators
- for (i <- aggregates.indices) {
- val accumulator = aggregates(i).createAccumulator()
- accumulators(i).add(accumulator)
- accumulators(i).add(accumulator)
- }
+ LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+ s"Code:\n$genAggregations.code")
+ val clazz = compile(
+ getClass.getClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+
+ output = function.createOutputRow()
+ accumulators = function.createAccumulators()
}
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit =
{
val iterator = records.iterator()
// reset first accumulator
- var i = 0
- while (i < aggregates.length) {
- aggregates(i).resetAccumulator(accumulators(i).get(0))
- i += 1
- }
+ function.resetAccumulator(accumulators)
+ var i = 0
while (iterator.hasNext) {
val record = iterator.next()
--- End diff --
We can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of
the `while` loop and set the `output` fields after the loop has terminated.
> codeGen DataSet Goupingwindow Aggregates
> ----------------------------------------
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)