[ 
https://issues.apache.org/jira/browse/FLINK-27275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fangliang Liu updated FLINK-27275:
----------------------------------
    Description: 
I use the following statements to create a Flink job.

If a field in the source data is null but the corresponding column in MySQL is 
not null, the column is overwritten with null when the record is written to the 
database. To keep the historical value from being overwritten by null, the job 
has to read the historical value from the database and write it back again, 
which is very costly for us.

So I think the choice of whether a null field updates the database should be 
left to the user.
{code:sql}
 CREATE TABLE IF NOT EXISTS t_source (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256)
)WITH (
    'connector' = 'kafka',
    'format' = 'canal-json',
    'scan.startup.mode' = 'latest-offset',
    ... ...
);

CREATE TABLE IF NOT EXISTS t_sink (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256),
    PRIMARY KEY (`user_id`) NOT ENFORCED
)WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
    'table-name' = 'user',
    ... ...
 ); 

INSERT INTO t_sink(
    `user_id`,
    `A`,
    `B`,
    `C`,
    `flag`
) SELECT  `user_id`, `A`, `B`, `C`, `flag` FROM t_source;

{code}
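
To make the request concrete, here is a minimal sketch of the upsert I would 
like the sink to be able to send to MySQL for the `user` table above. It assumes 
the connector writes to MySQL with INSERT ... ON DUPLICATE KEY UPDATE and binds 
the row through `?` placeholders. That is my understanding of the current 
behavior, not something verified here, and how the behavior would be switched on 
is left open.
{code:sql}
-- Assumed current shape: every column is overwritten, so a NULL in the
-- incoming row replaces the value already stored in MySQL:
--     ON DUPLICATE KEY UPDATE `A` = VALUES(`A`), ...

-- Requested shape: COALESCE keeps the stored value whenever the incoming
-- value is NULL, and takes the incoming value otherwise.
INSERT INTO `user` (`user_id`, `A`, `B`, `C`, `flag`)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
    `A`    = COALESCE(VALUES(`A`), `A`),
    `B`    = COALESCE(VALUES(`B`), `B`),
    `C`    = COALESCE(VALUES(`C`), `C`),
    `flag` = COALESCE(VALUES(`flag`), `flag`);
{code}
One consequence of this shape is that a column can no longer be set to NULL on 
purpose through the sink, which is why it should be something the user chooses 
rather than the default.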
 

  was:
I write data to mysql according to the following statement.
{code:java}
 CREATE TABLE IF NOT EXISTS t_source (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256)
)WITH (
    'connector' = 'kafka',
    'format' = 'canal-json',
    'scan.startup.mode' = 'latest-offset',
    ... ...
);

CREATE TABLE IF NOT EXISTS t_sink (
    `user_id`  bigint,
    `A` string,
    `B` string,
    `C` string,
    `flag` varchar(256),
    PRIMARY KEY (`user_id`) NOT ENFORCED
)WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
    'table-name' = 'user',
    ... ...
 ); 

INSERT INTO t_sink(
    `user_id`,
    `A`,
    `B`,
    `C`,
    `flag`
) SELECT  `user_id`, `A`, `B`, `C`, `flag` FROM t_source;

{code}
 


> Support null value not update in flink-connector-jdbc
> -----------------------------------------------------
>
>                 Key: FLINK-27275
>                 URL: https://issues.apache.org/jira/browse/FLINK-27275
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / JDBC
>    Affects Versions: 1.14.3
>            Reporter: Fangliang Liu
>            Priority: Major
>
> I use the following statements to create a Flink job.
> If a field in the source data is null but the corresponding column in MySQL 
> is not null, the column is overwritten with null when the record is written 
> to the database. To keep the historical value from being overwritten by null, 
> the job has to read the historical value from the database and write it back 
> again, which is very costly for us.
> So I think the choice of whether a null field updates the database should be 
> left to the user.
> {code:sql}
>  CREATE TABLE IF NOT EXISTS t_source (
>     `user_id`  bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256)
> )WITH (
>     'connector' = 'kafka',
>     'format' = 'canal-json',
>     'scan.startup.mode' = 'latest-offset',
>     ... ...
> );
> CREATE TABLE IF NOT EXISTS t_sink (
>     `user_id`  bigint,
>     `A` string,
>     `B` string,
>     `C` string,
>     `flag` varchar(256),
>     PRIMARY KEY (`user_id`) NOT ENFORCED
> )WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://xx.xx.xx.xx:xxx/test',
>     'table-name' = 'user',
>     ... ...
>  ); 
> INSERT INTO t_sink(
>     `user_id`,
>     `A`,
>     `B`,
>     `C`,
>     `flag`
> ) SELECT  `user_id`, `A`, `B`, `C`, `flag` FROM t_source;
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
