[ 
https://issues.apache.org/jira/browse/FLINK-23663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-23663:
---------------------------------
    Description: 
{{ChangelogNormalize}} is an expensive stateful operation as it stores data for 
each key. 

Filters are generally not pushed through a {{ChangelogNormalize}} node, which 
means users have no way to at least limit the key space. Pushing a filter like 
{{a < 10}} into a source such as {{upsert-kafka}} that emits {{+I[key1, a=9]}} 
and {{-D[key1, a=10]}} is problematic: the deletion would be filtered out, 
leading to wrong results. However, limiting the filter push down to the key 
space should be safe.
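The problem above can be illustrated with a small simulation. This is plain Python, not Flink code; the {{normalize}} helper is a hypothetical stand-in for what the {{ChangelogNormalize}} operator does:

```python
# Hypothetical sketch (not Flink APIs): simulate an upsert changelog to show
# why pushing a non-key filter below ChangelogNormalize is unsafe, while a
# filter on the key space is not.

def normalize(changelog):
    """Materialize an upsert changelog into the final value per key,
    roughly what ChangelogNormalize does."""
    state = {}
    for op, key, a in changelog:
        if op == "+I":
            state[key] = a
        elif op == "-D":
            state.pop(key, None)
    return state

# The stream from the description: +I[key1, a=9] followed by -D[key1, a=10].
stream = [("+I", "key1", 9), ("-D", "key1", 10)]

# Correct plan: normalize first, then apply `a < 10` -> key1 was deleted.
correct = {k: a for k, a in normalize(stream).items() if a < 10}

# Wrong plan: push `a < 10` into the source. The -D row (a=10) is dropped
# before normalization, so the stale +I[key1, a=9] row survives.
pushed = [(op, k, a) for op, k, a in stream if a < 10]
wrong = {k: a for k, a in normalize(pushed).items() if a < 10}

# Safe plan: push a key filter (here: drop key1 entirely). All changelog
# entries of a key share that key, so the insert and its deletion are kept
# or dropped together and the result stays consistent.
key_filtered = [(op, k, a) for op, k, a in stream if k != "key1"]
safe = normalize(key_filtered)

print(correct)  # {}
print(wrong)    # {'key1': 9}
print(safe)     # {}
```

The key filter is safe because every changelog row of a given key carries that key, so a key predicate can never separate a deletion from the insert it retracts.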

Furthermore, the current implementation seems to be wrong as well: it pushes 
any kind of filter through {{ChangelogNormalize}}, albeit only if the source 
implements filter push down.

  was:
{{ChangelogNormalize}} is an expensive stateful operation as it stores data for 
each key. 

Filters are generally not pushed through a ChangelogNormalize node which means 
that users have no possibility to at least limit the key space. Pushing filters 
like {{a < 10}} into a source like {{upsert-kafka}} that is emitting {{+I[key1, 
a=9]}} and {{-D[key1, a=10]}}, is problematic as the deletion will be filtered 
and leads to wrong results. But limiting the filter push down to key space 
should be safe.

Furthermore, it seems the current implementation is also wrong as it pushes 
filters through {{ChangelogNormalize}} but only if the source implements filter 
push down.


> Reduce state size in ChangelogNormalize through filter push down
> ----------------------------------------------------------------
>
>                 Key: FLINK-23663
>                 URL: https://issues.apache.org/jira/browse/FLINK-23663
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Timo Walther
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
