[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439361#comment-17439361
 ] 

Timo Walther commented on FLINK-20370:
--------------------------------------

[~wenlong.lwl]

bq. If the input of sink actually has the same pk with sink, and is 
insert-only, I think there is no distribution disorder.

That is correct.

But let's assume the following schema:

{code}
CREATE TABLE A (uid INT, name STRING)

CREATE TABLE B (uid INT, name STRING, PRIMARY KEY(uid))

INSERT INTO B SELECT * FROM A;
{code}

In databases this query is totally fine because partitions don't exist. Please 
correct me if I'm wrong but in Flink it could cause issues, because we don't 
shuffle on uid here and different uid could end up in different Kafka 
partitions in the end.


> Result is wrong when sink primary key is not the same with query
> ----------------------------------------------------------------
>
>                 Key: FLINK-20370
>                 URL: https://issues.apache.org/jira/browse/FLINK-20370
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Jark Wu
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table which 
> is in upsert mode with "city_name" primary key. The join key is "city_id". 
> In this case, the result will be wrong when updating 
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored 
> and the old city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], 
> fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, 
> CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>    +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, 
> count_customer, sum_gender, id, city_name], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>       :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>       :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS 
> sum_gender], changelogMode=[I,UA,D])
>       :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>       :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], 
> changelogMode=[I])
>       :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>       :              +- ChangelogNormalize(key=[customer_id], 
> changelogMode=[I,UB,UA,D])
>       :                 +- Exchange(distribution=[hash[customer_id]], 
> changelogMode=[UA,D])
>       :                    +- MiniBatchAssigner(interval=[3000ms], 
> mode=[ProcTime], changelogMode=[UA,D])
>       :                       +- TableSourceScan(table=[[default_catalog, 
> default_database, source_customer]], fields=[customer_id, city_id, age, 
> gender, update_time], changelogMode=[UA,D])
>       +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>          +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
>             +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>                +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], 
> changelogMode=[UA,D])
>                   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested users to use the same key of the query as the primary key 
> on sink in the documentation: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
>  We should make this attention to be more highlight in CREATE TABLE page. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to