[
https://issues.apache.org/jira/browse/FLINK-25559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zongwen Li updated FLINK-25559:
-------------------------------
Description:
{code:java}
//sink table,omits some physical fields
CREATE TABLE kd_product_info (
productSaleId BIGINT COMMENT '商品编号',
productId BIGINT COMMENT '产品编号',
PRIMARY KEY (productSaleId) NOT ENFORCED
)
// sql omits some selected fields
INSERT INTO kd_product_info
SELECT
ps.product AS productId,
ps.productsaleid AS productSaleId,
CAST(p.complex AS INT) AS complex,
p.createtime AS createTime,
p.updatetime AS updateTime,
p.ean AS ean,
ts.availablequantity AS totalAvailableStock,
IF
(
ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
0
) AS sharedStock
,rps.purchase AS purchase
,v.`name` AS vendorName
FROM
product_sale ps
JOIN product p ON ps.product = p.id
LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory =
mc.id
LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale
AND pse359.meta = 359
LEFT JOIN product_image_url piu ON ps.product = piu.product {code}
All table sources are upsert-kafka, and I have ensured that the joined columns
are of the same type:
{code:java}
CREATE TABLE product_sale (
id BIGINT COMMENT '主键',
productsaleid BIGINT COMMENT '商品编号',
product BIGINT COMMENT '产品编号',
merchant_id DECIMAL(20, 0) COMMENT '商户id',
vendor STRING COMMENT '供应商',
PRIMARY KEY (productsaleid) NOT ENFORCED
)
// No computed columns
// Just plain physical columns
WITH (
'connector' = 'upsert-kafka',
'topic' = 'XXX',
'group.id' = '%s',
'properties.bootstrap.servers' = '%s',
'key.format' = 'json',
'value.format' = 'json'
)
CREATE TABLE product (
id BIGINT,
mccategory STRING,
PRIMARY KEY (id) NOT ENFORCED
)
CREATE TABLE rate_product_sale (
id BIGINT COMMENT '主键',
PRIMARY KEY (id) NOT ENFORCED
)
CREATE TABLE pss_total_stock (
id INT COMMENT 'ID',
productsale BIGINT COMMENT '商品编码',
PRIMARY KEY (id) NOT ENFORCED
)
CREATE TABLE vendor (
merchant_id DECIMAL(20, 0) COMMENT '商户id',
vendor STRING COMMENT '供应商编码',
PRIMARY KEY (merchant_id, vendor) NOT ENFORCED
)
CREATE TABLE mccategory (
id STRING COMMENT 'mc编号',
merchant_id DECIMAL(20, 0) COMMENT '商户id',
PRIMARY KEY (merchant_id, id) NOT ENFORCED
)
CREATE TABLE new_mccategory (
mc STRING,
PRIMARY KEY (mc) NOT ENFORCED
)
CREATE TABLE product_sale_grade_plus (
productsale BIGINT,
PRIMARY KEY (productsale) NOT ENFORCED
)
CREATE TABLE product_sale_extend (
id BIGINT,
product_sale BIGINT,
meta BIGINT,
PRIMARY KEY (id) NOT ENFORCED
)
CREATE TABLE product_image_url(
product BIGINT,
PRIMARY KEY (product) NOT ENFORCED
){code}
Each table contains between 5 million and 10 million rows; parallelism: 24;
no state TTL is set. In fact, we can observe data loss within as little as 30 minutes.
The data flow:
MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink
I'm sure the ODS data in Kafka is correct;
I have also tried using the flink-cdc source directly, but it did not solve the
problem;
We tested sinking to kudu, Kafka or ES;
Also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2;
The lost data appears out of order in Kafka; we suspect this is a bug in the retraction stream:
!image-2022-01-07-11-27-01-010.png!
After many tests, we found that the more LEFT JOIN tables there are, or the
higher the operator parallelism, the more easily data is lost.
was:
{code:java}
// sql omits some selected fields
INSERT INTO kd_product_info
SELECT
ps.product AS productId,
ps.productsaleid AS productSaleId,
CAST(p.complex AS INT) AS complex,
p.createtime AS createTime,
p.updatetime AS updateTime,
p.ean AS ean,
ts.availablequantity AS totalAvailableStock,
IF
(
ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
0
) AS sharedStock
,rps.purchase AS purchase
,v.`name` AS vendorName
FROM
product_sale ps
JOIN product p ON ps.product = p.id
LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory =
mc.id
LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale
AND pse359.meta = 359
LEFT JOIN product_image_url piu ON ps.product = piu.product {code}
All table sources are upsert-kafka, and I have ensured that the joined columns
are of the same type:
{code:java}
// No computed columns
// Just plain physical columns
PRIMARY KEY (xx) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'XXX',
'group.id' = '%s',
'properties.bootstrap.servers' = '%s',
'key.format' = 'json',
'value.format' = 'json'
) {code}
Each table contains between 5 million and 10 million rows; parallelism: 24;
no state TTL is set. In fact, we can observe data loss within as little as 30 minutes.
The data flow:
MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink
I'm sure the ODS data in Kafka is correct;
I have also tried using the flink-cdc source directly, but it did not solve the
problem;
We tested sinking to kudu, Kafka or ES;
Also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2;
The lost data appears out of order in Kafka; we suspect this is a bug in the retraction stream:
!image-2022-01-07-11-27-01-010.png!
After many tests, we found that the more LEFT JOIN tables there are, or the
higher the operator parallelism, the more easily data is lost.
> SQL JOIN causes data loss
> -------------------------
>
> Key: FLINK-25559
> URL: https://issues.apache.org/jira/browse/FLINK-25559
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
> Reporter: Zongwen Li
> Priority: Major
> Attachments: image-2022-01-07-11-27-01-010.png
>
>
> {code:java}
> //sink table,omits some physical fields
> CREATE TABLE kd_product_info (
> productSaleId BIGINT COMMENT '商品编号',
> productId BIGINT COMMENT '产品编号',
> PRIMARY KEY (productSaleId) NOT ENFORCED
> )
> // sql omits some selected fields
> INSERT INTO kd_product_info
> SELECT
> ps.product AS productId,
> ps.productsaleid AS productSaleId,
> CAST(p.complex AS INT) AS complex,
> p.createtime AS createTime,
> p.updatetime AS updateTime,
> p.ean AS ean,
> ts.availablequantity AS totalAvailableStock,
> IF
> (
> ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
> ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
> 0
> ) AS sharedStock
> ,rps.purchase AS purchase
> ,v.`name` AS vendorName
> FROM
> product_sale ps
> JOIN product p ON ps.product = p.id
> LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
> LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
> LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
> LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory
> = mc.id
> LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
> LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
> LEFT JOIN product_sale_extend pse359 ON ps.productsaleid =
> pse359.product_sale AND pse359.meta = 359
> LEFT JOIN product_image_url piu ON ps.product = piu.product {code}
> All table sources are upsert-kafka, and I have ensured that the joined columns
> are of the same type:
>
> {code:java}
> CREATE TABLE product_sale (
> id BIGINT COMMENT '主键',
> productsaleid BIGINT COMMENT '商品编号',
> product BIGINT COMMENT '产品编号',
> merchant_id DECIMAL(20, 0) COMMENT '商户id',
> vendor STRING COMMENT '供应商',
> PRIMARY KEY (productsaleid) NOT ENFORCED
> )
> // No computed columns
> // Just plain physical columns
> WITH (
> 'connector' = 'upsert-kafka',
> 'topic' = 'XXX',
> 'group.id' = '%s',
> 'properties.bootstrap.servers' = '%s',
> 'key.format' = 'json',
> 'value.format' = 'json'
> )
> CREATE TABLE product (
> id BIGINT,
> mccategory STRING,
> PRIMARY KEY (id) NOT ENFORCED
> )
> CREATE TABLE rate_product_sale (
> id BIGINT COMMENT '主键',
> PRIMARY KEY (id) NOT ENFORCED
> )
> CREATE TABLE pss_total_stock (
> id INT COMMENT 'ID',
> productsale BIGINT COMMENT '商品编码',
> PRIMARY KEY (id) NOT ENFORCED
> )
> CREATE TABLE vendor (
> merchant_id DECIMAL(20, 0) COMMENT '商户id',
> vendor STRING COMMENT '供应商编码',
> PRIMARY KEY (merchant_id, vendor) NOT ENFORCED
> )
> CREATE TABLE mccategory (
> id STRING COMMENT 'mc编号',
> merchant_id DECIMAL(20, 0) COMMENT '商户id',
> PRIMARY KEY (merchant_id, id) NOT ENFORCED
> )
> CREATE TABLE new_mccategory (
> mc STRING,
> PRIMARY KEY (mc) NOT ENFORCED
> )
> CREATE TABLE product_sale_grade_plus (
> productsale BIGINT,
> PRIMARY KEY (productsale) NOT ENFORCED
> )
> CREATE TABLE product_sale_extend (
> id BIGINT,
> product_sale BIGINT,
> meta BIGINT,
> PRIMARY KEY (id) NOT ENFORCED
> )
> CREATE TABLE product_image_url(
> product BIGINT,
> PRIMARY KEY (product) NOT ENFORCED
> ){code}
> Each table contains between 5 million and 10 million rows; parallelism: 24;
> no state TTL is set. In fact, we can observe data loss within as little as 30 minutes.
>
> The data flow:
> MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink
> I'm sure the ODS data in Kafka is correct;
> I have also tried using the flink-cdc source directly, but it did not solve
> the problem;
>
> We tested sinking to kudu, Kafka or ES;
> Also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2;
> The lost data appears out of order in Kafka; we suspect this is a bug in the
> retraction stream:
> !image-2022-01-07-11-27-01-010.png!
>
> After many tests, we found that the more LEFT JOIN tables there are, or the
> higher the operator parallelism, the more easily data is lost.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)