Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r112002140
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
*
* It is used for sliding on batch for both time and count-windows.
*
- * @param aggregates aggregate functions.
- * @param groupKeysMapping index mapping of group keys between
intermediate aggregate Row
- * and output Row.
- * @param aggregateMapping index mapping between aggregate function list
and aggregated value
- * index in output Row.
- * @param finalRowArity output row field count
+ * @param genAggregations Code-generated [[GeneratedAggregations]]
+ * @param keysAndAggregatesArity The total arity of keys and aggregates
* @param finalRowWindowStartPos relative window-start position to last
field of output row
* @param finalRowWindowEndPos relative window-end position to last field
of output row
* @param windowSize size of the window, used to determine window-end for
output row
*/
class DataSetSlideWindowAggReduceCombineFunction(
- aggregates: Array[AggregateFunction[_ <: Any]],
- groupKeysMapping: Array[(Int, Int)],
- aggregateMapping: Array[(Int, Int)],
- finalRowArity: Int,
+ genAggregations: GeneratedAggregationsFunction,
+ keysAndAggregatesArity: Int,
finalRowWindowStartPos: Option[Int],
finalRowWindowEndPos: Option[Int],
windowSize: Long)
extends DataSetSlideWindowAggReduceGroupFunction(
- aggregates,
- groupKeysMapping,
- aggregateMapping,
- finalRowArity,
+ genAggregations,
+ keysAndAggregatesArity,
finalRowWindowStartPos,
finalRowWindowEndPos,
windowSize)
with CombineFunction[Row, Row] {
- private val intermediateRowArity: Int = groupKeysMapping.length +
aggregateMapping.length + 1
- private val intermediateRow: Row = new Row(intermediateRowArity)
+ private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
override def combine(records: Iterable[Row]): Row = {
- // reset first accumulator
- var i = 0
- while (i < aggregates.length) {
- aggregates(i).resetAccumulator(accumulatorList(i).get(0))
- i += 1
- }
+ // reset accumulator
+ function.resetAccumulator(accumulators)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
--- End diff --
Please make `record` a `var` and declare it outside of the loop.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---