alamb opened a new issue, #23197:
URL: https://github.com/apache/datafusion/issues/23197
### Is your feature request related to a problem or challenge?
This ticket tracks various ideas to make evaluating window functions faster
for single, often large, windows. This has come up several times recently with
@avantgardnerio, @2010YOUY01, and @wirybeaver, among others.
The core use case is to make evaluating a query like this fast:
```sql
SELECT
-- moving average over the current row and the 5 previous rows
AVG(cpu_usage) OVER (ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT
ROW) as avg_cpu
FROM metrics;
```
The key property is that each window contains a large number of rows, because
there is no `PARTITION BY` in the window clause (`OVER`).
(Note that a similar thing can happen when there is a `PARTITION BY` clause but
the window sizes are imbalanced due to skew or a small number of partitions.)
Today, each window is executed in a single DataFusion partition, which means:
1. They are limited to a single core
2. They are limited to a single machine in distributed environments
# Background
DataFusion has support for many window functions (see [doc
link](https://datafusion.apache.org/user-guide/sql/window_functions.html)).
Window functions are used like this
```sql
SELECT
customer, time, cpu_usage,
-- moving average over the current row and the 5 previous rows
AVG(cpu_usage) OVER (PARTITION BY customer ORDER BY time ROWS BETWEEN 5
PRECEDING AND CURRENT ROW) as avg_cpu
FROM
metrics;
```
This results in something like:
```sql
+----------+---------------------+-----------+---------+
| customer | time | cpu_usage | avg_cpu |
+----------+---------------------+-----------+---------+
| acme | 2026-01-01T00:00:00 | 10.0 | 10.0 |
| acme | 2026-01-01T00:01:00 | 20.0 | 15.0 |
...
| globex | 2026-01-01T00:03:00 | 45.0 | 30.0 |
| globex | 2026-01-01T00:04:00 | 55.0 | 35.0 |
+----------+---------------------+-----------+---------+
```
The plan for such a query looks like the following, and typically keeps all
cores fully occupied
```sql
BoundedWindowExec(...)
Repartition (on customer) <--- this is responsible for dividing work among
partitions and thus cores
Sort(customer, time)
Scan
```
Here is an example from the tests:
https://github.com/apache/datafusion/blob/a0e9887550065324320c6fd52001aa23bae67485/datafusion/sqllogictest/test_files/window_topk_pushdown.slt#L112-L118
However, for the case in question, when we remove the `PARTITION BY
customer` from the `OVER` clause
```sql
SELECT
customer, time, cpu_usage,
-- moving average over the current row and the 5 previous rows
AVG(cpu_usage) OVER (ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT
ROW) as avg_cpu
FROM
metrics;
```
The plan looks like this (no `Repartition`, and it executes on a single core):
```sql
BoundedWindowExec(...) <-- single partition, single core
Sort(time)
Scan
```
Here is an example
https://github.com/apache/datafusion/blob/a0e9887550065324320c6fd52001aa23bae67485/datafusion/sqllogictest/test_files/window.slt#L4100-L4103
<details><summary>Table definition</summary>
<p>
```sql
CREATE TABLE metrics (
time TIMESTAMP,
customer VARCHAR,
cpu_usage DOUBLE
);
INSERT INTO metrics VALUES
(TIMESTAMP '2026-01-01 00:00:00', 'acme', 10.0),
(TIMESTAMP '2026-01-01 00:01:00', 'acme', 20.0),
(TIMESTAMP '2026-01-01 00:02:00', 'acme', 30.0),
(TIMESTAMP '2026-01-01 00:03:00', 'acme', 40.0),
(TIMESTAMP '2026-01-01 00:04:00', 'acme', 50.0),
(TIMESTAMP '2026-01-01 00:00:00', 'globex', 15.0),
(TIMESTAMP '2026-01-01 00:01:00', 'globex', 25.0),
(TIMESTAMP '2026-01-01 00:02:00', 'globex', 35.0),
(TIMESTAMP '2026-01-01 00:03:00', 'globex', 45.0),
(TIMESTAMP '2026-01-01 00:04:00', 'globex', 55.0);
```
</p>
</details>
The performance currently relied
### Describe the solution you'd like
* DataFusion can use multiple cores to evaluate window functions for large
windows
* Distributed systems such as Ballista can use multiple machines to compute
the window function results in parallel
* DataFusion can complete window function queries even when the windows are
larger than available memory
### Describe alternatives you've considered
Here are some ideas for improving this use case:
1. https://github.com/apache/datafusion/pull/23124 from @Dandandan
### Additional context
## Prefix Sums / Prefix Scan
Used for cumulative windows such as `ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW`.
- Paper: [Prefix Sums and Their
Applications](https://www.cs.cmu.edu/~guyb/papers/Ble93.pdf)
- PoC from @avantgardnerio:
https://github.com/coralogix/arrow-datafusion/pull/426
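
As a rough illustration of why a cumulative frame can be split across cores, here is a minimal Python sketch of a chunked prefix sum. It is not taken from the PoC above and is not DataFusion code; the function name `chunked_prefix_sum` and the `num_chunks` parameter are made up for this example. It computes a running `SUM` in three phases, and only the small middle phase is inherently serial.

```python
from itertools import accumulate


def chunked_prefix_sum(values, num_chunks):
    """Inclusive prefix sum over contiguous chunks (hypothetical helper).

    Integers are used in the example so the result is exactly equal to a
    serial scan; with floats, the summation order differs slightly.
    """
    if not values:
        return []
    size = -(-len(values) // num_chunks)  # ceiling division
    chunks = [values[i:i + size] for i in range(0, len(values), size)]

    # Phase 1: local inclusive scan per chunk. Each chunk is independent,
    # so this loop could run on separate cores or machines.
    local = [list(accumulate(chunk)) for chunk in chunks]

    # Phase 2: serial exclusive scan over one total per chunk.
    offsets = [0]
    for scan in local[:-1]:
        offsets.append(offsets[-1] + scan[-1])

    # Phase 3: shift every chunk by the sum of all rows before it.
    # This is again independent per chunk.
    return [x + off for scan, off in zip(local, offsets) for x in scan]


running_sum = chunked_prefix_sum([10, 20, 30, 40, 50], num_chunks=2)
```

Here `running_sum` matches what `SUM(x) OVER (ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` would produce for those five rows in that order.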
## "Halo" Rows / Parallel Bounded Windows
Used for bounded window frames where each output partition needs some rows
from neighboring ranges. Note I don't think "halo rows" is a standard database
term -- it means "overlapping partitions with replicated boundary rows"
- Main PoC from @avantgardnerio:
https://github.com/apache/datafusion/pull/23026
- Runtime partition extrema design:
https://github.com/apache/datafusion/issues/23089
- Runtime partition extrema PR:
https://github.com/apache/datafusion/pull/23090
- Dynamic range partitioning design:
https://github.com/apache/datafusion/issues/23093
- Dynamic range partitioning PR:
https://github.com/apache/datafusion/pull/23094
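
To make the "halo" idea concrete, here is a minimal Python sketch, assuming the input is already sorted by `time`. It is not based on the linked PRs; the names `moving_avg`, `moving_avg_with_halo`, and `num_ranges` are hypothetical. Each range is extended backwards by the frame size (5 rows for `5 PRECEDING`), evaluated on its own, and the outputs for the replicated rows are discarded.

```python
def moving_avg(values, preceding):
    """AVG over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding):i + 1]
        out.append(sum(frame) / len(frame))
    return out


def moving_avg_with_halo(values, preceding, num_ranges):
    """Same result, computed range by range with replicated boundary rows."""
    if not values:
        return []
    size = -(-len(values) // num_ranges)  # ceiling division
    out = []
    # Each iteration only reads its own slice, so the ranges could be
    # evaluated on separate cores or machines.
    for start in range(0, len(values), size):
        halo_start = max(0, start - preceding)
        chunk = values[halo_start:start + size]  # halo rows + owned rows
        result = moving_avg(chunk, preceding)
        out.extend(result[start - halo_start:])  # drop halo-row outputs
    return out


cpu_usage = [10.0, 20.0, 30.0, 40.0, 50.0, 15.0, 25.0, 35.0, 45.0, 55.0]
serial = moving_avg(cpu_usage, preceding=5)
parallelizable = moving_avg_with_halo(cpu_usage, preceding=5, num_ranges=3)
```

Both lists are identical because every owned row sees exactly the same frame as in the serial evaluation. The cost is that each range re-reads up to `preceding` rows from its neighbor.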
## WindowAggExec Memory / Spilling
Avoid `WindowAggExec` OOMs by processing partitions incrementally and
spilling when needed.
- Support spilling for `WindowAggExec`:
https://github.com/apache/datafusion/issues/22946
- POC from @wirybeaver: https://github.com/apache/datafusion/pull/22947
## Window Frame Evaluation / Vectorization
Improve the CPU and memory efficiency for window frame boundary
calculations:
- https://github.com/apache/datafusion/issues/7518
## Intra-Operator Parallelism
Alternative/complementary approach: keep the logical window as one
partition, but parallelize execution inside the operator.
- Paper: [Efficient Processing of Window Functions in Analytical SQL
Queries](https://www.vldb.org/pvldb/vol8/p1058-leis.pdf), PVLDB 2015
- General issue from @2010YOUY01:
https://github.com/apache/datafusion/issues/23174
- Window-specific issue from @2010YOUY01:
https://github.com/apache/datafusion/issues/22355
- PoC from @2010YOUY01: https://github.com/apache/datafusion/pull/22356
## Adaptive Query Execution / Runtime-Informed Planning
Relevant if DataFusion wants a general framework for runtime stats,
dynamic split points, repartition choices, skew handling, and similar
optimizations.
- AQE issue from @avantgardnerio:
https://github.com/apache/datafusion/issues/23194
- AQE-lite PoC from @avantgardnerio:
https://github.com/apache/datafusion/pull/23167
## Related Sort / Merge Parallelism
Relevant because single-partition windows often sit downstream of sort /
merge bottlenecks.
- Parallel sort-preserving merge PR from @Dandandan:
https://github.com/apache/datafusion/pull/23124