[ 
https://issues.apache.org/jira/browse/FLINK-27275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17523495#comment-17523495
 ] 

Fangliang Liu edited comment on FLINK-27275 at 4/21/22 7:50 AM:
----------------------------------------------------------------

Hi [~fsk119], I will start by getting familiar with how other databases behave 
with partial inserts.


was (Author: liufangliang):
Hi [~fsk119], I will start by getting familiar with how other databases behave 
with partial inserts. Please assign this issue to me.

> Support partial insert in flink-connector-jdbc
> ----------------------------------------------
>
>                 Key: FLINK-27275
>                 URL: https://issues.apache.org/jira/browse/FLINK-27275
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / JDBC
>    Affects Versions: 1.14.3
>            Reporter: Fangliang Liu
>            Priority: Major
>
> I use the following statements to create a Flink job.
> If a field in the source data is null but the corresponding field in MySQL is 
> not null, the field is overwritten with null when the record is written to 
> the database. To keep the historical value from being overwritten, the job 
> has to read it from the database first and then write it back, which is very 
> costly for us.
> So I think the choice of whether a null field should be written to the 
> database can be left to the user.
> {code:sql}
> CREATE TABLE IF NOT EXISTS t_source (
>     `user_id` bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256)
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'canal-json',
>     'scan.startup.mode' = 'latest-offset',
>     ... ...
> );
> CREATE TABLE IF NOT EXISTS t_sink (
>     `user_id` bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256),
>     PRIMARY KEY (`user_id`) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
>     'table-name' = 'user',
>     ... ...
> );
> INSERT INTO t_sink (
>     `user_id`,
>     `A`,
>     `B`,
>     `C`,
>     `flag`
> ) SELECT `user_id`, `A`, `B`, `C`, `flag` FROM t_source;
> {code}
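>
> To make the requested behaviour concrete, here is a minimal sketch of a 
> null-preserving upsert against the MySQL table behind t_sink. It is an 
> illustration only: it assumes the sink writes with MySQL's 
> {{INSERT ... ON DUPLICATE KEY UPDATE}}, and the {{COALESCE}} wrapping is a 
> hypothetical option, not an existing feature of flink-connector-jdbc.
> {code:sql}
> -- Hypothetical statement for the `user` table (assumption, not connector output).
> -- COALESCE(VALUES(col), col) keeps the stored value when the incoming value is NULL.
> INSERT INTO `user` (`user_id`, `A`, `B`, `C`, `flag`)
> VALUES (?, ?, ?, ?, ?)
> ON DUPLICATE KEY UPDATE
>     `A`    = COALESCE(VALUES(`A`), `A`),
>     `B`    = COALESCE(VALUES(`B`), `B`),
>     `C`    = COALESCE(VALUES(`C`), `C`),
>     `flag` = COALESCE(VALUES(`flag`), `flag`);
> {code}
> With a statement of this shape a column can no longer be reset to null 
> through the sink, which is why the behaviour would need to be opt-in. Note 
> that {{VALUES()}} inside {{ON DUPLICATE KEY UPDATE}} is deprecated as of 
> MySQL 8.0.20 in favour of a row alias.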
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
