[
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973415#comment-15973415
]
ASF GitHub Bot commented on FLINK-6242:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r112002278
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
---
@@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
override def combine(records: Iterable[Row]): Row = {
// reset first accumulator
- var i = 0
- while (i < aggregates.length) {
- aggregates(i).resetAccumulator(accumulatorList(i).get(0))
- i += 1
- }
+ function.resetAccumulator(accumulators)
val iterator = records.iterator()
+
while (iterator.hasNext) {
val record = iterator.next()
- i = 0
- while (i < aggregates.length) {
- // insert received accumulator into acc list
- val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
- accumulatorList(i).set(1, newAcc)
- // merge acc list
- val retAcc = aggregates(i).merge(accumulatorList(i))
- // insert result into acc list
- accumulatorList(i).set(0, retAcc)
- i += 1
- }
+ function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
// check if this record is the last record
if (!iterator.hasNext) {
--- End diff --
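Move this last-record check behind the loop (i.e., after the `while` block) instead of testing `iterator.hasNext` again inside every iteration.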
> codeGen DataSet Groupingwindow Aggregates
> -----------------------------------------
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)