[
https://issues.apache.org/jira/browse/FLINK-37213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Piotr Nowojski updated FLINK-37213:
-----------------------------------
Affects Version/s: 1.19.1
> Improve performance of unbounded OVER aggregations
> --------------------------------------------------
>
> Key: FLINK-37213
> URL: https://issues.apache.org/jira/browse/FLINK-37213
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.19.1, 2.0-preview
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
>
> Unbounded OVER aggregations can be painfully slow.
> For example, queries like:
> {code:sql}
> SELECT
> rowtime,
> b,
> c,
> min(c) OVER (PARTITION BY b ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
> max(c) OVER (PARTITION BY b ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> FROM T
> -- or RANGE equivalent:
> SELECT
> rowtime,
> b,
> c,
> min(c) OVER (PARTITION BY b ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
> max(c) OVER (PARTITION BY b ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> FROM T
> {code}
> On each processed watermark, regardless of whether the query will produce
> any output, these queries need to sort the whole state.
> For example, we had a production query with a key space of ~50 000 000
> elements that we expected to process around 1M records/h
> (~270 records/s), with a new watermark arriving every ~200 ms. Even with
> a parallelism of 50, Flink SQL was not able to handle this load, averaging
> ~2 records/s/operator.
> The problem was that on each watermark, so nominally every 200 ms, a timer for
> each key was firing, sorting all buffered records and checking whether any
> buffered record had {{$rowtime <= watermark}}. Given the frequency of
> watermarks and the number of keys, that was extremely rare. The end result was
> that a single parallel instance of
> {{RowTimeRowsUnboundedPrecedingFunction}} was firing ~20 000 timers/s while
> processing only ~2 records/s.
> I'm proposing to add new versions of
> {{RowTimeRowsUnboundedPrecedingFunction}} and
> {{RowTimeRangeUnboundedPrecedingFunction}} that would work more like
> {{StreamExecTemporalSort}}: registering a timer for each encountered
> {{$rowtime}} value, and using the timers to access the buffered records in
> ascending order of {{$rowtime}}. This way we would avoid:
> * unnecessary firing of timers - a timer would fire only if there are
> records to be processed
> * sorting all of the state on each timer - we would just access a
> single key in the {{MapState<Long, List<RowData>> inputState}} buffer.
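
The proposal above could be sketched roughly as follows, for the state of a single key. This is a plain-Java toy model with no Flink dependencies, not the actual operator: the class name {{PerRowtimeTimerSketch}}, the {{TreeSet}} standing in for the timer service, and the {{Integer}} rows standing in for {{RowData}} are all assumptions made for illustration. It follows ROWS semantics (one output per row); late records and the RANGE variant are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy, Flink-free model of one key's state in the proposed operator (hypothetical). */
class PerRowtimeTimerSketch {
    // Stand-in for MapState<Long, List<RowData>> inputState: rowtime -> buffered values of c.
    private final Map<Long, List<Integer>> inputState = new HashMap<>();
    // Stand-in for the timer service: at most one event-time timer per distinct rowtime.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Running min/max accumulators for the unbounded-preceding window.
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    private int timersFired = 0;
    // Emitted rows, formatted as "rowtime:min/max".
    final List<String> output = new ArrayList<>();

    /** Buffer the record and register a timer for its rowtime (a no-op if one already exists). */
    void processElement(long rowtime, int c) {
        inputState.computeIfAbsent(rowtime, t -> new ArrayList<>()).add(c);
        timers.add(rowtime);
    }

    /** Fire only the timers at or below the watermark, in ascending rowtime order. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** Touch a single map entry; there is no sort over the whole buffer. */
    private void onTimer(long rowtime) {
        timersFired++;
        for (int c : inputState.remove(rowtime)) {
            min = Math.min(min, c);
            max = Math.max(max, c);
            output.add(rowtime + ":" + min + "/" + max);
        }
    }

    int timersFired() {
        return timersFired;
    }
}
```

In this model, a watermark that is lower than every buffered {{$rowtime}} fires no timers at all, and each fired timer reads exactly one entry of the buffer. The ascending order comes from the timers themselves, so nothing has to be sorted when a watermark arrives.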