[ 
https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866216#comment-17866216
 ] 

xuyang commented on FLINK-35826:
--------------------------------

Hi, let me try to explain the cause of this problem.

I have reproduced it with the following source data and query (the code is on
this branch):

https://github.com/xuyangzhong/flink/tree/window_bug

 
{code:java}
// source data
// <changelog, pk, sum_column, event_time, pt>
("+I", 1, 1, "2024-03-13T10:12", "1"),
("+I", 2, 3, "2024-03-13T10:16", "1"),
("+I", 3, 2, "2024-03-13T10:13", "1")

// schema
CREATE TABLE MyTable3 (
  id int,
  amount int,
  create_time timestamp(3),
  pt string,
  proc_time AS PROCTIME(),
  WATERMARK FOR `create_time` AS `create_time` - INTERVAL '0' MINUTES,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (...)

// query
select pt, 
  HOP_START(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS w_start,
  HOP_END(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS w_end,
  sum(amount) as count_age
  from MyTable3
group by HOP(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES), pt;

// expected result:
"1,2024-03-13T10:05,2024-03-13T10:15,1",
"1,2024-03-13T10:10,2024-03-13T10:20,6",
"1,2024-03-13T10:15,2024-03-13T10:25,3"

// but sometimes wrong:
"1,2024-03-13T10:05,2024-03-13T10:15,3",
"1,2024-03-13T10:10,2024-03-13T10:20,6",
"1,2024-03-13T10:15,2024-03-13T10:25,3" {code}
 
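To see where the two results differ, the window assignment can be worked out by
hand. The following is only a minimal sketch in plain Python, not Flink code:
`hop_windows` is a hypothetical helper, and timestamps are reduced to minutes
past 10:00. Summing every record into every window it falls into, with no
late-record handling at all, gives exactly the wrong result above.

{code:python}
from collections import defaultdict

# Minutes, as in HOP(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES).
SIZE, SLIDE = 10, 5

def hop_windows(ts):
    """Return every [start, end) window of length SIZE, sliding by SLIDE, that contains ts."""
    last_start = ts - ts % SLIDE
    return [(s, s + SIZE) for s in range(last_start, ts - SIZE, -SLIDE)]

# (amount, minute of create_time past 10:00), in arrival order.
records = [(1, 12), (3, 16), (2, 13)]

# Add each record to all of its windows, never dropping anything as late.
sums = defaultdict(int)
for amount, ts in records:
    for window in hop_windows(ts):
        sums[window] += amount

for (start, end), total in sorted(sums.items()):
    print(f"[10:{start:02d}, 10:{end:02d}) -> {total}")
{code}
Window [10:05, 10:15) only reaches the expected value 1 if the record with
timestamp 10:13 is dropped for that window, so the correct result depends on
that record being recognized as late.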

{color:#FF0000}*Conclusion:*{color}

The bug occurs because the record `("+I", 3, 2, "2024-03-13T10:13", "1")` is
sometimes not treated as a late record, although with the watermark
`create_time - INTERVAL '0' MINUTES` it should be, since it arrives after the
record with timestamp 10:16.

 

*Detailed cause:*

The plan: 
{code:java}
**** parallelism is 4 ****
(t1 means subtask-1)
source&watermark assigner(t1) -> changelogNormalize (t1) -> group window agg(t1)
                              -> changelogNormalize (t2) -> group window agg(t2)
                              -> changelogNormalize (t3) -> group window agg(t3)
                              -> changelogNormalize (t4) -> group window agg(t4)
{code}
Data across ChangelogNormalize:

Records are abbreviated to <pk, sum_column>, and watermarks are reduced to
their minute component.
| |t1|t2|t3|t4|
|data|<1,1>|-|-|-|
|watermark|12|12|12|12|
|data|-|<2, 3>|-|-|
|watermark|16|16{color:#FF0000}[1]{color}|16|16|
|data|-|-|<3, 2>|-|
|watermark|Long.MAX|Long.MAX|Long.MAX|Long.MAX|

[1] If the processing of watermark 16 is blocked for a short time in
ChangelogNormalize (t2), <3, 2> will not be treated as a late record, because
its timestamp, 13, is greater than the watermark that GroupWindowAgg still
sees, 12.
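
The reason GroupWindowAgg still sees 12 is that an operator with several input
channels advances its own watermark to the minimum of the watermarks received
on each channel. A minimal sketch of that rule in plain Python (a simplified
model, not Flink's implementation; the class `CombinedWatermark` is
hypothetical):

{code:python}
class CombinedWatermark:
    """Simplified model of the watermark of an operator with several input channels."""

    def __init__(self, channels):
        self.per_channel = {c: float("-inf") for c in channels}

    def advance(self, channel, watermark):
        # Watermarks never move backwards on a channel.
        self.per_channel[channel] = max(self.per_channel[channel], watermark)
        return self.current()

    def current(self):
        # The operator's watermark is the minimum over all input channels.
        return min(self.per_channel.values())


wm = CombinedWatermark(["t1", "t2", "t3", "t4"])
for channel in ("t1", "t2", "t3", "t4"):
    wm.advance(channel, 12)

# Watermark 16 arrives from t1, t3 and t4, but is still blocked on t2.
for channel in ("t1", "t3", "t4"):
    wm.advance(channel, 16)
blocked = wm.current()

# Only once t2 forwards watermark 16 does the combined watermark move.
wm.advance("t2", 16)
unblocked = wm.current()

print(blocked, unblocked)
{code}
While t2 lags behind, a record with timestamp 13 that arrives through t3 is
compared against 12 rather than 16.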

Processing order in GroupWindowAgg that produces the wrong result:
|data or watermark|window's acc|output|
|<1, 1> // t1|[10, 15) : 1|-|
|Watermark 12 // t1, t2, t3, t4|[10, 15): 1|-|
|<2, 3> // t2|[10, 15): 1
[15, 20): 3|-|
|Watermark 16 // t1, t3, t4|[10, 15): 1
[15, 20): 3|-|
|<3, 2> // t3|[10, 15): 3
[15, 20): 3|-|
|{color:#FF0000}Watermark 16 // t2 (blocked){color}|[10, 15): 3
[15, 20): 3|[5, 15): 3|
|Watermark Long.MAX // t1, t2, t3, t4| |[10, 20): 6
[15, 25): 3|
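
The two orderings can be replayed with a small simulation. This is only a
sketch under simplifying assumptions, not the operator's real logic: it keeps
one accumulator per window instead of per slice as in the table above, a
window counts as closed once the combined watermark reaches its end, and
`windows_of` and `replay` are hypothetical names.

{code:python}
SIZE, SLIDE = 10, 5  # minutes

def windows_of(ts):
    """All [start, end) sliding windows that contain ts."""
    last_start = ts - ts % SLIDE
    return [(s, s + SIZE) for s in range(last_start, ts - SIZE, -SLIDE)]

def replay(events):
    """Process records and combined watermarks in the given order; return fired windows."""
    acc, fired, watermark = {}, [], float("-inf")
    for kind, *payload in events:
        if kind == "data":
            ts, amount = payload
            for window in windows_of(ts):
                if window[1] > watermark:  # skip windows that are already closed
                    acc[window] = acc.get(window, 0) + amount
        else:  # "wm": advance the watermark and fire every closed window
            watermark = payload[0]
            for window in sorted(w for w in acc if w[1] <= watermark):
                fired.append((window, acc.pop(window)))
    return fired

MAX = float("inf")

# Watermark 16 reaches the aggregation before <3, 2>.
in_order = [("data", 12, 1), ("wm", 12), ("data", 16, 3),
            ("wm", 16), ("data", 13, 2), ("wm", MAX)]

# Watermark 16 is held back on one channel, so <3, 2> is processed first.
blocked = [("data", 12, 1), ("wm", 12), ("data", 16, 3),
           ("data", 13, 2), ("wm", 16), ("wm", MAX)]

print(replay(in_order))
print(replay(blocked))
{code}
The first ordering reproduces the expected result (1, 6, 3) and the second
the wrong one (3, 6, 3); the only difference between them is whether <3, 2> is
processed before or after the combined watermark reaches 16.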

 

> [SQL] Sliding window may produce unstable calculations when processing 
> changelog data.
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-35826
>                 URL: https://issues.apache.org/jira/browse/FLINK-35826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>         Environment: flink with release-1.20
>            Reporter: Yuan Kui
>            Assignee: xuyang
>            Priority: Major
>         Attachments: image-2024-07-12-14-27-58-061.png
>
>
> Calculation results may be unstable when using a sliding window to process 
> changelog data. When the test is executed 10 times, some runs succeed and 
> others fail:
> !image-2024-07-12-14-27-58-061.png!
> See the documentation and code for more details.
> [https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing]
> code:
> [[BUG] Reproduce the issue of unstable sliding window calculation results · 
> yuchengxin/flink@c003e45 
> (github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
