[ 
https://issues.apache.org/jira/browse/FLINK-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-16047:
---------------------------------
    Description: 
It seems that FLINK-10674 has not been ported to the Blink planner.

Because state clean up happens in processing time, retractions may arrive 
after the state has already been cleaned up. Before FLINK-10674, a new 
accumulator was created in that case and invalid retraction messages were 
emitted. The fix in FLINK-10674 drops retraction messages for which no 
accumulator exists.

These lines are missing in 
{{org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction}}:
{code}
if (null == accumulators) {
  // Don't create a new accumulator for a retraction message. This
  // might happen if the retraction message is the first message for the
  // key or after a state clean up.
  if (!inputC.change) {
    return
  }
  // first accumulate message
  firstRow = true
  accumulators = function.createAccumulators()
} else {
  firstRow = false
}
{code}

The bug has not been verified. I spotted it only by looking at the code.
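
To illustrate the suspected behaviour, here is a minimal, self-contained sketch that does not use any Flink classes. The class {{RetractionGuardSketch}}, its methods, and the {{dropOrphanRetractions}} flag are hypothetical names introduced only for this illustration; a plain map stands in for keyed state and a running count stands in for the accumulator. It is not the actual {{GroupAggFunction}} code, only a model of the guard quoted above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a grouped count aggregate with state clean up.
class RetractionGuardSketch {
    // Stand-in for keyed accumulator state: key -> running count.
    private final Map<String, Long> state = new HashMap<>();
    // Everything the operator emits downstream, as "key=count".
    private final List<String> emitted = new ArrayList<>();
    // true models the guard from FLINK-10674, false models its absence.
    private final boolean dropOrphanRetractions;

    RetractionGuardSketch(boolean dropOrphanRetractions) {
        this.dropOrphanRetractions = dropOrphanRetractions;
    }

    void processElement(String key, boolean isAccumulate) {
        Long acc = state.get(key);
        if (acc == null) {
            // Retraction without an accumulator: first message for the key
            // or state was already cleaned up. Drop it if the guard is on.
            if (!isAccumulate && dropOrphanRetractions) {
                return;
            }
            acc = 0L; // create a fresh accumulator
        }
        acc += isAccumulate ? 1 : -1;
        state.put(key, acc);
        emitted.add(key + "=" + acc);
    }

    // Models the processing-time clean up of idle state.
    void cleanUp(String key) {
        state.remove(key);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean guard : new boolean[] {false, true}) {
            RetractionGuardSketch op = new RetractionGuardSketch(guard);
            op.processElement("a", true);   // accumulate
            op.cleanUp("a");                // state expires
            op.processElement("a", false);  // late retraction
            System.out.println("guard=" + guard + " -> " + op.emitted());
        }
    }
}
```

Without the guard, the sketch emits {{a=-1}} for the late retraction, which corresponds to the invalid result described above. With the guard, the retraction is dropped and only {{a=1}} is emitted.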

  was:
It seems that FLINK-10674 has not been ported to the Blink planner.

Because state clean up happens in processing time, retractions may arrive 
after the state has already been cleaned up. Before FLINK-10674, a new 
accumulator was created in that case and invalid retraction messages were 
emitted. The fix in FLINK-10674 drops retraction messages for which no 
accumulator exists.

These lines are missing:
{code}
if (null == accumulators) {
  // Don't create a new accumulator for a retraction message. This
  // might happen if the retraction message is the first message for the
  // key or after a state clean up.
  if (!inputC.change) {
    return
  }
  // first accumulate message
  firstRow = true
  accumulators = function.createAccumulators()
} else {
  firstRow = false
}
{code}

The bug has not been verified. I spotted it only by looking at the code.


> Blink planner produces wrong aggregate results with state clean up
> ------------------------------------------------------------------
>
>                 Key: FLINK-16047
>                 URL: https://issues.apache.org/jira/browse/FLINK-16047
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0
>            Reporter: Timo Walther
>            Priority: Critical


