ChangjiGuo created FLINK-25475:
----------------------------------
Summary: When windowAgg and groupAgg are included at the same
time, there is no assigner generated but MiniBatch optimization is still used.
Key: FLINK-25475
URL: https://issues.apache.org/jira/browse/FLINK-25475
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.14.0
Reporter: ChangjiGuo
Attachments: image-2021-12-29-16-04-50-211.png,
image-2021-12-29-16-05-15-519.png
If the relNode has both windowAgg and groupAgg, MiniBatchIntervalInferRule will
not add StreamExecMiniBatchAssigner node, but MiniBatchGroupAggFunction or
MiniBatchLocalGroupAggFunction or MiniBatchGlobalGroupAggFunction will still be
generated when translated into transformation.
It will only judge whether to enable minibacth.
{code:java}
val operator = if (isMiniBatchEnabled) {
val aggFunction = new MiniBatchGroupAggFunction(
aggsHandler,
recordEqualiser,
accTypes,
inputRowType,
inputCountIndex,
generateUpdateBefore)
new KeyedMapBundleOperator(
aggFunction,
AggregateUtil.createMiniBatchTrigger(tableConfig))
} else {
val aggFunction = new GroupAggFunction(
tableConfig.getMinIdleStateRetentionTime,
tableConfig.getMaxIdleStateRetentionTime,
aggsHandler,
recordEqualiser,
accTypes,
inputCountIndex,
generateUpdateBefore)
val operator = new KeyedProcessOperator[RowData, RowData,
RowData](aggFunction)
operator
} {code}
for example:
before:
!image-2021-12-29-16-04-50-211.png!
after:
!image-2021-12-29-16-05-15-519.png!
The WatermarkAssigner will send watermark to downstream, and the finishBundle
method will be called frequently, which does not match the expected result.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)