2010YOUY01 opened a new issue, #22355:
URL: https://github.com/apache/datafusion/issues/22355

   ### Is your feature request related to a problem or challenge?
   
   ## Background
   
   For a refresher on window function definitions, see Ch.2 of the following 
paper:
   
[https://www.vldb.org/pvldb/vol8/p1058-leis.pdf](https://www.vldb.org/pvldb/vol8/p1058-leis.pdf)
   
   Currently, DataFusion has two `ExecutionPlan` implementations for window 
functions. Both assume that repartitioning and ordering requirements are 
satisfied outside the window operator itself (the optimizer inserts 
`RepartitionExec` and `SortExec` on the input side of the window operator). 
They differ in execution strategy:
   
   * `WindowAggExec`: fallback implementation. It caches the entire input, 
identifies partition boundaries, then evaluates window partitions one by one.
   * `BoundedWindowAggExec`: memory-efficient variant. When the window frame is 
bounded, it only retains the batches required to evaluate the current window 
row.
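
To make the difference concrete, here is a minimal sketch of the bounded idea for a moving average over `ROWS BETWEEN p PRECEDING AND f FOLLOWING`. It is not DataFusion's actual `BoundedWindowAggExec` code; `BoundedFrameState` and its methods are hypothetical names, and rows are plain `f64` values rather than Arrow batches. The point is only that a row can be emitted as soon as its frame is complete, after which older rows can be dropped:

```rust
use std::collections::VecDeque;

/// Hypothetical state for a streaming moving average with a bounded ROWS frame.
struct BoundedFrameState {
    preceding: usize,
    following: usize,
    buf: VecDeque<f64>, // rows currently retained
    buf_start: usize,   // absolute row index of buf[0]
    next_out: usize,    // absolute index of the next row to emit
}

impl BoundedFrameState {
    fn new(preceding: usize, following: usize) -> Self {
        Self { preceding, following, buf: VecDeque::new(), buf_start: 0, next_out: 0 }
    }

    /// Emit every row whose frame is complete. `finished` means no more input.
    fn drain(&mut self, finished: bool, out: &mut Vec<f64>) {
        let seen = self.buf_start + self.buf.len();
        while self.next_out < seen {
            let hi = self.next_out + self.following + 1; // exclusive frame end
            if hi > seen && !finished {
                break; // the frame still needs rows that have not arrived
            }
            let hi = hi.min(seen);
            let lo = self.next_out.saturating_sub(self.preceding);
            let sum: f64 = self.buf.range(lo - self.buf_start..hi - self.buf_start).sum();
            out.push(sum / (hi - lo) as f64);
            self.next_out += 1;
            // Evict rows that no later frame can reach.
            let keep_from = self.next_out.saturating_sub(self.preceding);
            while self.buf_start < keep_from {
                self.buf.pop_front();
                self.buf_start += 1;
            }
        }
    }

    fn push(&mut self, value: f64, out: &mut Vec<f64>) {
        self.buf.push_back(value);
        self.drain(false, out);
    }

    fn finish(&mut self, out: &mut Vec<f64>) {
        self.drain(true, out);
    }

    fn retained(&self) -> usize {
        self.buf.len()
    }
}
```

In this sketch the buffer never holds more than `p + f + 1` rows, regardless of input size, whereas the fallback strategy has to hold the whole partition.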
   
   ## Pain Points
   
   Both existing implementations only support parallelism through 
repartitioning. This works well when the window expression includes `PARTITION 
BY` and the partition cardinality is high enough to utilize all CPU cores.
   
   For queries like:
   
   ```sql
   -- Only one global partition
   SELECT
     v1,
     avg(v1) OVER (
       ORDER BY v1
       ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
     ) AS moving_avg
   FROM generate_series(1000000000) AS t1(v1);
   ```
   
   only a single CPU core performs the work.
   
   ## Proposed Solution
   
   For the above workload, which uses a simple aggregate function with a fixed 
`ROWS` window frame, a possible solution would be:
   
   * Introduce intra-operator parallelism: each worker evaluates a separate 
chunk of rows.
   * Improve memory efficiency by incrementally loading input batches.
   * Introduce a vectorized aggregate API: evaluate window expression columns 
in batches so there is no per-row function-call overhead when adjusting frame 
boundaries.
   
   <img width="720" height="505" alt="Image" 
src="https://github.com/user-attachments/assets/adf33f40-f6cb-434c-9e8c-46f1aed2faab"
 />
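
As a rough illustration of the first bullet, the sketch below splits an already sorted, single-partition input into contiguous chunks and lets each worker evaluate its own rows. Workers may read up to `k` rows past their chunk boundary on either side so that frames near the edges are still complete. The function names are hypothetical, the input is a fully materialized `&[f64]` instead of incrementally loaded batches, and the per-row kernel is deliberately naive, so this covers neither the incremental loading nor the vectorized API mentioned above:

```rust
use std::thread;

/// Evaluate avg(...) OVER (ROWS BETWEEN k PRECEDING AND k FOLLOWING)
/// for output rows start..end, reading neighbours from the full input.
fn moving_avg_range(input: &[f64], start: usize, end: usize, k: usize) -> Vec<f64> {
    (start..end)
        .map(|i| {
            let lo = i.saturating_sub(k);
            let hi = (i + k + 1).min(input.len());
            input[lo..hi].iter().sum::<f64>() / (hi - lo) as f64
        })
        .collect()
}

/// Hypothetical intra-operator parallel version: one contiguous chunk per worker.
fn parallel_moving_avg(input: &[f64], k: usize, workers: usize) -> Vec<f64> {
    let n = input.len();
    let workers = workers.max(1);
    let chunk = ((n + workers - 1) / workers).max(1);

    let parts: Vec<Vec<f64>> = thread::scope(|s| {
        // Spawn all workers first, then join them in chunk order.
        let handles: Vec<_> = (0..n)
            .step_by(chunk)
            .map(|start| {
                let end = (start + chunk).min(n);
                s.spawn(move || moving_avg_range(input, start, end, k))
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    });

    // Chunks were joined in order, so concatenation preserves row order.
    parts.concat()
}
```

Because every output row depends only on a fixed number of neighbouring input rows, the chunks are independent and no coordination between workers is needed beyond the final concatenation.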
   
   ## Implementation Plan
   
   WIP: I'm still thinking about how to design the common operator state 
machine and window expr API. Here is the challenge:
   
   Making a single workload fast is relatively straightforward. The hard part 
is designing a simple, general interface that is extensible for different 
optimizations.
   
   Here are some different window expr types:
   
   * Window frames such as `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`
   * Aggregate functions such as `max(x)`
   * Dynamic frame expressions such as `ROWS BETWEEN v1 PRECEDING AND v2 
FOLLOWING`
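
To show how the strategy changes with the frame type, here is a sketch for the first case, a running `sum` over `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. Unlike a fixed-width frame, each output row depends on all earlier rows, so one possible approach is a two-pass chunked scan: compute per-chunk totals in parallel, turn them into starting offsets, then scan each chunk in parallel. `parallel_running_sum` is a hypothetical name, and this is only one way the problem could be tackled, not a proposed design:

```rust
use std::thread;

/// Hypothetical two-pass parallel running sum over a single partition.
fn parallel_running_sum(input: &[i64], workers: usize) -> Vec<i64> {
    let n = input.len();
    let workers = workers.max(1);
    let chunk = ((n + workers - 1) / workers).max(1);

    // Pass 1: each worker sums its own chunk.
    let totals: Vec<i64> = thread::scope(|s| {
        let handles: Vec<_> = input
            .chunks(chunk)
            .map(|c| s.spawn(move || c.iter().sum::<i64>()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    });

    // Sequential exclusive prefix over the (few) chunk totals.
    let mut offsets: Vec<i64> = Vec::with_capacity(totals.len());
    let mut acc = 0i64;
    for t in &totals {
        offsets.push(acc);
        acc += t;
    }

    // Pass 2: each worker scans its chunk, starting from its offset.
    let mut out = vec![0i64; n];
    thread::scope(|s| {
        for ((src, dst), &base) in input.chunks(chunk).zip(out.chunks_mut(chunk)).zip(&offsets) {
            s.spawn(move || {
                let mut running = base;
                for (x, y) in src.iter().zip(dst.iter_mut()) {
                    running += x;
                    *y = running;
                }
            });
        }
    });
    out
}
```

This works for `sum` because partial results can be combined across chunks; a shared interface would need some way to express which functions and frames allow this kind of decomposition.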
   
   Most of these workloads can support intra-partition parallelism, but the 
implementation strategy differs significantly. For example, window frames with 
highly dynamic ranges may benefit from segment-tree-based approaches as 
described in the paper above.
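
For reference, a minimal segment-tree sketch for `max(x)` with per-row frame bounds might look like the following. It is written in the spirit of the approach in the paper, not copied from it; `MaxSegmentTree` and `dynamic_frame_max` are hypothetical names, and the per-row `preceding` / `following` offsets are assumed to be already evaluated into plain slices:

```rust
/// Iterative (bottom-up) segment tree answering range-max queries.
struct MaxSegmentTree {
    n: usize,
    tree: Vec<i64>, // leaves live at tree[n..2n], internal nodes at tree[1..n]
}

impl MaxSegmentTree {
    fn new(values: &[i64]) -> Self {
        let n = values.len();
        let mut tree = vec![i64::MIN; 2 * n];
        tree[n..].copy_from_slice(values);
        for i in (1..n).rev() {
            tree[i] = tree[2 * i].max(tree[2 * i + 1]);
        }
        Self { n, tree }
    }

    /// Maximum over the half-open row range [lo, hi); None if the range is empty.
    fn query(&self, lo: usize, hi: usize) -> Option<i64> {
        if lo >= hi || hi > self.n {
            return None;
        }
        let (mut l, mut r) = (lo + self.n, hi + self.n);
        let mut best = i64::MIN;
        while l < r {
            if l & 1 == 1 {
                best = best.max(self.tree[l]);
                l += 1;
            }
            if r & 1 == 1 {
                r -= 1;
                best = best.max(self.tree[r]);
            }
            l >>= 1;
            r >>= 1;
        }
        Some(best)
    }
}

/// max(x) OVER (ROWS BETWEEN preceding[i] PRECEDING AND following[i] FOLLOWING)
fn dynamic_frame_max(values: &[i64], preceding: &[usize], following: &[usize]) -> Vec<i64> {
    let tree = MaxSegmentTree::new(values);
    (0..values.len())
        .map(|i| {
            let lo = i.saturating_sub(preceding[i]);
            let hi = (i + following[i] + 1).min(values.len());
            tree.query(lo, hi).expect("frame always contains the current row")
        })
        .collect()
}
```

Once the tree is built, each row costs O(log n) regardless of how far its frame moves relative to the previous row, and the per-row queries are independent, so they could also be split across workers.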
   
   I'm prototyping other workload types to figure out the right design, so 
that we can easily add acceleration kernels for window exprs with different 
characteristics.
   
   
   ### Describe the solution you'd like
   
   _No response_
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

