2010YOUY01 opened a new issue, #23273: URL: https://github.com/apache/datafusion/issues/23273
### Is your feature request related to a problem or challenge? The proposed refactor in this issue can make window execution implementation simpler and more extensible. I think it is a necessary step if we want to invest further in better vectorization or more parallel execution paradigms mentioned in the issue: - https://github.com/apache/datafusion/issues/23197 The existing structure is not ideal: if we continue evolving it in its current form, future optimization work will likely require more special cases, making the implementation even harder to maintain and extend. To sanity-check whether this refactor makes sense, we can run a few thought experiments using the potential optimizations mentioned in the issue. The examples include better parallelism and vectorization for fixed frames, segment-tree-based parallelism, etc. These optimizations are natural extensions of the ideal architecture introduced by this issue, but they are hard to add cleanly with the existing structure. This issue explains, in order: - The issues in the existing implementation - How an ideal structure should look - A possible implementation plan ### Issue with existing implementation An ideal architecture requires a clean separation between the logical and physical layer of window function execution. - The logical layer describes what should be calculated (what are the expr for window frame/partition/...) - The physical layer answers 'how' by implementing the required methods (APIs that take the input batch and calculate the window expr output). The major issue is that the existing abstraction layers leak into adjacent layers. The below figured listed several trait methods that is currently sitting in the wrong abstraction layer: <img width="1077" height="639" alt="Image" src="https://github.com/user-attachments/assets/8b2ae768-4478-4d2b-8736-893b0a0ef6eb" /> I think the original design goal was: - `WindowExpr` is supposed to be the logical layer. - `PartitionEvaluator` is supposed to be the physical layer. Over time, however, these responsibilities have become mixed. The decision-making flow has become bidirectional, and the implementation now relies on special cases to work around abstraction leaks. My guess is that these are mostly hacks accumulated over the years. I cannot find a strong reason to preserve this design. The ideal design should place each abstraction at the right layer, making the implementation simpler and easier to extend. <img width="955" height="767" alt="Image" src="https://github.com/user-attachments/assets/c5fb2386-034a-4ff6-9c59-ea460aeb745c" /> ### Ideal Architecture The gist is that we should fully separate the logical and physical layers of window execution. - Logical layer: `WindowCall` purely describes what we want to calculate. It contains the expressions for arguments, partitioning, ordering, and frame bounds. - Physical layer: `WindowKernel` purely provides the methods needed for execution. It represents the selected execution algorithm for a specific window call. This design brings below benefits: - Simplicity: the control flow is one directional, `WindowCall` decides what window kernel to use, and window kernel purely provide methods for execution. - Extensibility: adding new parallelism scheme/or improve vectorized fast path means adding one window kernel, no deep structural changes needed. #### Workflow ```text SQL / logical physical planning -> WindowCall // pure description: function, args, partition/order/frame -> WindowKernel selection // physical execution protocol chosen from shape + capabilities -> WindowExec // execution routing: choose stream based on selected kernel -> NaiveAccumulatorStream -> SlidingAccumulatorStream -> other specialized streams ``` In rough terms: ```rust /// pure description: function, args, partition/order/frame struct WindowCall { name: String, field: FieldRef, function: WindowFunctionKind, args: Vec<Arc<dyn PhysicalExpr>>, filter: Option<Arc<dyn PhysicalExpr>>, partition_by: Vec<Arc<dyn PhysicalExpr>>, order_by: Vec<PhysicalSortExpr>, frame: Arc<WindowFrame>, options: WindowOptions, } /// pure execution: provided methods needed for a specific path enum WindowKernel { /// Derived from existing Accumulator without `retract_batch` /// A nested-loop algorithm will be used. NaiveAccumulator(Box<dyn NaiveAccumulatorWindowKernel>), /// Derived from existing Accumulator with `retract_batch` /// A sliding window algorithm will be. SlidingAccumulator(Box<dyn SlidingAccumulatorWindowKernel>), } ``` DataFusion's existing `Accumulator` API already contains the primitives for two useful aggregate window algorithms: - `update_batch()` plus `evaluate()` can recompute a result for any frame. This supports a naive nested-loop fallback for all accumulators. - `retract_batch()` plus `supports_retract_batch()` allow incremental sliding-window execution when rows leave the frame. If the accumulator does not support `retract_batch()`, a naive nested-loop evaluation can be used. If `retract_batch()` is supported and the window frame is a fixed sliding frame, a sliding-window algorithm can be used for optimization. Then the implication for newly added user-defined window function is, it should only support the naive method to make it work universally (for aggregate function in window cases, it requires only `update_batch()` for the above naive path), but it can optionally support more fast paths (`retract_batch` for sliding window, or even vectorized API in the future), then the optimizer/execution will route that into the fast path if the query expression shape allows. Here is a simple example to walk through the above workflow. #### Workload 1: Sliding Aggregate Example query: ```sql SELECT avg(x) OVER ( PARTITION BY k ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW ) AS avg_x FROM t; ``` Planning: 1. `WindowCall` holds the logical description: `avg(x)`, `PARTITION BY k`, `ORDER BY ts`, and `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`. 2. The planner sees that this is an aggregate window over a fixed moving frame. 3. The planner asks the aggregate accumulator whether it supports `retract_batch()`. `avg` does; 4. The planner chooses `SlidingAccumulatorWindowKernel`. 5. `WindowAggExec` routes execution to a dedicated `SlidingAccumulatorStream`, because the selected kernel has the sliding-window execution protocol. The kernel API can stay small because it only represents one physical protocol: ```rust trait SlidingAccumulatorWindowKernel { fn evaluate_partition( &mut self, input: &PartitionWindowInput<'_>, frame: &FrameIndex, ) -> Result<ArrayRef>; } struct PartitionWindowInput<'a> { batch: &'a RecordBatch, args: Vec<ArrayRef>, filter: Option<BooleanArray>, } ``` Very rough sliding-window algorithm sketch: ```python acc = create_avg_accumulator() current_frame = range(0, 0) output = [] for row_idx in partition_rows: next_frame = frame_for(row_idx) # Rows that were in the previous frame but are not in the next frame. leaving = current_frame.start .. next_frame.start if leaving is not empty: acc.retract_batch(values_for(leaving)) # Rows that are in the next frame but were not in the previous frame. entering = current_frame.end .. next_frame.end if entering is not empty: acc.update_batch(values_for(entering)) output.append(acc.evaluate()) current_frame = next_frame ``` This is the fast path: each input row is added and removed at most once, so the cost is linear in the partition size for row-based fixed frames. #### Workload 2: Naive Aggregate Fallback Example query: ```sql SELECT my_udaf(x) OVER ( PARTITION BY k ORDER BY ts ROWS BETWEEN t.n_gap PRECEDING AND CURRENT ROW ) AS v FROM t; ``` Assume `my_udaf` is a user-defined aggregate accumulator that supports `update_batch()` and `evaluate()`, but does not support `retract_batch()`. Also the window frame `t.n_gap` preceding can be arbitrary value, it's not supported by the sliding window algorithm. Planning: 1. `WindowCall` holds the logical description: `my_udaf(x)`, `PARTITION BY k`, `ORDER BY ts`, and `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`. 2. The planner sees that this is an aggregate window (without `retract_batch()` capability), and also over a non-fixed moving frame. 3. The planner chooses `NaiveAccumulatorWindowKernel`. 4. `WindowAggExec` routes execution to a dedicated `NaiveAccumulatorStream`. The kernel API can again stay small: ```rust trait NaiveAccumulatorWindowKernel { fn evaluate_partition( &self, input: &PartitionWindowInput<'_>, frame: &FrameIndex, ) -> Result<ArrayRef>; } ``` Naive nested-loop algorithm sketch: ```python output = [] for row_idx in partition_rows: frame = frame_for(row_idx) # This is slower, but it only needs update_batch() and evaluate(). acc = create_my_udaf_accumulator() acc.update_batch(values_for(frame)) output.append(acc.evaluate()) ``` ### Implementation Plan I plan to do some prototyping to work out a practical refactoring plan. The known goals are: - Remove all three `WindowExpr` implementations and use `WindowCall` as the pure logical layer. - Use `WindowKernel` to replace the `PartitionEvaluator` - `PartitionEvaluator` is now a large trait that uses 3+ flags to decide behavior. I think it is hard to use and extend; small, focused traits inside `WindowKernel` enum variants should be better. - Provide an adapter like `WindowKernel::LegacyPartitionEvaluator` to make the refactor practical. - Evolve `WindowAggExec` in this direction and avoid changing `BoundedWindowAggExec` - See https://github.com/apache/datafusion/issues/23197#issuecomment-4806401319 ### Describe the solution you'd like _No response_ ### Describe alternatives you've considered _No response_ ### Additional context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
