[
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973423#comment-15973423
]
ASF GitHub Bot commented on FLINK-6242:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r112002140
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
*
* It is used for sliding on batch for both time and count-windows.
*
- * @param aggregates aggregate functions.
- * @param groupKeysMapping index mapping of group keys between
intermediate aggregate Row
- * and output Row.
- * @param aggregateMapping index mapping between aggregate function list
and aggregated value
- * index in output Row.
- * @param finalRowArity output row field count
+ * @param genAggregations Code-generated [[GeneratedAggregations]]
+ * @param keysAndAggregatesArity The total arity of keys and aggregates
* @param finalRowWindowStartPos relative window-start position to last
field of output row
* @param finalRowWindowEndPos relative window-end position to last field
of output row
* @param windowSize size of the window, used to determine window-end for
output row
*/
class DataSetSlideWindowAggReduceCombineFunction(
- aggregates: Array[AggregateFunction[_ <: Any]],
- groupKeysMapping: Array[(Int, Int)],
- aggregateMapping: Array[(Int, Int)],
- finalRowArity: Int,
+ genAggregations: GeneratedAggregationsFunction,
+ keysAndAggregatesArity: Int,
finalRowWindowStartPos: Option[Int],
finalRowWindowEndPos: Option[Int],
windowSize: Long)
extends DataSetSlideWindowAggReduceGroupFunction(
- aggregates,
- groupKeysMapping,
- aggregateMapping,
- finalRowArity,
+ genAggregations,
+ keysAndAggregatesArity,
finalRowWindowStartPos,
finalRowWindowEndPos,
windowSize)
with CombineFunction[Row, Row] {
- private val intermediateRowArity: Int = groupKeysMapping.length +
aggregateMapping.length + 1
- private val intermediateRow: Row = new Row(intermediateRowArity)
+ private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
override def combine(records: Iterable[Row]): Row = {
- // reset first accumulator
- var i = 0
- while (i < aggregates.length) {
- aggregates(i).resetAccumulator(accumulatorList(i).get(0))
- i += 1
- }
+ // reset accumulator
+ function.resetAccumulator(accumulators)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
--- End diff --
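Please make `record` a `var` and declare it outside of the `while` loop, so that the same variable is reused instead of being re-declared on every iteration.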
> codeGen DataSet GroupingWindow Aggregates
> -----------------------------------------
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)