[
https://issues.apache.org/jira/browse/FLINK-27275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528025#comment-17528025
]
Fangliang Liu edited comment on FLINK-27275 at 5/6/22 6:46 AM:
---------------------------------------------------------------
Currently, the JDBC connector performs updates through a combination of
INSERT ... ON DUPLICATE KEY UPDATE syntax, a PRIMARY KEY, and VALUES syntax. I
think COALESCE can be added on this basis. At the same time, we could add a
replaceByNULL option (default: true) to the WITH clause of the DDL. I think
this is a good solution for MySQL.
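To make the idea concrete, here is a minimal sketch of how the upsert statement could be built with and without COALESCE. The class `UpsertSketch`, the method `buildUpsert`, and the boolean `replaceByNull` are hypothetical names used only for illustration; this is not the connector's actual dialect code.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical sketch only; not the actual flink-connector-jdbc dialect code. */
public class UpsertSketch {

    /** Quotes an identifier the MySQL way, with backticks. */
    static String quote(String name) {
        return "`" + name + "`";
    }

    /**
     * Builds a MySQL upsert statement with JDBC placeholders.
     * When replaceByNull is false, each column keeps its stored value
     * if the incoming value is NULL, via COALESCE(VALUES(col), col).
     */
    static String buildUpsert(String table, String[] fields, boolean replaceByNull) {
        String columns = Arrays.stream(fields)
                .map(UpsertSketch::quote)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fields)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        String updates = Arrays.stream(fields)
                .map(f -> {
                    String q = quote(f);
                    String incoming = "VALUES(" + q + ")";
                    String rhs = replaceByNull
                            ? incoming
                            : "COALESCE(" + incoming + ", " + q + ")";
                    return q + " = " + rhs;
                })
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + " (" + columns + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        String[] fields = {"user_id", "A", "B", "C", "flag"};
        // Current behavior: NULL values overwrite the stored values.
        System.out.println(buildUpsert("user", fields, true));
        // Proposed behavior: NULL values leave the stored values untouched.
        System.out.println(buildUpsert("user", fields, false));
    }
}
```

With `replaceByNull` set to false, a NULL in the incoming record would leave the stored column value unchanged, so the job would not need to read the historical value first. Note that MySQL deprecates the `VALUES()` function in this position as of 8.0.20, so a real implementation might prefer a row alias instead.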
[~martijnvisser], [~fsk119], [~jark] Looking forward to your reply.
> Support partial insert in flink-connector-jdbc
> ----------------------------------------------
>
> Key: FLINK-27275
> URL: https://issues.apache.org/jira/browse/FLINK-27275
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / JDBC
> Affects Versions: 1.14.3
> Reporter: Fangliang Liu
> Priority: Major
>
> I use the following statements to create a Flink job.
> If a field in the source data is null but the same field in MySQL is not
> null, the field is overwritten by null after the record is written to the
> database. If I don't want the historical value to be overwritten by null, the
> job has to read the historical value from the database and then write it
> back again, which is very costly for us.
> So I think the choice of whether to update a field in the database when it
> is null should be left to the user.
> {code:sql}
> CREATE TABLE IF NOT EXISTS t_source (
>   `user_id` bigint,
>   `A` string,
>   `B` string,
>   `C` string,
>   `flag` varchar(256)
> ) WITH (
>   'connector' = 'kafka',
>   'format' = 'canal-json',
>   'scan.startup.mode' = 'latest-offset',
>   ... ...
> );
> CREATE TABLE IF NOT EXISTS t_sink (
>   `user_id` bigint,
>   `A` string,
>   `B` string,
>   `C` string,
>   `flag` varchar(256),
>   PRIMARY KEY (`user_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
>   'table-name' = 'user',
>   ... ...
> );
> INSERT INTO t_sink (
>   `user_id`,
>   `A`,
>   `B`,
>   `C`,
>   `flag`
> ) SELECT `user_id`, `A`, `B`, `C`, `flag` FROM t_source;
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)