[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356153#comment-17356153 ]

徐州州 commented on FLINK-22826:
-----------------------------

I set the parallelism to 1, but data is still lost.

My job uses no windows and no state expiration time (TTL), and it has run non-stop for two consecutive days.

The first day is correct, because the first day only takes that day's data; but on the second day the status of rows in the order table changes, which produces +U operations.

I join canal-json source tables from Kafka and write the result to an upsert-kafka table. Even with parallelism set to 1, because first-day rows are modified on the second day, the second day's raw-layer order table contains three +U records for the same order.

At the join stage, -U should be emitted first and then +U, but the program emits +U first and then -U.

That is how I lose my data.
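
To make the ordering concrete, here is an illustration for a single key, using the order number from the issue description below (values are illustrative, not the exact records):

{code:java}
-- Expected changelog for an update of one key:
| -U | XJ0120210531004794 | 50 |   (retract the old row first)
| +U | XJ0120210531004794 | 50 |   (then emit the new row)

-- Observed changelog:
| +U | XJ0120210531004794 | 50 |   (new row arrives first)
| -U | XJ0120210531004794 | 50 |   (retraction arrives last, so the key ends up removed)
{code}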

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Blocker
>
> {code:java}
> insert into dwd_order_detail
> select
>     ord.Id,
>     ord.Code,
>     Status,
>     concat(
>       cast(ord.Id as STRING),
>       if(oed.Id is null, 'oed_null', cast(oed.Id as STRING)),
>       DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>     ) as uuids,
>     TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as As_Of_Date
> from orders ord
> left join order_extend oed
>     on ord.Id = oed.OrderId
>     and oed.IsDeleted = 0
>     and oed.CreateTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
> where (
>     ord.OrderTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
>     or ord.ReviewTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
>     or ord.RejectTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
> ) and ord.IsDeleted = 0;
> {code}
> My upsert-kafka sink table defines its PRIMARY KEY on uuids.
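> A minimal sketch of what that sink DDL looks like (field types, topic name, and connector option values here are assumptions for illustration, not the exact production values):
> {code:java}
> CREATE TABLE dwd_order_detail (
>   Id BIGINT,
>   Code STRING,
>   Status INT,
>   uuids STRING,
>   As_Of_Date DATE,
>   PRIMARY KEY (uuids) NOT ENFORCED  -- upsert-kafka compacts per primary key
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'dwd_order_detail',
>   'properties.bootstrap.servers' = '...',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> {code}
> Because upsert-kafka keeps the last record written per key, whatever record arrives last for a given uuids value determines what downstream consumers see.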
> This is the logic of my join over canal-json stream data from Kafka, written to an upsert-kafka table. I confirm that version 1.12 also has this problem; I just upgraded from 1.12 to 1.13.
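> For reference, the source tables are read with the canal-json format, roughly like this (a sketch; the column list, topic name, and option values are assumed for illustration):
> {code:java}
> CREATE TABLE orders (
>   Id BIGINT,
>   Code STRING,
>   Status INT,
>   OrderTime TIMESTAMP(3),
>   ReviewTime TIMESTAMP(3),
>   RejectTime TIMESTAMP(3),
>   IsDeleted INT
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'ods_orders',
>   'properties.bootstrap.servers' = '...',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'canal-json'   -- interprets binlog records as a changelog stream
> );
> {code}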
> I looked up one user's order data: order number XJ0120210531004794 appears in the canal-json source table as +U, which is normal:
> {code:java}
> | +U |             XJ0120210531004794 |          50 |
> | +U |             XJ0120210531004672 |          50 |
> {code}
> But after the join writes to upsert-kafka, the data consumed back from upsert-kafka is:
> {code:java}
> | +I |             XJ0120210531004794 |          50 |
> | -U |             XJ0120210531004794 |          50 |
> {code}
> This order has two records, and its rows in the orders and order_extend tables have not changed since they were created. The trailing -U record caused my data loss: the row was dropped from the computation and the final result was wrong.
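> In other words, a consumer that materializes the topic per key (last record per key wins) ends up with the row deleted; an illustrative state transition under the observed ordering:
> {code:java}
> -- Key XJ0120210531004794, records in arrival order:
> --   +I (value 50)  -> key present
> --   -U (value 50)  -> key retracted, treated as a delete
> -- Final materialized view: the row is gone, hence the data loss.
> {code}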


