[
https://issues.apache.org/jira/browse/FLINK-22781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351684#comment-17351684
]
Andy commented on FLINK-22781:
------------------------------
[~godfreyhe] Thanks for pointing out the drawback.
After looking deeper into the problem, I found that the output of
`StreamExecChangelogNormalize` changes when mini-batch is enabled.
When mini-batch is disabled, the output of `StreamExecChangelogNormalize` for key
'Euro' is:
changelogRow("+U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
changelogRow("-U", "Euro", "no1", JLong.valueOf(114L), localDateTime(1L)),
changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L))
When mini-batch is enabled, the output of `StreamExecChangelogNormalize` for key
'Euro' is:
changelogRow("+U", "Euro", "no1", JLong.valueOf(118L), localDateTime(6L))
The downstream window aggregate emits
'Euro,0,1970-01-01T00:00,1970-01-01T00:00:05' when mini-batch is disabled, while this
record does not appear when mini-batch is enabled.
*In conclusion, the problem has two causes:*
*1. When mini-batch is enabled, `StreamExecChangelogNormalize` emits different
output*
*2. WindowOperator should not emit data when the counter drops to 0*
*Because WindowOperator already stores a counter per key, we can make a
simple fix without breaking state compatibility.*
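To illustrate the second point, here is a minimal sketch of the idea. It is a toy model and not Flink's actual WindowOperator code: the class `WindowCounterSketch` and its methods are hypothetical names, and it assumes that both records with `localDateTime(1L)` fall into the window that ends at 00:00:05, as the output above suggests.

```java
// Toy model only; this is NOT Flink's WindowOperator. All names are hypothetical.
class WindowCounterSketch {
    private long count; // number of live (non-retracted) records in the window
    private long sum;   // a stand-in aggregate over the value column

    /** Applies one changelog record, identified by its kind ("+I", "+U", "-U", "-D"). */
    void apply(String kind, long value) {
        boolean retract = kind.equals("-U") || kind.equals("-D");
        if (retract) {
            count--;
            sum -= value;
        } else {
            count++;
            sum += value;
        }
    }

    long sum() {
        return sum;
    }

    /** The proposed guard: a window whose counter is back at 0 emits nothing. */
    boolean shouldEmit() {
        return count > 0;
    }

    public static void main(String[] args) {
        // Mini-batch disabled: the first window sees +U 114 followed by -U 114.
        WindowCounterSketch window = new WindowCounterSketch();
        window.apply("+U", 114L);
        window.apply("-U", 114L);
        System.out.println("sum=" + window.sum() + ", emit=" + window.shouldEmit());
        // prints "sum=0, emit=false"
    }
}
```

With such a guard, the window containing only the retracted pair would stay silent in both modes, so the results with and without mini-batch would agree.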
I would be happy to fix the bug; please assign the issue to me.
> Incorrect result for group window aggregate when mini-batch is enabled
> ----------------------------------------------------------------------
>
> Key: FLINK-22781
> URL: https://issues.apache.org/jira/browse/FLINK-22781
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.14.0
> Reporter: godfrey he
> Priority: Critical
>
> We can reproduce this issue by adding the following code to the
> {{GroupWindowITCase#testWindowAggregateOnUpsertSource}} method:
> {code:java}
> tEnv.getConfig.getConfiguration.setBoolean(
> ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
> tEnv.getConfig.getConfiguration.set(
> ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
> Duration.ofSeconds(1))
> tEnv.getConfig.getConfiguration.setLong(
> ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 10L)
> {code}
> The reason is that a group window without any data (the data may have been
> retracted) should not emit any record.