[ 
https://issues.apache.org/jira/browse/FLINK-25559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zongwen Li updated FLINK-25559:
-------------------------------
    Description: 
{code:java}
-- sink table; some physical fields omitted
CREATE TABLE kd_product_info (
  productSaleId BIGINT COMMENT 'product sale ID',
  productId BIGINT COMMENT 'product ID',
  PRIMARY KEY (productSaleId) NOT ENFORCED
)

-- SQL; some selected fields omitted
INSERT INTO kd_product_info
SELECT
  ps.product AS productId,
  ps.productsaleid AS productSaleId,
  CAST(p.complex AS INT) AS complex,
  p.createtime AS createTime,
  p.updatetime AS updateTime,
  p.ean AS ean,
  ts.availablequantity AS totalAvailableStock,
  IF(
    ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
    ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
    0
  ) AS sharedStock,
  rps.purchase AS purchase,
  v.`name` AS vendorName
FROM product_sale ps
JOIN product p ON ps.product = p.id
LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
LEFT JOIN product_image_url piu ON ps.product = piu.product {code}
All table sources are upsert-kafka; I have ensured that the joined columns
are of the same type:

{code:java}
CREATE TABLE product_sale (
  id BIGINT COMMENT 'primary key',
  productsaleid BIGINT COMMENT 'product sale ID',
  product BIGINT COMMENT 'product ID',
  merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
  vendor STRING COMMENT 'vendor',
  PRIMARY KEY (productsaleid) NOT ENFORCED
)
-- No computed columns
-- Just plain physical columns
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'XXX',
  'group.id' = '%s',
  'properties.bootstrap.servers' = '%s',
  'key.format' = 'json',
  'value.format' = 'json'
)

CREATE TABLE product (
  id BIGINT,
  mccategory STRING,
  PRIMARY KEY (id) NOT ENFORCED
)

CREATE TABLE rate_product_sale ( 
  id BIGINT COMMENT 'primary key',
  PRIMARY KEY (id) NOT ENFORCED
)

CREATE TABLE pss_total_stock (
  id INT COMMENT 'ID',
  productsale BIGINT COMMENT 'product sale code',
  PRIMARY KEY (id) NOT ENFORCED
)

CREATE TABLE vendor (
  merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
  vendor STRING COMMENT 'vendor code',
  PRIMARY KEY (merchant_id, vendor) NOT ENFORCED
)

CREATE TABLE mccategory (
  id STRING COMMENT 'MC category ID',
  merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
  PRIMARY KEY (merchant_id, id) NOT ENFORCED
)

CREATE TABLE new_mccategory (
  mc STRING,
  PRIMARY KEY (mc) NOT ENFORCED
)

CREATE TABLE product_sale_grade_plus (
  productsale BIGINT,
  PRIMARY KEY (productsale) NOT ENFORCED
)

CREATE TABLE product_sale_extend (
  id BIGINT,
  product_sale BIGINT,
  meta BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
)

CREATE TABLE product_image_url (
  product BIGINT,
  PRIMARY KEY (product) NOT ENFORCED 
){code}
Each table holds between 5 and 10 million rows; the job parallelism is 24.

State TTL is not set. In fact, we can observe data loss within as little as 30 minutes.
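
For reference, the settings above correspond to the following sketch in SQL Client syntax (`table.exec.state.ttl` is shown only to make explicit that it stays at its default of 0, meaning join state never expires):
{code:java}
-- Sketch (SQL Client SET syntax) of the session settings used for the job.
SET 'parallelism.default' = '24';
-- Default value; join state is never expired. We did not change this.
SET 'table.exec.state.ttl' = '0 ms';
{code}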

 

The data flow:

MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink

I am sure the ODS data in Kafka is correct.

I have also tried using the flink-cdc source directly; it did not solve the
problem.
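
For illustration, a sketch of what the direct flink-cdc variant of one source table looked like (connection parameters are placeholders and the column list is abbreviated; only the connector differs from the upsert-kafka DDL above):
{code:java}
-- Sketch of the direct flink-cdc variant of product_sale; connection
-- parameters are placeholders, column list abbreviated.
CREATE TABLE product_sale (
  id BIGINT,
  productsaleid BIGINT,
  product BIGINT,
  merchant_id DECIMAL(20, 0),
  vendor STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '%s',
  'port' = '3306',
  'username' = '%s',
  'password' = '%s',
  'database-name' = '%s',
  'table-name' = 'product_sale'
);
{code}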

 

We tested sinking to Kudu, Kafka, and ES.

We also tested multiple Flink versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2.

The lost data appears out of order on Kafka; we suspect a bug in the retraction stream:

!image-2022-01-07-11-27-01-010.png!
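
One way to observe the retraction stream directly is to route the join result to Flink's built-in print connector instead of the real sink; a minimal sketch (table name and column subset are illustrative, joins abbreviated):
{code:java}
-- Sketch: print the raw changelog of the join result (+I/-U/+U/-D rows)
-- instead of writing to the real sink.
CREATE TABLE debug_print (
  productSaleId BIGINT,
  productId BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO debug_print
SELECT ps.productsaleid, ps.product
FROM product_sale ps
JOIN product p ON ps.product = p.id;
{code}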

 

After many tests, we found that the more tables are LEFT JOINed, or the higher
the operator parallelism, the more easily data is lost.
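
A hypothetical way to quantify the loss (not part of the original tests, and assuming the Kafka sink variant can be read back): compare distinct key counts between the driving table and the sink while dropping LEFT JOINs and varying parallelism:
{code:java}
-- Hypothetical bisection check: vary the query and parallelism, then
-- compare distinct key counts between the driving table and the sink.
SET 'parallelism.default' = '1';

SELECT COUNT(DISTINCT productsaleid) FROM product_sale;    -- expected keys
SELECT COUNT(DISTINCT productSaleId) FROM kd_product_info; -- keys in the sink
{code}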


> SQL JOIN causes data loss
> -------------------------
>
>                 Key: FLINK-25559
>                 URL: https://issues.apache.org/jira/browse/FLINK-25559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
>            Reporter: Zongwen Li
>            Priority: Major
>         Attachments: image-2022-01-07-11-27-01-010.png
>


