[ https://issues.apache.org/jira/browse/FLINK-34559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-34559:
--------------------------------------
    Description: 
RecordsWindowBuffer flushes buffered records in the following cases (a simplified sketch follows the list):
 * watermark
 * checkpoint barrier
 * buffer overflow
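
A minimal sketch of that buffering behaviour (hypothetical, simplified code; the class and method names below only mirror the reported behaviour and are not the actual RecordsWindowBuffer API):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified illustration of the three flush triggers listed above.
// NOT the real RecordsWindowBuffer: names and signatures are hypothetical.
class SimplifiedWindowBuffer {

    private final List<String> buffer = new ArrayList<>();
    private final int capacity;
    private final Consumer<List<String>> flushTarget; // downstream output or state backend

    SimplifiedWindowBuffer(int capacity, Consumer<List<String>> flushTarget) {
        this.capacity = capacity;
        this.flushTarget = flushTarget;
    }

    void addElement(String record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush(); // trigger 3: buffer overflow
        }
    }

    void advanceProgress(long watermark) {
        flush(); // trigger 1: watermark advanced
    }

    void onCheckpointBarrier(long checkpointId) {
        // trigger 2: checkpoint barrier; this runs in the synchronous part of
        // the checkpoint, so a large buffer makes the sync phase slow
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // For the local aggregation this emits downstream without checking
        // network buffer availability; for the global aggregation this
        // writes to the state backend.
        flushTarget.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        SimplifiedWindowBuffer buffer =
                new SimplifiedWindowBuffer(3, batch -> System.out.println("flushed " + batch));
        buffer.addElement("a");
        buffer.addElement("b");
        buffer.onCheckpointBarrier(1L); // flushes [a, b] in the checkpoint path
        buffer.addElement("c");
        buffer.advanceProgress(1_000L); // flushes [c] on watermark progress
    }
}
{code}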

 

In two-phase aggregations, this creates the following problems:

1) Local aggregation: enters hard backpressure because the flush emits the buffered data downstream without checking network buffer availability.

This already disrupts normal checkpointing and watermark progression.

 

2) Global aggregation:

When the window is large enough and/or the watermark is lagging, a large amount of data is flushed to the state backend (and the state is updated) during the checkpoint SYNC phase.

 

All this eventually causes checkpoint timeouts (10 minutes in our environment).

 

Example query:
{code:sql}
INSERT INTO `target_table`
SELECT
  window_start,
  window_end,
  some, attributes,
  SUM(view_time) AS total_view_time,
  COUNT(*) AS num,
  LISTAGG(DISTINCT page_url) AS pages
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR($rowtime), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, some, attributes;{code}
In our setup, the issue can be reproduced deterministically.

 

As a quick fix, we might want to:
 # limit the amount of data buffered in Global Aggregation nodes
 # disable two-phase aggregations, i.e. Local Aggregations (we could try to limit buffering there too, but network buffer availability cannot easily be checked from the operator); a possible configuration workaround is sketched below
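
If a workaround is needed before a fix lands, option 2 can be approximated with the existing {{table.optimizer.agg-phase-strategy}} option, forcing one-phase aggregation so that the planner doesn't create the local aggregation stage at all. A minimal sketch, assuming the option also covers the windowed TVF aggregation above (the environment setup and query are placeholders):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OnePhaseAggWorkaround {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Force one-phase aggregation so the planner does not generate the
        // local aggregation operator whose buffer flush causes hard
        // backpressure and the slow checkpoint sync phase described above.
        tEnv.getConfig()
                .getConfiguration()
                .setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");

        // Submit the affected job as usual, e.g.:
        // tEnv.executeSql("INSERT INTO `target_table` SELECT ... FROM TABLE(TUMBLE(...)) GROUP BY ...");
    }
}
{code}
This trades away the network savings of local pre-aggregation, so it is only meant as a stop-gap until the buffering itself is bounded.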


> TVF Window Aggregations might get stuck
> ---------------------------------------
>
>                 Key: FLINK-34559
>                 URL: https://issues.apache.org/jira/browse/FLINK-34559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.0, 1.18.1
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.19.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
