[
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440534#comment-17440534
]
Timo Walther commented on FLINK-20370:
--------------------------------------
[~wenlong.lwl] I guess for Kafka you are right. Kafka is doing the partitioning
for us already with the default sink partitioner and given Kafka key. The only
missing case is if you do a {{StreamTableEnvironment.toChangelogStream}} with a
declared primary key, the user needs to take case of the partitioning in this
case. But that might be acceptable.
> Result is wrong when sink primary key is not the same with query
> ----------------------------------------------------------------
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.0
> Reporter: Jark Wu
> Assignee: lincoln lee
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL
> tables (source_city, source_customer). The sink is another MySQL table which
> is in upsert mode with "city_name" primary key. The join key is "city_id".
> In this case, the result will be wrong when updating
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored
> and the old city_name is retained in the sink table.
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city],
> fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer,
> CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
> +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id,
> count_customer, sum_gender, id, city_name],
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
> :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
> : +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id,
> COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS
> sum_gender], changelogMode=[I,UA,D])
> : +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
> : +- LocalGroupAggregate(groupBy=[city_id], select=[city_id,
> COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)],
> changelogMode=[I])
> : +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
> : +- ChangelogNormalize(key=[customer_id],
> changelogMode=[I,UB,UA,D])
> : +- Exchange(distribution=[hash[customer_id]],
> changelogMode=[UA,D])
> : +- MiniBatchAssigner(interval=[3000ms],
> mode=[ProcTime], changelogMode=[UA,D])
> : +- TableSourceScan(table=[[default_catalog,
> default_database, source_customer]], fields=[customer_id, city_id, age,
> gender, update_time], changelogMode=[UA,D])
> +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
> +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
> +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
> +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime],
> changelogMode=[UA,D])
> +- TableSourceScan(table=[[default_catalog,
> default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested users to use the same key of the query as the primary key
> on sink in the documentation:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
> We should make this attention to be more highlight in CREATE TABLE page.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)