[
https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yu Wang updated FLINK-21430:
----------------------------
Description:
kafka data:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"}
kafka ddl :
CREATE TABLE washroom_detail (
building_id STRING,
sofa_id STRING,
floor_num INT,
occupy_status INT,
start_time BIGINT,
end_time BIGINT,
process_time TIMESTAMP,
occupy_times as concat(date_format(TIMESTAMPADD(hour, 8,
cast(start_time / 1000 as timestamp)), 'HH:mm'), '-',
date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)),
'HH:mm')),
local_date as date_format(cast(start_time / 1000 as
timestamp), 'yyyy-MM-dd'),
day_hour as cast(date_format(cast(start_time / 1000 as
timestamp), 'HH') as INT) + 8
) WITH (
'connector' = 'kafka',
'topic' = 'xxxxxxxx',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
'properties.group.id' = 'xxxxxxxxxxxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
mysql ddl:
create table hour_ddl
(
building_id STRING,
sofa_id STRING,
local_date STRING,
`hour` INT,
floor_num INT,
occupy_frequency INT,
occupy_times STRING,
update_time TIMESTAMP,
process_time TIMESTAMP,
primary key (building_id, sofa_id, local_date, `hour`) NOT
ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'xxxxxxxx',
'table-name' = 'xxxxxxxx',
'username' = 'xxxxx'
'password' = 'xxxxxx'
)
flink sql dml:
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
a.sofa_id,
a.local_date,
a.day_hour,
a.floor_num,
CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency)
AS INT),
concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times
IS NULL, a.times, concat(',', a.times))),
NOW(),
a.process_time
FROM
(SELECT building_id,
sofa_id,
local_date,
day_hour,
floor_num,
count(1) AS frequency,
LISTAGG(occupy_times) AS times,
MAX(process_time) AS process_time,
PROCTIME() AS compute_time
FROM washroom_detail
GROUP BY building_id,
sofa_id,
local_date,
day_hour,
floor_num) a
LEFT JOIN hour_ddl
FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
AND a.sofa_id = b.sofa_id
AND a.local_date = b.local_date
AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
OR b.process_time IS NULL
Observed behavior:
When MySQL does not yet contain this record, the record is inserted:
occupy_frequency occupy_times
1 15:01-15:03
When a primary-key conflict occurs, the record is upserted as:
occupy_frequency occupy_times
3 15:01-15:03,15:01-15:03,15:03-15:04
The result should instead be the following record:
{color:red}occupy_frequency occupy_times
2 15:01-15:03,15:03-15:04{color}
was:
kafka 数据格式:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20
06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20
06:47:34.609"}
kafka ddl :
CREATE TABLE washroom_detail (
building_id STRING,
sofa_id STRING,
floor_num INT,
occupy_status INT,
start_time BIGINT,
end_time BIGINT,
process_time TIMESTAMP,
occupy_times as concat(date_format(TIMESTAMPADD(hour, 8,
cast(start_time / 1000 as timestamp)), 'HH:mm'), '-',
date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)),
'HH:mm')),
local_date as date_format(cast(start_time / 1000 as
timestamp), 'yyyy-MM-dd'),
day_hour as cast(date_format(cast(start_time / 1000 as
timestamp), 'HH') as INT) + 8
) WITH (
'connector' = 'kafka',
'topic' = 'xxxxxxxx',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
'properties.group.id' = 'xxxxxxxxxxxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
mysql ddl:
create table hour_ddl
(
building_id STRING,
sofa_id STRING,
local_date STRING,
`hour` INT,
floor_num INT,
occupy_frequency INT,
occupy_times STRING,
update_time TIMESTAMP,
process_time TIMESTAMP,
primary key (building_id, sofa_id, local_date, `hour`) NOT
ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'xxxxxxxx',
'table-name' = 'xxxxxxxx',
'username' = 'xxxxx'
'password' = 'xxxxxx'
)
flink sql dml:
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
a.sofa_id,
a.local_date,
a.day_hour,
a.floor_num,
CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency)
AS INT),
concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times
IS NULL, a.times, concat(',', a.times))),
NOW(),
a.process_time
FROM
(SELECT building_id,
sofa_id,
local_date,
day_hour,
floor_num,
count(1) AS frequency,
LISTAGG(occupy_times) AS times,
MAX(process_time) AS process_time,
PROCTIME() AS compute_time
FROM washroom_detail
GROUP BY building_id,
sofa_id,
local_date,
day_hour,
floor_num) a
LEFT JOIN hour_ddl
FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
AND a.sofa_id = b.sofa_id
AND a.local_date = b.local_date
AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
OR b.process_time IS NULL
现象:
当mysql 没有数据时,插入一条记录:
occupy_frequency occupy_times
1 15:01-15:03
当主键冲突时,现象是:
occupy_frequency occupy_times
3 15:01-15:03,15:01-15:03,15:03-15:04
结果应该是:
{color:red}occupy_frequency occupy_times
2 15:01-15:03,15:03-15:04{color}
> Appear append data when flink sql sink mysql on key conflict
> ------------------------------------------------------------
>
> Key: FLINK-21430
> URL: https://issues.apache.org/jira/browse/FLINK-21430
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.0
> Reporter: Yu Wang
> Priority: Major
>
> kafka data:
> {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}
> {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"}
> kafka ddl :
> CREATE TABLE washroom_detail (
> building_id STRING,
> sofa_id STRING,
> floor_num INT,
> occupy_status INT,
> start_time BIGINT,
> end_time BIGINT,
> process_time TIMESTAMP,
> occupy_times as concat(date_format(TIMESTAMPADD(hour, 8,
> cast(start_time / 1000 as timestamp)), 'HH:mm'), '-',
> date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)),
> 'HH:mm')),
> local_date as date_format(cast(start_time / 1000 as
> timestamp), 'yyyy-MM-dd'),
> day_hour as cast(date_format(cast(start_time / 1000 as
> timestamp), 'HH') as INT) + 8
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'xxxxxxxx',
> 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
> 'properties.group.id' = 'xxxxxxxxxxxx',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'json'
> );
> mysql ddl:
> create table hour_ddl
> (
> building_id STRING,
> sofa_id STRING,
> local_date STRING,
> `hour` INT,
> floor_num INT,
> occupy_frequency INT,
> occupy_times STRING,
> update_time TIMESTAMP,
> process_time TIMESTAMP,
> primary key (building_id, sofa_id, local_date, `hour`)
> NOT ENFORCED
> ) with (
> 'connector' = 'jdbc',
> 'url' = 'xxxxxxxx',
> 'table-name' = 'xxxxxxxx',
> 'username' = 'xxxxx'
> 'password' = 'xxxxxx'
> )
> flink sql dml:
> INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
> occupy_frequency, occupy_times, update_time, process_time)
> SELECT a.building_id,
> a.sofa_id,
> a.local_date,
> a.day_hour,
> a.floor_num,
> CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0,
> b.occupy_frequency) AS INT),
> concat(if(b.occupy_times IS NULL, '', b.occupy_times),
> if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
> NOW(),
> a.process_time
> FROM
> (SELECT building_id,
> sofa_id,
> local_date,
> day_hour,
> floor_num,
> count(1) AS frequency,
> LISTAGG(occupy_times) AS times,
> MAX(process_time) AS process_time,
> PROCTIME() AS compute_time
> FROM washroom_detail
> GROUP BY building_id,
> sofa_id,
> local_date,
> day_hour,
> floor_num) a
> LEFT JOIN hour_ddl
> FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
> AND a.sofa_id = b.sofa_id
> AND a.local_date = b.local_date
> AND a.day_hour = b.`hour`
> WHERE a.process_time > b.process_time
> OR b.process_time IS NULL
> Observed behavior:
> When MySQL does not yet contain this record, the record is inserted:
> occupy_frequency occupy_times
> 1 15:01-15:03
> When a primary-key conflict occurs, the record is upserted as:
> occupy_frequency occupy_times
> 3 15:01-15:03,15:01-15:03,15:03-15:04
> The result should instead be the following record:
> {color:red}occupy_frequency occupy_times
> 2 15:01-15:03,15:03-15:04{color}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)