[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482183#comment-17482183 ]
ChangjiGuo commented on FLINK-25475: ------------------------------------ Hi, [~xuyangzhong]. Thanks for your reply! Similar to this kind of sql: {code:sql} SELECT b, SUM(cnt) FROM ( SELECT b, COUNT(a) as cnt, HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start, HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end FROM wmTable1 GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) ) GROUP BY b {code} > When windowAgg and groupAgg are included at the same time, there is no > assigner generated but MiniBatch optimization is still used. > ----------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-25475 > URL: https://issues.apache.org/jira/browse/FLINK-25475 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.14.2 > Reporter: ChangjiGuo > Priority: Major > Attachments: image-2021-12-29-16-04-50-211.png, > image-2021-12-29-16-05-15-519.png > > > If the relNode has both windowAgg and groupAgg, MiniBatchIntervalInferRule > will not add StreamExecMiniBatchAssigner node, but MiniBatchGroupAggFunction > or MiniBatchLocalGroupAggFunction or MiniBatchGlobalGroupAggFunction will > still be generated when translated into transformation. > It will only judge whether to enable minibacth. > {code:java} > val operator = if (isMiniBatchEnabled) { > val aggFunction = new MiniBatchGroupAggFunction( > aggsHandler, > recordEqualiser, > accTypes, > inputRowType, > inputCountIndex, > generateUpdateBefore) > new KeyedMapBundleOperator( > aggFunction, > AggregateUtil.createMiniBatchTrigger(tableConfig)) > } else { > val aggFunction = new GroupAggFunction( > tableConfig.getMinIdleStateRetentionTime, > tableConfig.getMaxIdleStateRetentionTime, > aggsHandler, > recordEqualiser, > accTypes, > inputCountIndex, > generateUpdateBefore) > val operator = new KeyedProcessOperator[RowData, RowData, > RowData](aggFunction) > operator > } {code} > for example: > before: > !image-2021-12-29-16-04-50-211.png! > after: > !image-2021-12-29-16-05-15-519.png! > The WatermarkAssigner will send watermark to downstream, and the finishBundle > method will be called frequently, which does not match the expected result. -- This message was sent by Atlassian Jira (v8.20.1#820001)