[ https://issues.apache.org/jira/browse/FLINK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616923#comment-17616923 ]
Jiabao Sun commented on FLINK-29625:
------------------------------------
Thanks [~martijnvisser] for the reply.
At present, this operator is added not only when deduplicating the changelog,
but also for all upsert sources:
[https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala#L71-L74]
{code:java}
if (
  isUpsertSource(resolvedSchema, table.tableSource) ||
  isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, tableConfig)
) {
  // generate changelog normalize node
  // ...
}
{code}
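To make the cost concrete, here is a simplified model of what changelog normalize does for an upsert source. This is a sketch under stated assumptions and not Flink's implementation. The class and method names are hypothetical, and rows are plain strings. Dropping identical re-upserts and deletes of unknown keys is an assumption about the operator's behaviour. The point it illustrates is that emitting _update_before_ requires keeping the last row of every key in state.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of changelog normalize for an upsert source.
// Flink's real operator works on RowData/RowKind with keyed state; this does not.
final class NormalizeSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, String key, String value) {
        @Override
        public String toString() {
            String tag = switch (kind) {
                case INSERT -> "+I";
                case UPDATE_BEFORE -> "-U";
                case UPDATE_AFTER -> "+U";
                case DELETE -> "-D";
            };
            return tag + "(" + key + "," + value + ")";
        }
    }

    // "State": the last row seen for every primary key. It grows with the number
    // of distinct keys, which is where the state overhead comes from.
    private final Map<String, String> lastRow = new HashMap<>();

    // An upsert record says "key now has value" (value != null) or "key deleted" (value == null).
    List<Change> process(String key, String value) {
        List<Change> out = new ArrayList<>();
        String prev = lastRow.get(key);
        if (value == null) {
            if (prev != null) {                 // assumption: deletes of unknown keys are dropped
                lastRow.remove(key);
                out.add(new Change(Kind.DELETE, key, prev));
            }
        } else if (prev == null) {
            lastRow.put(key, value);
            out.add(new Change(Kind.INSERT, key, value));
        } else if (!prev.equals(value)) {       // assumption: identical re-upserts are dropped
            lastRow.put(key, value);
            out.add(new Change(Kind.UPDATE_BEFORE, key, prev)); // needs the stored previous row
            out.add(new Change(Kind.UPDATE_AFTER, key, value));
        }
        return out;
    }

    public static void main(String[] args) {
        NormalizeSketch n = new NormalizeSketch();
        System.out.println(n.process("k1", "a"));  // [+I(k1,a)]
        System.out.println(n.process("k1", "b"));  // [-U(k1,a), +U(k1,b)]
        System.out.println(n.process("k1", "b"));  // []
        System.out.println(n.process("k1", null)); // [-D(k1,b)]
    }
}
{code}
Without the {{lastRow}} map, the operator could not produce the {{-U(k1,a)}} row, because the upsert record for {{k1}} only carries the new value.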
Is this operator necessary for an exactly-once upsert stream? For example:
{code:sql}
insert into upsert_kafka_sink select * from upsert_kafka_source;
{code}
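One way to see why the operator looks redundant in this case is to apply both forms of the stream to a table keyed by primary key, as an upsert sink does. The following is a minimal sketch under stated assumptions. The class, the {{Msg}} record and the string message kinds are hypothetical, and the normalized sequence is written by hand rather than produced by Flink. The sink overwrites a key on every insert, update or upsert and removes it on delete, so the _update_before_ rows carry nothing it needs.
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed upsert sink (e.g. the table an upsert-kafka topic materializes).
final class UpsertSinkSketch {

    // kind: "+I", "-U", "+U", "-D" for a normalized changelog,
    //       "UPSERT" / "DELETE" for the raw stream read from an upsert source.
    record Msg(String kind, String key, String value) {}

    // Apply messages to a table keyed by primary key, the way an upsert sink would.
    static Map<String, String> apply(List<Msg> msgs) {
        Map<String, String> table = new HashMap<>();
        for (Msg m : msgs) {
            switch (m.kind()) {
                case "+I", "+U", "UPSERT" -> table.put(m.key(), m.value()); // overwrite by key
                case "-D", "DELETE" -> table.remove(m.key());               // remove by key
                case "-U" -> { }  // ignored: the following +U overwrites the key anyway
                default -> throw new IllegalArgumentException("unknown kind " + m.kind());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Raw upsert stream: only the latest value per key, no before-images.
        List<Msg> raw = List.of(
            new Msg("UPSERT", "k1", "a"),
            new Msg("UPSERT", "k1", "b"),
            new Msg("UPSERT", "k2", "c"),
            new Msg("DELETE", "k2", null));
        // The same events after normalization, with update_before rows filled in.
        List<Msg> normalized = List.of(
            new Msg("+I", "k1", "a"),
            new Msg("-U", "k1", "a"),
            new Msg("+U", "k1", "b"),
            new Msg("+I", "k2", "c"),
            new Msg("-D", "k2", "c"));
        System.out.println(apply(raw).equals(apply(normalized))); // true
        System.out.println(apply(raw));                           // {k1=b}
    }
}
{code}
Both inputs end in the same keyed table, which is the intuition behind asking whether normalization is needed for a pure upsert-to-upsert pipeline.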
> Optimize changelog normalize
> ----------------------------
>
> Key: FLINK-29625
> URL: https://issues.apache.org/jira/browse/FLINK-29625
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.15.2
> Reporter: Jiabao Sun
> Priority: Major
>
> Currently, Flink adds an expensive _changelog normalize_ operator after
> sources that produce an upsert changelog, in order to fill in the
> _update_before_ values. Even a direct insert from an upsert-kafka source into
> an upsert-kafka sink still gets this operator, plus an extra operator that
> drops the _update_before_ messages again, which is clearly redundant.
> In CDC scenarios, some databases do not provide update-before images, such as
> Cassandra, MongoDB, TiDB (when {_}Old Value{_} is not enabled) and Postgres
> (when {_}REPLICA IDENTITY{_} is not set to {_}FULL{_}). Processing these
> changelogs with Flink SQL incurs a lot of state overhead.
> I don't fully understand why this operator is needed, so I take the liberty of
> asking whether we can get rid of changelog normalize completely, or at least
> optimize it by adding it only when a downstream operator requires a
> normalized changelog.
> If this optimization is worthwhile, I'm happy to help with it.
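The proposal in the description amounts to changing the planner decision from "is the source upsert?" to "does any consumer need _update_before_?". The following is a minimal sketch of that decision, covering only the upsert-source case and ignoring the duplicate-events condition. Everything in it is hypothetical: {{NormalizePlacementSketch}}, {{Requirement}} and both rule methods are illustrative names, not Flink planner APIs.
{code:java}
import java.util.List;

// Hypothetical model of where changelog normalize is placed for an upsert source.
final class NormalizePlacementSketch {

    // What a downstream consumer needs from its input.
    enum Requirement { NONE, UPDATE_BEFORE }

    // Current behaviour (per the planner excerpt): upsert sources are always normalized;
    // the consumers are not consulted at all.
    static boolean currentRule(boolean upsertSource, List<Requirement> consumers) {
        return upsertSource;
    }

    // Proposed behaviour: normalize only if some consumer needs update_before rows,
    // e.g. an aggregation that must retract the old value. An upsert sink does not.
    static boolean proposedRule(boolean upsertSource, List<Requirement> consumers) {
        return upsertSource && consumers.contains(Requirement.UPDATE_BEFORE);
    }

    public static void main(String[] args) {
        List<Requirement> upsertSinkOnly = List.of(Requirement.NONE);
        List<Requirement> withRetraction = List.of(Requirement.NONE, Requirement.UPDATE_BEFORE);
        System.out.println(currentRule(true, upsertSinkOnly));  // true
        System.out.println(proposedRule(true, upsertSinkOnly)); // false
        System.out.println(proposedRule(true, withRetraction)); // true
    }
}
{code}
In a real planner this information would have to be propagated from the consumers back to the source scan, which this sketch simply takes as an input list.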
--
This message was sent by Atlassian Jira
(v8.20.10#820010)