[ 
https://issues.apache.org/jira/browse/FLINK-37213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-37213:
-----------------------------------
    Affects Version/s: 1.19.1

> Improve performance of unbounded OVER aggregations
> --------------------------------------------------
>
>                 Key: FLINK-37213
>                 URL: https://issues.apache.org/jira/browse/FLINK-37213
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.1, 2.0-preview
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>
> Unbounded OVER aggregations can be painfully slow.
> For example, queries like the following:
> {code:sql}
> SELECT
>   rowtime,
>   b,
>   c,
>   MIN(c) OVER (PARTITION BY b ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
>   MAX(c) OVER (PARTITION BY b ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> FROM T;
> -- or the RANGE equivalent:
> SELECT
>   rowtime,
>   b,
>   c,
>   MIN(c) OVER (PARTITION BY b ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
>   MAX(c) OVER (PARTITION BY b ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> FROM T;
> {code}
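To make the difference between the two frames concrete, here is a minimal, Flink-free Java sketch of what the queries compute for one {{MIN(c)}}/{{MAX(c)}} pair. The {{Row}} record and the {{rowsMinMax}}/{{rangeMinMax}} helpers are hypothetical names used only for illustration, not Flink APIs. Results are returned in rowtime order, and under ROWS this model keeps rows with equal {{rowtime}} in their input order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of what the two example queries compute for MIN(c)/MAX(c).
class UnboundedOverSemantics {
    // Hypothetical input row: partition key b, event time rowtime, value c.
    record Row(String b, long rowtime, int c) {}

    // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: the frame is every
    // earlier row of the partition plus the current row itself.
    static List<int[]> rowsMinMax(List<Row> input) {
        List<Row> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong(Row::rowtime)); // List.sort is stable
        Map<String, int[]> acc = new HashMap<>(); // b -> {min, max}
        List<int[]> out = new ArrayList<>();
        for (Row r : sorted) {
            int[] mm = fold(acc, r);
            out.add(new int[] {mm[0], mm[1]});
        }
        return out;
    }

    // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: the frame also contains
    // all peers, i.e. every row of the partition with the same rowtime.
    static List<int[]> rangeMinMax(List<Row> input) {
        List<Row> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong(Row::rowtime));
        Map<String, int[]> acc = new HashMap<>();
        List<int[]> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            long ts = sorted.get(i).rowtime();
            int j = i;
            while (j < sorted.size() && sorted.get(j).rowtime() == ts) j++;
            // Fold the whole group of rows sharing this rowtime first...
            for (int k = i; k < j; k++) fold(acc, sorted.get(k));
            // ...then emit, so all peers of a partition see the same result.
            for (int k = i; k < j; k++) {
                int[] mm = acc.get(sorted.get(k).b());
                out.add(new int[] {mm[0], mm[1]});
            }
            i = j;
        }
        return out;
    }

    // Updates and returns the running {min, max} of partition r.b().
    private static int[] fold(Map<String, int[]> acc, Row r) {
        int[] mm = acc.computeIfAbsent(r.b(),
                k -> new int[] {Integer.MAX_VALUE, Integer.MIN_VALUE});
        mm[0] = Math.min(mm[0], r.c());
        mm[1] = Math.max(mm[1], r.c());
        return mm;
    }
}
```

For input rows (x, 1, 5), (x, 2, 3), (x, 2, 7), ROWS yields [5,5], [3,5], [3,7] while RANGE yields [5,5], [3,7], [3,7], because under RANGE the two peers at rowtime 2 share one frame.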
> On each processed watermark, regardless of whether any output will be
> produced, such queries need to sort the whole buffered state.
> For example, we had a production query with a key space of ~50 000 000 keys
> that we expected to process around 1M records/h (~280 records/s), with a new
> watermark arriving every ~200 ms. Even with a parallelism of 50, which
> required only ~5.6 records/s per parallel instance, Flink SQL was not able to
> handle this load, averaging ~2 records/s per operator instance.
> The problem was that on each watermark, so nominally every 200 ms, a timer
> was firing for each key, sorting all of its buffered records and checking
> whether any buffered record had {{$rowtime <= watermark}}. Given the
> frequency of watermarks and the number of keys, that was extremely rare. The
> end result was that a single parallel instance of
> {{RowTimeRowsUnboundedPrecedingFunction}} was firing ~20 000 timers/s while
> processing only ~2 records/s, i.e. roughly 10 000 timer firings per processed
> record.
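The per-watermark behaviour described above can be modelled roughly as follows. This is a simplified, Flink-free sketch under stated assumptions, not the actual {{RowTimeRowsUnboundedPrecedingFunction}} code: a plain {{HashMap}} stands in for the keyed {{MapState<Long, List<RowData>>}}, and {{onWatermark}} stands in for the per-key timer that fires on every watermark. The class and counter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, Flink-free model of the per-watermark behaviour: every key that
// still buffers rows gets a timer on every watermark and sorts its whole buffer.
class PerWatermarkSortModel {
    // key -> (rowtime -> buffered values); stands in for MapState<Long, List<RowData>>.
    private final Map<String, Map<Long, List<Integer>>> buffers = new HashMap<>();
    long timersFired = 0;
    long entriesSorted = 0;
    long rowsEmitted = 0;

    void processElement(String key, long rowtime, int value) {
        buffers.computeIfAbsent(key, k -> new HashMap<>())
               .computeIfAbsent(rowtime, t -> new ArrayList<>())
               .add(value);
    }

    // One watermark: a timer fires for every key with buffered rows,
    // whether or not any of those rows is ready to be emitted.
    void onWatermark(long watermark) {
        for (Map<Long, List<Integer>> buffer : buffers.values()) {
            if (buffer.isEmpty()) continue;
            timersFired++;
            // Sort ALL buffered rowtimes of the key, even if none is ready.
            List<Long> sorted = new ArrayList<>(buffer.keySet());
            Collections.sort(sorted);
            entriesSorted += sorted.size();
            for (long ts : sorted) {
                if (ts > watermark) break; // everything after this is still pending
                rowsEmitted += buffer.remove(ts).size();
            }
        }
    }
}
```

With 1 000 keys that each buffer one row whose rowtime is far ahead of the watermark, ten watermarks cost 10 000 timer firings and 10 000 sorted entries without emitting a single row.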
> I'm proposing to add new versions of {{RowTimeRowsUnboundedPrecedingFunction}}
> and {{RowTimeRangeUnboundedPrecedingFunction}} that would work more like
> {{StreamExecTemporalSort}}: registering a timer for each encountered
> {{$rowtime}} value and using those timers to access the buffered records in
> ascending order of {{$rowtime}}. This way we would avoid:
> * unnecessary timer firings: a timer would fire only if there are records to
> be processed;
> * sorting all of the state on each timer: a timer would only need to access a
> single key in the {{MapState<Long, List<RowData>> inputState}} buffer.
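The proposed timer-per-{{$rowtime}} scheme can be sketched in the same simplified style. Again this is a hypothetical, Flink-free model rather than the proposed implementation: a {{TreeSet}} ordered by rowtime stands in for the event-time timer queue (registering the same key and rowtime twice adds no second timer, mirroring Flink's deduplication of timers per key and timestamp), and the real accumulator updates and output are reduced to counters.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Simplified, Flink-free model of the proposed scheme: one timer per distinct
// (key, rowtime) pair, fired in ascending rowtime order once the watermark passes it.
class PerRowtimeTimerModel {
    // Hypothetical stand-in for an event-time timer of one key.
    record Timer(long rowtime, String key) {}

    // key -> (rowtime -> buffered values); stands in for MapState<Long, List<RowData>>.
    private final Map<String, Map<Long, List<Integer>>> buffers = new HashMap<>();
    // Ordered timer queue; set semantics give at most one timer per key and rowtime.
    private final TreeSet<Timer> timers = new TreeSet<>(
            Comparator.comparingLong(Timer::rowtime).thenComparing(Timer::key));
    final List<Long> firedRowtimes = new ArrayList<>();
    long timersFired = 0;
    long rowsEmitted = 0;

    void processElement(String key, long rowtime, int value) {
        buffers.computeIfAbsent(key, k -> new HashMap<>())
               .computeIfAbsent(rowtime, t -> new ArrayList<>())
               .add(value);
        timers.add(new Timer(rowtime, key)); // no-op if this timer already exists
    }

    int pendingTimers() {
        return timers.size();
    }

    void onWatermark(long watermark) {
        // Only timers whose rowtime has been reached fire; nothing else is touched.
        while (!timers.isEmpty() && timers.first().rowtime() <= watermark) {
            Timer t = timers.pollFirst();
            timersFired++;
            firedRowtimes.add(t.rowtime());
            // Point lookup of exactly one rowtime entry: no sort of the buffer.
            List<Integer> rows = buffers.get(t.key()).remove(t.rowtime());
            // A real operator would update the accumulators and emit results here.
            rowsEmitted += rows.size();
        }
    }
}
```

With the same 1 000 keys as before, the ten early watermarks fire no timers at all; the watermark that reaches the buffered rowtime fires exactly one timer per key, and each timer touches a single map entry instead of sorting the buffer. The cost thus scales with the number of distinct rowtimes actually released, not with keys times watermarks.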



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
