Piotr Nowojski created FLINK-37213:
--------------------------------------
Summary: Improve performance of unbounded OVER aggregations
Key: FLINK-37213
URL: https://issues.apache.org/jira/browse/FLINK-37213
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 2.0-preview
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
Unbounded OVER aggregations can be painfully slow.
For example, queries like:
{code:sql}
SELECT
  rowtime,
  b,
  c,
  min(c) OVER (PARTITION BY b ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
  max(c) OVER (PARTITION BY b ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T;

-- or RANGE equivalent:
SELECT
  rowtime,
  b,
  c,
  min(c) OVER (PARTITION BY b ORDER BY rowtime
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
  max(c) OVER (PARTITION BY b ORDER BY rowtime
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM T;
{code}
On each processed watermark, regardless of whether the query will produce any
output, the operators behind these queries need to sort the whole state.
For example, we had a production query with a key space of ~50 000 000
elements, and we were expecting it to process around 1M records/h
(~270 records/s), with a new watermark arriving every ~200ms. Flink SQL, even
with a parallelism of 50, was not able to handle this load, averaging ~2
records/s/operator.
The problem was that on each watermark, so nominally every 200ms, a timer for
each key was firing, sorting all buffered records and checking whether any
buffered record has {{$rowtime <= watermark}}. Given the frequency of
watermarks and the number of keys, that was extremely rare. The end result was
that a single parallel instance of the {{RowTimeRowsUnboundedPrecedingFunction}}
was firing ~20 000 timers/s while processing only ~2 records/s.
I'm proposing to add a new version of {{RowTimeRowsUnboundedPrecedingFunction}}
and {{RowTimeRangeUnboundedPrecedingFunction}} that would work more like
{{StreamExecTemporalSort}}: registering a timer for each encountered
{{$rowtime}} value, and using timers to access the buffered records in
ascending order of {{$rowtime}}. This way we would avoid:
* unnecessary firing of timers - a timer would be fired only if there are
records to be processed
* sorting all of the state on each timer - we would just access one single key
in the {{MapState<Long, List<RowData>> inputState}} buffer.
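
The proposal above can be sketched roughly as follows. This is only a plain-Java simulation of a single key's state under ROWS semantics, not Flink code: the class {{PerRowtimeTimerSketch}}, its methods, the {{TreeSet}} standing in for the timer service, and the {{HashMap}} standing in for {{inputState}} are all hypothetical names chosen for illustration, rows are reduced to their {{c}} value, and late records are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical single-key simulation of the proposed approach; not Flink API. */
class PerRowtimeTimerSketch {
    // Stand-in for MapState<Long, List<RowData>> inputState: rowtime -> buffered values of c.
    private final Map<Long, List<Long>> inputState = new HashMap<>();
    // Stand-in for the timer service: one event-time timer per distinct rowtime.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Accumulators for min(c) and max(c) over all rows emitted so far.
    private long runningMin = Long.MAX_VALUE;
    private long runningMax = Long.MIN_VALUE;
    // Counts fired timers, to show that idle watermarks cost nothing.
    int timersFired = 0;

    /** Buffers the row and registers a timer for its rowtime (deduplicated by the set). */
    void processElement(long rowtime, long c) {
        inputState.computeIfAbsent(rowtime, t -> new ArrayList<>()).add(c);
        timers.add(rowtime);
    }

    /** Fires only the timers with timestamp <= watermark, in ascending rowtime order. */
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            timersFired++;
            // Single-key lookup in the buffer; no sorting of the whole state.
            for (long c : inputState.remove(ts)) {
                runningMin = Math.min(runningMin, c);
                runningMax = Math.max(runningMax, c);
                out.add(ts + ":" + runningMin + ":" + runningMax);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PerRowtimeTimerSketch op = new PerRowtimeTimerSketch();
        op.processElement(30L, 7L);
        op.processElement(10L, 5L);
        op.processElement(20L, 9L);
        System.out.println(op.onWatermark(5L));   // no timer is due yet
        System.out.println(op.onWatermark(20L));  // rowtimes 10 and 20 are emitted in order
        System.out.println("timers fired: " + op.timersFired);
    }
}
```

In this sketch a watermark that passes no buffered {{$rowtime}} fires no timer at all, and each fired timer touches exactly one entry of the buffer. The ordering comes from the timers themselves, which is the property the proposal relies on.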