[
https://issues.apache.org/jira/browse/FLINK-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17058858#comment-17058858
]
Jark Wu commented on FLINK-16047:
---------------------------------
I will pick up this issue and submit a PR soon.
> Blink planner produces wrong aggregate results with state clean up
> ------------------------------------------------------------------
>
> Key: FLINK-16047
> URL: https://issues.apache.org/jira/browse/FLINK-16047
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0
> Reporter: Timo Walther
> Priority: Critical
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
>
> It seems that FLINK-10674 has not been ported to the Blink planner.
> Because state clean up happens in processing time, it might be the case that
> retractions are arriving after the state has been cleaned up. Before these
> changes, a new accumulator was created and invalid retraction messages were
> emitted. This change drops retraction messages for which no accumulator
> exists.
> These lines are missing in
> {{org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction}}:
> {code}
> if (null == accumulators) {
> // Don't create a new accumulator for a retraction message. This
> // might happen if the retraction message is the first message for the
> // key or after a state clean up.
> if (!inputC.change) {
> return
> }
> // first accumulate message
> firstRow = true
> accumulators = function.createAccumulators()
> } else {
> firstRow = false
> }
> {code}
> The bug has not been verified. I spotted it only by looking at the code.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)