Timo Walther created FLINK-23663:
------------------------------------
Summary: Reduce state size in ChangelogNormalize through filter
push down
Key: FLINK-23663
URL: https://issues.apache.org/jira/browse/FLINK-23663
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Timo Walther
{{ChangelogNormalize}} is an expensive stateful operation as it stores data for
each key.
Filters are generally not pushed through a {{ChangelogNormalize}} node, which
means that users have no way to limit even the key space. Pushing a filter
like {{a < 10}} into a source like {{upsert-kafka}} that emits
{{+I[key1, a=9]}} and {{-D[key1, a=10]}} is problematic: the insertion passes
the filter but the deletion is filtered out, so the deleted row stays in the
result, which is wrong. Limiting filter push down to predicates on the key
should be safe, however.
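
The difference between the two cases can be illustrated with a small
simulation. This is only a sketch in plain Python, not Flink code: the
{{normalize}} helper, the tuple layout {{(kind, key, a)}} and the extra
{{key2}} record are assumptions made for illustration. It materializes a
changelog per key and compares filtering before and after that step.

```python
# Toy model of changelog normalization (not Flink's implementation).
# Each record is (kind, key, a); kind is "+I" (insert/upsert) or "-D" (delete).

def normalize(changelog):
    """Materialize the changelog into the latest value per key."""
    state = {}
    for kind, key, a in changelog:
        if kind == "+I":
            state[key] = a
        elif kind == "-D":
            state.pop(key, None)
    return state

def filter_after(changelog, pred):
    """Reference semantics: normalize first, then filter the result."""
    return {k: a for k, a in normalize(changelog).items() if pred(k, a)}

def filter_before(changelog, pred):
    """Filter pushed below normalization, i.e. applied per changelog record."""
    return normalize([r for r in changelog if pred(r[1], r[2])])

changelog = [
    ("+I", "key1", 9),
    ("+I", "key2", 5),   # hypothetical second key, added for illustration
    ("-D", "key1", 10),
]

def value_pred(key, a):
    return a < 10            # filter on a non-key column

def key_pred(key, a):
    return key == "key1"     # filter on the key only

# Non-key filter: the -D record is dropped, so key1 wrongly survives.
expected_value = filter_after(changelog, value_pred)
pushed_value = filter_before(changelog, value_pred)

# Key filter: every record of a key is kept or dropped together.
expected_key = filter_after(changelog, key_pred)
pushed_key = filter_before(changelog, key_pred)
```

In this model the key predicate gives the same result in both positions,
because all records of a key are either kept or dropped together, while the
non-key predicate keeps {{key1}} alive after its deletion.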
Furthermore, the current implementation also seems to be wrong: it does push
filters through {{ChangelogNormalize}}, but only if the source implements
filter push down.