2010YOUY01 opened a new issue, #22355: URL: https://github.com/apache/datafusion/issues/22355
### Is your feature request related to a problem or challenge?

## Background

For a refresher on window function definitions, see Ch.2 of the following paper: [https://www.vldb.org/pvldb/vol8/p1058-leis.pdf](https://www.vldb.org/pvldb/vol8/p1058-leis.pdf)

Currently, DataFusion has two `ExecutionPlan` implementations for window functions. Both assume that repartitioning and ordering requirements are satisfied outside the window operator itself (the optimizer inserts `RepartitionExec` and `SortExec` on the input side of the window operator). They differ in execution strategy:

* `WindowAggExec`: fallback implementation. It caches the entire input, identifies partition boundaries, then evaluates window partitions one by one.
* `BoundedWindowAggExec`: memory-efficient variant. When the window frame is bounded, it only retains the batches required to evaluate the current window row.

## Pain Points

Both existing implementations only support parallelism through repartitioning. This works well when the window expression includes `PARTITION BY` and the partition cardinality is high enough to utilize all CPU cores.

For queries like:

```sql
-- Only one global partition
SELECT
  v1,
  avg(v1) OVER (
    ORDER BY v1
    ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
  ) AS moving_avg
FROM generate_series(1000000000) AS t1(v1);
```

only a single CPU core performs the work.

## Proposed Solution

For the above workload, which uses a simple aggregate function with a fixed `ROWS` window frame, a possible solution would be:

* Introduce intra-operator parallelism: each worker evaluates a separate chunk of rows.
* Improve memory efficiency by incrementally loading input batches.
* Introduce a vectorized aggregate API: evaluate window expression columns in batches so there is no per-row function-call overhead when adjusting frame boundaries.
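
To make the first bullet concrete, here is a minimal standalone sketch in plain Rust. It is not DataFusion code and does not reflect any existing or planned API: the function names `moving_avg_chunk` and `parallel_moving_avg` are hypothetical, the input is assumed to be already sorted and fully in memory (so incremental loading is not covered), and values are plain `f64` with no nulls. Each worker owns a contiguous range of output rows and reads a small "halo" of neighbouring input rows so that frames near chunk boundaries are still complete.

```rust
use std::thread;

/// Evaluate `avg(v) OVER (ROWS BETWEEN preceding PRECEDING AND following FOLLOWING)`
/// for output rows `start..end` (hypothetical helper, for illustration only).
/// The worker reads up to `preceding` rows before `start` and `following`
/// rows after `end` from the shared, already-sorted `input`.
fn moving_avg_chunk(
    input: &[f64],
    start: usize,
    end: usize,
    preceding: usize,
    following: usize,
) -> Vec<f64> {
    let n = input.len();
    let mut out = Vec::with_capacity(end.saturating_sub(start));
    if start >= end {
        return out;
    }
    // Frame of the first row in this chunk, as a half-open range [lo, hi).
    let mut lo = start.saturating_sub(preceding);
    let mut hi = (start + following + 1).min(n);
    let mut sum: f64 = input[lo..hi].iter().sum();

    for row in start..end {
        let new_lo = row.saturating_sub(preceding);
        let new_hi = (row + following + 1).min(n);
        // Slide the frame: add rows that enter, subtract rows that leave.
        while hi < new_hi {
            sum += input[hi];
            hi += 1;
        }
        while lo < new_lo {
            sum -= input[lo];
            lo += 1;
        }
        out.push(sum / (hi - lo) as f64);
    }
    out
}

/// Split the output rows into contiguous chunks, evaluate each chunk on its
/// own scoped thread, and concatenate the results in input order.
fn parallel_moving_avg(
    input: &[f64],
    preceding: usize,
    following: usize,
    workers: usize,
) -> Vec<f64> {
    let n = input.len();
    let workers = workers.max(1);
    // Ceiling division; at least 1 so that `step_by` never receives 0.
    let chunk = ((n + workers - 1) / workers).max(1);

    let parts: Vec<Vec<f64>> = thread::scope(|s| {
        let handles: Vec<_> = (0..n)
            .step_by(chunk)
            .map(|start| {
                let end = (start + chunk).min(n);
                s.spawn(move || moving_avg_chunk(input, start, end, preceding, following))
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    parts.concat()
}
```

Calling `parallel_moving_avg(&values, 3, 3, num_workers)` corresponds to the frame in the query above. Because each chunk re-seeds its running sum from its own halo, the workers need no coordination, at the cost of re-reading at most `preceding + following` rows per chunk. With general floating-point data the per-chunk running sums can differ from a single-threaded pass by rounding error, which a real implementation would have to account for.
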

<img width="720" height="505" alt="Image" src="https://github.com/user-attachments/assets/adf33f40-f6cb-434c-9e8c-46f1aed2faab" />

## Implementation Plan

WIP. I'm still thinking about how to design the common operator state machine and the window expr API.

Here is the challenge: making a single workload fast is relatively straightforward. The hard part is designing a simple, general interface that is extensible to different optimizations.

Here are some of the window expr types to consider:

* Window frames such as `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`
* Aggregate functions such as `max(x)`
* Dynamic frame expressions such as `ROWS BETWEEN v1 PRECEDING AND v2 FOLLOWING`

Most of these workloads can support intra-partition parallelism, but the implementation strategy differs significantly. For example, window frames with highly dynamic ranges may benefit from segment-tree-based approaches as described in the paper above.

I'm prototyping other workload types to figure out the right design, so that we can easily extend acceleration kernels to window exprs with different characteristics.

### Describe the solution you'd like

_No response_

### Describe alternatives you've considered

_No response_

### Additional context

_No response_
