[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440150#comment-17440150
 ] 

Jingsong Lee commented on FLINK-20370:
--------------------------------------

[~twalthr] Kafka partitioning should be a separate matter; it depends on the user's 
connector options, for example whether upsert-kafka or `sink.partitioner` is used.
In my opinion, if the parallelism changes, for example when the parallelism of the 
source and the sink differ, the final result will be non-deterministic, which may 
not meet the user's expectations. So the precondition for this problem is a change 
in parallelism.
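
For reference, a minimal sketch of the two connector choices being contrasted 
(table, topic, and server names are illustrative, not taken from this issue): with 
the plain kafka connector the user picks the partitioning strategy via 
`sink.partitioner`, while with upsert-kafka records are keyed, and therefore 
partitioned, by the declared primary key.

{code}
-- Plain kafka sink: partitioning is chosen explicitly via 'sink.partitioner'
CREATE TABLE sink_plain_kafka (
  city_name STRING,
  count_customer BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'city_counts',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'sink.partitioner' = 'fixed'  -- each Flink sink subtask writes to a single Kafka partition
);

-- upsert-kafka sink: records are partitioned by the hash of the PRIMARY KEY columns
CREATE TABLE sink_upsert_kafka (
  city_name STRING,
  count_customer BIGINT,
  PRIMARY KEY (city_name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'city_counts',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
{code}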

> Result is wrong when sink primary key is not the same with query
> ----------------------------------------------------------------
>
>                 Key: FLINK-20370
>                 URL: https://issues.apache.org/jira/browse/FLINK-20370
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Jark Wu
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> Both sources are upsert-kafka tables that synchronize changes from the MySQL 
> tables (source_city, source_customer). The sink is another MySQL table written 
> in upsert mode with "city_name" as the primary key. The join key is "city_id". 
> In this case, the result becomes wrong when the {{source_city.city_name}} 
> column is updated in MySQL, because the UPDATE_BEFORE is ignored and the old 
> city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>    +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>       :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>       :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
>       :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>       :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
>       :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>       :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
>       :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
>       :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>       :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
>       +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>          +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
>             +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>                +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
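> A minimal sketch of the mismatched setup described above, reconstructed from the 
> field names in the plan (column types and connector options are assumptions, not 
> taken from the original report):
> {code}
> -- Sink keyed by city_name, while the query's unique key is city_id/id
> CREATE TABLE sink_kafka_count_city (
>   city_name STRING,
>   count_customer BIGINT,
>   sum_gender BIGINT,
>   PRIMARY KEY (city_name) NOT ENFORCED  -- differs from the join/group key
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/db',
>   'table-name' = 'sink_kafka_count_city'
> );
>
> INSERT INTO sink_kafka_count_city
> SELECT c.city_name, a.count_customer, a.sum_gender
> FROM (
>   SELECT city_id, COUNT(*) AS count_customer, SUM(gender) AS sum_gender
>   FROM source_customer
>   GROUP BY city_id
> ) AS a
> JOIN source_city AS c ON a.city_id = c.id;
> {code}
> The plan above shows why this setup is fragile: the query's upsert key is 
> city_id/id, but the sink materializes rows by city_name.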
> In the documentation we already suggest that users use the same key for the 
> query as the primary key on the sink: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
> We should highlight this advice more prominently on the CREATE TABLE page. 
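> As a hedged illustration of that recommendation (table name and connector 
> options are hypothetical), the sink could carry the query's unique key and 
> declare it as the primary key:
> {code}
> CREATE TABLE sink_count_per_city (
>   city_id BIGINT,
>   city_name STRING,
>   count_customer BIGINT,
>   sum_gender BIGINT,
>   PRIMARY KEY (city_id) NOT ENFORCED  -- matches the unique key produced by the query
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/db',
>   'table-name' = 'count_per_city'
> );
> {code}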



--
This message was sent by Atlassian Jira
(v8.20.1#820001)