[ https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846258#comment-17846258 ]
JJJJude edited comment on FLINK-29625 at 5/14/24 10:21 AM:
-----------------------------------------------------------

Hi [~jiabao.sun], I encountered the same problem; see the discussion at [https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522|http://example.com/]

I hope that the optimization of *_ChangelogNormalize_* can be disabled by an option. In the CDC multi-stream join scenario, an *_UPDATE_BEFORE (-U)_* record results in *-D/+I* being emitted after it passes through the _*JOIN*_ operator, and these records are passed downstream. When the join result is used as a dimension table, the *-D* causes lookups to be associated with the intermediate _*NULL*_ state.


was (Author: wczhu):
Hi [~jiabao.sun], I encountered the same problem; see the discussion at [https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522|http://example.com/]

I hope that the optimization of ??ChangelogNormalize?? can be disabled by an option. In the CDC multi-stream join scenario, an ??UPDATE_BEFORE?? (-U) record results in *-D/+I* being emitted after it passes through the ??JOIN?? operator, and these records are passed downstream. When the join result is used as a dimension table, the *-D* causes lookups to be associated with the intermediate ??NULL?? state.

> Optimize changelog normalize for upsert source
> ----------------------------------------------
>
>                 Key: FLINK-29625
>                 URL: https://issues.apache.org/jira/browse/FLINK-29625
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.3
>            Reporter: jiabao.sun
>            Priority: Major
>
> Currently, Flink adds an expensive operator, _changelog normalize_, to a
> source in upsert changelog mode in order to fill in the _update_before_ value.
> Even when inserting directly from an upsert-kafka source into an upsert-kafka
> sink, this operator is still added, and a second operator is then added to
> drop the _update_before_ messages, which is obviously redundant.
> In CDC scenarios, some databases do not provide update-before images, such as
> Cassandra, MongoDB, TiDB (when {_}Old Value{_} is not turned on) and Postgres
> (when {_}REPLICA IDENTITY{_} is not set to {_}FULL{_}). Using Flink SQL to
> process these changelogs incurs a lot of state overhead.
> I don't know much about why this operator is needed, so I take the liberty of
> asking whether we can get rid of changelog normalize completely, or optimize
> it so that it is added only if a normalized changelog is required by a
> downstream operator.
> If this optimization is worthwhile, I'm happy to help with it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
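
The state cost that the issue attributes to changelog normalization can be illustrated with a toy model. The sketch below is not Flink's implementation: the function name `normalize`, the tuple layout, and the use of "+U" for an upsert and "-D" for a delete are assumptions made for illustration only. It shows why filling in _update_before_ values requires remembering the last row for every live key.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical record layout: (kind, key, value).
# Input kinds: "+U" is an upsert, "-D" is a delete whose value may be None.
Row = Tuple[str, str, Optional[str]]


def normalize(upserts: Iterable[Row]) -> List[Row]:
    """Toy model: expand an upsert stream into a full changelog.

    The dict `state` holds the last value seen per key. It grows with the
    number of live keys, which is the state overhead in question.
    """
    state: Dict[str, Optional[str]] = {}
    out: List[Row] = []
    for kind, key, value in upserts:
        if kind == "-D":
            # A delete can only be completed if the previous row is known.
            if key in state:
                out.append(("-D", key, state.pop(key)))
            continue
        if key in state:
            # The upsert source never sent the old row, so it comes from state.
            out.append(("-U", key, state[key]))
            out.append(("+U", key, value))
        else:
            out.append(("+I", key, value))
        state[key] = value
    return out


events = [("+U", "k1", "a"), ("+U", "k1", "b"), ("-D", "k1", None)]
changelog = normalize(events)
```

In this model, a sink that itself accepts upserts would simply discard the `-U` rows again, which is the redundancy the issue describes.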