[ 
https://issues.apache.org/jira/browse/FLINK-27275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528025#comment-17528025
 ] 

Fangliang Liu edited comment on FLINK-27275 at 5/6/22 6:46 AM:
---------------------------------------------------------------

Now the jdbc connector completes the update operation through a combination of 
INSERT... ON DUPLICATE KEY UPDATE syntax && PRIMARY KEY && VALUES syntax. I 
think COALESCE syntax can be added on this basis. At the same time, add a 
replaceByNULL(default:ture) option in with section of DDL. I think this is a 
good solution for mysql.

[~martijnvisser], [~fsk119], [~jark]  Looking forward to your reply.


was (Author: liufangliang):
Now the jdbc connector completes the update operation through a combination of 
INSERT... ON DUPLICATE KEY UPDATE syntax && PRIVATE key && VALUES syntax. I 
think COALESCE syntax can be added on this basis. At the same time, add a 
replaceByNULL(default:ture) option in with section of DDL. I think this is a 
good solution for mysql.

[~martijnvisser], [~fsk119], [~jark]  Looking forward to your reply.

> Support partial insert in flink-connector-jdbc
> ----------------------------------------------
>
>                 Key: FLINK-27275
>                 URL: https://issues.apache.org/jira/browse/FLINK-27275
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / JDBC
>    Affects Versions: 1.14.3
>            Reporter: Fangliang Liu
>            Priority: Major
>
> I use the following statement to create a flink job.
> a field in the source data is null, but the field in mysql is not null, then 
> the field will be overwritten by null after the record is written to the 
> database. If I don't want this historical value to be overwritten by null, it 
> is necessary to take out the historical value in the database in this task, 
> and then update it to the database again, which is very costly for us.
> So, i think the choice of whether to update to the database if a field is 
> null can actually be left to the use.
> {code:java}
>  CREATE TABLE IF NOT EXISTS t_source (
>     `user_id`  bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256)
> )WITH (
>     'connector' = 'kafka',
>     'format' = 'canal-json',
>     'scan.startup.mode' = 'latest-offset',
>     ... ...
> );
> CREATE TABLE IF NOT EXISTS t_sink (
>     `user_id`  bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256),
>     PRIMARY KEY (`user_id`) NOT ENFORCED
> )WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
>     'table-name' = 'user',
>     ... ...
>  ); 
> INSERT INTO t_sink(
>     `user_id`,
>     `A`,
>     `B`,
>     `C`,
>     `flag`
> ) SELECT  `user_id`, `A`, `B`, `C`, `flag` FROM t_source;
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to