[
https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846258#comment-17846258
]
JJJJude edited comment on FLINK-29625 at 5/14/24 10:20 AM:
-----------------------------------------------------------
Hi [~jiabao.sun], I encountered the same problem; see the discussion at
[https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522|http://example.com/]
I hope that the ??ChangelogNormalize?? optimization can be disabled by an
option. In the CDC multi-stream join scenario, an ??UPDATE_BEFORE?? (-U) record
is emitted as *-D/+I* after passing through the ??JOIN?? operator and is passed
downstream. When the result is used as a dimension table, the *-D* causes it to
be associated with the intermediate ??NULL?? state.
was (Author: wczhu):
Hi [~jiabao.sun], I encountered the same problem; see the discussion at
[https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522|http://example.com]
I hope that the `ChangelogNormalize` optimization can be disabled by an option.
In the CDC multi-stream join scenario, an `UPDATE_BEFORE` (-U) record is emitted
as -D/+I after passing through the `JOIN` operator and is passed downstream.
When the result is used as a dimension table, the `-D` causes it to be
associated with the intermediate `NULL` state.
> Optimize changelog normalize for upsert source
> ----------------------------------------------
>
> Key: FLINK-29625
> URL: https://issues.apache.org/jira/browse/FLINK-29625
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.15.3
> Reporter: jiabao.sun
> Priority: Major
>
> Currently, Flink adds an expensive _changelog normalize_ operator after any
> source in upsert changelog mode to fill in the _update_before_ values.
> Even inserting directly from an upsert-kafka source into an upsert-kafka sink
> still adds this operator, plus an extra operator that drops the
> _update_before_ messages again, which is obviously redundant.
> In CDC scenarios, some databases do not provide update-before images, such as
> Cassandra, MongoDB, TiDB (_Old Value_ is not turned on) and Postgres
> (_REPLICA IDENTITY_ is not set to _FULL_). Using Flink SQL to process
> these changelogs incurs a lot of state overhead.
> I don't know much about why this operator is needed, so I take the liberty to
> ask whether we can get rid of changelog normalize completely, or optimize it
> by adding it only if a normalized changelog is required by a downstream
> operator.
> If this optimization is worthwhile, I'm happy to help with it.
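
To make the state overhead mentioned in the description concrete, here is a minimal Python sketch of what a normalize step has to do in principle. It is a simplified model and not Flink's implementation: the function name `changelog_normalize`, the `(key, value)` event shape, and the use of `None` as a delete marker are assumptions made only for illustration.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical event shapes, for illustration only.
Upsert = Tuple[str, Optional[str]]   # (key, new value); None means delete
Change = Tuple[str, str, str]        # (row kind, key, value)


def changelog_normalize(upserts: Iterable[Upsert]) -> Tuple[List[Change], Dict[str, str]]:
    """Turn an upsert stream into a full changelog.

    The last value of every live key must be kept in `state`, because that is
    the only way to produce the -U (update_before) and -D images.
    """
    state: Dict[str, str] = {}
    out: List[Change] = []
    for key, value in upserts:
        if value is None:
            # Delete marker: emit -D with the stored image, if there is one.
            if key in state:
                out.append(("-D", key, state.pop(key)))
        elif key not in state:
            # First time this key is seen: plain insert.
            state[key] = value
            out.append(("+I", key, value))
        elif state[key] != value:
            # Update: the old image comes from state, not from the source.
            out.append(("-U", key, state[key]))
            out.append(("+U", key, value))
            state[key] = value
        # An unchanged value produces no output in this simplified model.
    return out, state


events = [("k1", "a"), ("k1", "b"), ("k2", "x"), ("k1", None)]
changes, remaining_state = changelog_normalize(events)
for change in changes:
    print(change)
```

In this model the state grows with the number of live keys, which is the overhead the description refers to. If the sink only needs upserts, the `-U` rows produced here would be discarded again downstream.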
--
This message was sent by Atlassian Jira
(v8.20.10#820010)