[ https://issues.apache.org/jira/browse/FLINK-5433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814399#comment-15814399 ]
Shaoxuan Wang commented on FLINK-5433:
--------------------------------------

It seems the initiate function is only used in AggregateReduceCombineFunction and AggregateReduceGroupFunction. A DataStream with incremental aggregation never gets a chance to execute initiate.

> initiate function of Aggregate does not take effect for DataStream aggregation
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-5433
>                 URL: https://issues.apache.org/jira/browse/FLINK-5433
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>
> The initiate function of Aggregate works for DataSet aggregation, but does
> not work for DataStream aggregation.
> For instance, when giving an initial value, say 2, for CountAggregate, the
> result of the DataSet aggregate takes this change into account, but the
> DataStream aggregate does not.
> {code}
> class CountAggregate extends Aggregate[Long] {
>   override def initiate(intermediate: Row): Unit = {
>     intermediate.setField(countIndex, 2L)
>   }
> }
> {code}
> The output for the DataSet test (testWorkingAggregationDataTypes) is:
> .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
> expected: [1,1,1,1,1.5,1.5,2]
> received: [1,1,1,1,1.5,1.5,4] (the result of the last count aggregate is
> larger than the expected value by 2, as intended)
> But the output for the DataStream test
> (testProcessingTimeSlidingGroupWindowOverCount) remains the same:
> .select('string, 'int.count, 'int.avg)
> Expected :List(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)
> Actual   :MutableList(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
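
To illustrate the comment above, here is a minimal, self-contained sketch of why an initial value can be lost on an incremental path. It is a simplified model and not Flink's actual code: SimpleAggregate, CountFromTwo, groupStyle and incrementalStyle are hypothetical names, and a one-slot Array[Long] stands in for the intermediate Row. The assumption is that the group-style path initiates a buffer once and then merges into it, while the incremental path only reduces already-prepared partial results pairwise.

```scala
// Hypothetical, simplified model; none of these types are Flink classes.
object InitiateSketch {
  // Stand-in for an aggregate with a one-slot intermediate buffer.
  trait SimpleAggregate {
    def initiate(buffer: Array[Long]): Unit                    // set the starting value
    def prepare(value: Any, buffer: Array[Long]): Unit         // turn one record into a partial result
    def merge(partial: Array[Long], buffer: Array[Long]): Unit // fold a partial result into buffer
    def evaluate(buffer: Array[Long]): Long                    // produce the final result
  }

  // Count aggregate whose initial value is 2, as in the issue description.
  object CountFromTwo extends SimpleAggregate {
    def initiate(buffer: Array[Long]): Unit = { buffer(0) = 2L }
    def prepare(value: Any, buffer: Array[Long]): Unit = { buffer(0) = 1L }
    def merge(partial: Array[Long], buffer: Array[Long]): Unit = { buffer(0) += partial(0) }
    def evaluate(buffer: Array[Long]): Long = buffer(0)
  }

  // Builds the partial result for a single input record.
  private def prepared(agg: SimpleAggregate, value: Any): Array[Long] = {
    val partial = new Array[Long](1)
    agg.prepare(value, partial)
    partial
  }

  // Group-style path: initiate the buffer once, then merge every partial result.
  def groupStyle(agg: SimpleAggregate, input: Seq[Any]): Long = {
    val buffer = new Array[Long](1)
    agg.initiate(buffer)
    input.foreach(v => agg.merge(prepared(agg, v), buffer))
    agg.evaluate(buffer)
  }

  // Incremental path: pairwise reduce over prepared records (input must be
  // non-empty); initiate is never called here.
  def incrementalStyle(agg: SimpleAggregate, input: Seq[Any]): Long = {
    val reduced = input.map(prepared(agg, _)).reduce { (left, right) =>
      agg.merge(right, left)
      left
    }
    agg.evaluate(reduced)
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("Hello", "Hello")
    println(groupStyle(CountFromTwo, input))       // 4: initial value 2 plus two records
    println(incrementalStyle(CountFromTwo, input)) // 2: initial value never applied
  }
}
```

In this model the count of two records is off by the initial value only on the group-style path, which mirrors the 2 versus 4 difference reported for the DataSet test, while the incremental result is unchanged.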