[
https://issues.apache.org/jira/browse/FLINK-37213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Piotr Nowojski closed FLINK-37213.
----------------------------------
Fix Version/s: 2.1.0
Release Note:
Performance of unbounded OVER aggregations in Table API and SQL has been
greatly improved when the key space is quite large. For one real world use case
the throughput has improved from ~1 record/s/operator up to ~20000
records/s/operator.
The new implementation is used by default for freshly submitted queries. There
is no migration path for the jobs that were using the old code path - they have
to be resubmitted.
The old code path can still be used for new jobs by setting
{{table.exec.unbounded-over.version}} to {{1}} (the default value is {{2}}).
Resolution: Fixed
merged commit 77edff6 into apache:master
> Improve performance of unbounded OVER aggregations
> --------------------------------------------------
>
> Key: FLINK-37213
> URL: https://issues.apache.org/jira/browse/FLINK-37213
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.19.1, 2.0-preview
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.1.0
>
>
> Unbounded over aggregations can be painfully slow.
> For example queries like:
> {code:sql}
> SELECT
>   rowtime,
>   b,
>   c,
>   min(c) OVER (PARTITION BY b ORDER BY rowtime
>     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
>   max(c) OVER (PARTITION BY b ORDER BY rowtime
>     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> FROM T;
> -- or RANGE equivalent:
> SELECT
>   rowtime,
>   b,
>   c,
>   min(c) OVER (PARTITION BY b ORDER BY rowtime
>     RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
>   max(c) OVER (PARTITION BY b ORDER BY rowtime
>     RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
> FROM T;
> {code}
> On each processed watermark, regardless of whether the query will produce
> any output, these operators need to sort the whole state.
> For example, we had a production query with a key space of ~50 000 000
> elements and we were expecting it to process around 1M records/h
> (~270 records/s), with a new watermark arriving every ~200ms. Even with a
> parallelism of 50, Flink SQL was not able to handle this load, averaging ~2
> records/s/operator.
> The problem was that on each watermark, so nominally every 200ms, a timer for
> each key was firing, sorting all buffered records and checking if any
> buffered record has {{$rowtime <= watermark}}. Given the frequency of
> watermarks and the number of keys, that was extremely rare. The end result
> was that a single parallel instance of the
> {{RowTimeRowsUnboundedPrecedingFunction}} was firing ~20 000 timers/s while
> processing only ~2 records/s.
> I'm proposing to add a new version of
> {{RowTimeRowsUnboundedPrecedingFunction}} and
> {{RowTimeRangeUnboundedPrecedingFunction}} that would work more like
> {{StreamExecTemporalSort}}: registering a timer for each encountered
> {{$rowtime}} value, and using timers to access the buffered records in
> ascending order of {{$rowtime}}. This way we would avoid:
> * unnecessary firing of timers - a timer would fire only if there are
> records to be processed
> * sorting all of the state on each timer - we would just access one
> single key in the {{MapState<Long, List<RowData>> inputState}} buffer.
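
The proposal above can be illustrated with a minimal single-key simulation in plain Java. This is only a sketch under stated assumptions, not the merged Flink implementation: the class {{PerRowtimeTimerSketch}} and its methods are hypothetical, a {{HashMap}} stands in for the {{MapState<Long, List<RowData>> inputState}} buffer, a {{TreeSet}} stands in for the event-time timer service, and a running max over plain {{long}} values with ROWS-style output replaces the real aggregation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical single-key simulation of the timer-per-rowtime idea; not Flink code. */
class PerRowtimeTimerSketch {
    // Stand-in for MapState<Long, List<RowData>> inputState (rows reduced to long values).
    private final Map<Long, List<Long>> inputState = new HashMap<>();
    // Stand-in for the event-time timer service: one timer per distinct rowtime,
    // deduplicated and polled in ascending timestamp order.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long runningMax = Long.MIN_VALUE;
    private int timersFired = 0;

    /** Buffers the row under its rowtime and registers a timer for that rowtime. */
    void processElement(long rowtime, long value) {
        inputState.computeIfAbsent(rowtime, t -> new ArrayList<>()).add(value);
        timers.add(rowtime);
    }

    /** Fires only the timers with timestamp <= watermark; no sorting of the buffer. */
    List<String> advanceWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            timersFired++;
            // Access exactly one map entry per fired timer.
            for (long v : inputState.remove(ts)) {
                runningMax = Math.max(runningMax, v);
                out.add(ts + ":" + runningMax);
            }
        }
        return out;
    }

    int timersFired() {
        return timersFired;
    }

    public static void main(String[] args) {
        PerRowtimeTimerSketch sketch = new PerRowtimeTimerSketch();
        sketch.processElement(30L, 7L);
        sketch.processElement(10L, 5L);
        System.out.println(sketch.advanceWatermark(5L));  // nothing is due yet
        System.out.println(sketch.advanceWatermark(30L)); // rows emitted in rowtime order
    }
}
```

In this sketch a watermark that does not pass any buffered {{$rowtime}} fires no timer at all, and each fired timer touches a single map entry, which is the behaviour the two bullet points describe.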