mustafasrepo opened a new pull request, #4777: URL: https://github.com/apache/arrow-datafusion/pull/4777
# Which issue does this PR close?

Improves the situation on https://github.com/apache/arrow-datafusion/issues/4285.

# Rationale for this change

NOTE: The discussion below is a simplified version of a more detailed exposition in [the streaming execution proposal](https://synnada.notion.site/Streaming-Window-Design-581082c1fd60442ca7caaff4d6a63cd7).

The current implementation materializes the entire table in memory. However, queries involving window expressions can be executed without doing so when certain conditions are met. The conditions for a bounded implementation are as follows:

- In order to run `WindowExec` with bounded memory (without seeing the whole table), the window frame boundaries of the given window expression should be bounded; i.e. we cannot run queries involving either `UNBOUNDED PRECEDING` or `UNBOUNDED FOLLOWING`.
- We should be able to produce query results as we scan the table incrementally. For this to be possible, the columns used in the `ORDER BY` clauses should already be ordered according to the `ORDER BY` specification. When this condition is met, we can remove the `PhysicalSort` expression before `WindowExec` and generate results as we scan the table.

If the above conditions are met, we can run a query like the one below

```sql
SELECT SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
FROM annotated_data
```

with a bounded memory algorithm.
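The two conditions above can be illustrated with a minimal, single-threaded Rust sketch. This is not the PR's `BoundedWindowAggExec`. `FrameBound`, `BoundedRangeSum` and its methods are hypothetical names chosen for this example.

The sketch makes the following assumptions:

- Input arrives as batches of `(order_key, value)` pairs that are already sorted ascending on the key. This is the second condition, and it is only checked with a `debug_assert!`.
- `UNBOUNDED` frames are rejected by returning `None`, which corresponds to the first condition. That is the point where a planner would fall back to a materializing executor.

A row's result is emitted as soon as a key greater than `key + following` has been seen. Rows that no later frame can reach are evicted, so the buffer holds only rows near the current frame rather than the whole table.

```rust
use std::collections::VecDeque;

/// Simplified window frame bound (hypothetical; not DataFusion's own type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameBound {
    UnboundedPreceding,
    Preceding(i64),
    CurrentRow,
    Following(i64),
    UnboundedFollowing,
}

/// Streaming `SUM(value) OVER (ORDER BY key RANGE BETWEEN p PRECEDING AND f FOLLOWING)`
/// over `(key, value)` rows that arrive already sorted ascending on `key`.
pub struct BoundedRangeSum {
    preceding: i64,
    following: i64,
    /// Rows that a pending or future frame may still need.
    buf: VecDeque<(i64, i64)>,
    /// Index in `buf` of the first row whose result has not been emitted yet.
    next_out: usize,
}

impl BoundedRangeSum {
    /// Returns `None` for frames this sketch cannot run with bounded memory (any UNBOUNDED
    /// end, among others); a planner would then fall back to a materializing executor.
    pub fn from_frame(start: FrameBound, end: FrameBound) -> Option<Self> {
        let preceding = match start {
            FrameBound::Preceding(n) if n >= 0 => n,
            FrameBound::CurrentRow => 0,
            _ => return None,
        };
        let following = match end {
            FrameBound::Following(n) if n >= 0 => n,
            FrameBound::CurrentRow => 0,
            _ => return None,
        };
        Some(Self { preceding, following, buf: VecDeque::new(), next_out: 0 })
    }

    /// Number of rows currently held in memory.
    pub fn buffered_rows(&self) -> usize {
        self.buf.len()
    }

    /// Consumes one input batch and returns the results that became final.
    pub fn push_batch(&mut self, batch: &[(i64, i64)]) -> Vec<i64> {
        for &row in batch {
            debug_assert!(
                self.buf.back().map_or(true, |&(k, _)| k <= row.0),
                "input must already be sorted on the ORDER BY key"
            );
            self.buf.push_back(row);
        }
        self.emit(false)
    }

    /// Signals end of input and returns the remaining results.
    pub fn finish(&mut self) -> Vec<i64> {
        self.emit(true)
    }

    /// Sum of buffered values whose key lies in `[key - preceding, key + following]`.
    fn frame_sum(&self, key: i64) -> i64 {
        let (lo, hi) = (key - self.preceding, key + self.following);
        self.buf.iter().filter(|&&(k, _)| lo <= k && k <= hi).map(|&(_, v)| v).sum()
    }

    fn emit(&mut self, finished: bool) -> Vec<i64> {
        let mut out = Vec::new();
        let last_key = self.buf.back().map(|&(k, _)| k);
        while self.next_out < self.buf.len() {
            let key = self.buf[self.next_out].0;
            // Keys arrive in ascending order, so the frame is complete once a key
            // beyond `key + following` has been seen or the input has ended.
            if !(finished || last_key.map_or(false, |lk| lk > key + self.following)) {
                break;
            }
            out.push(self.frame_sum(key));
            self.next_out += 1;
        }
        if finished {
            self.buf.clear();
            self.next_out = 0;
            return out;
        }
        // Later frames start at or after `b - preceding` (b = key of the next pending row,
        // or of the newest row); older rows have keys below `b`, so they were already emitted.
        let boundary = self.buf.get(self.next_out).or(self.buf.back()).map(|&(k, _)| k);
        if let Some(b) = boundary {
            while self.buf.front().map_or(false, |&(k, _)| k < b - self.preceding) {
                self.buf.pop_front();
                self.next_out -= 1;
            }
        }
        out
    }
}
```

As a worked example, take `RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING` and feed the keys `1, 2, 5, 12, 20, 30` in the batches `[1, 2]`, `[5, 12]`, `[20]`, `[30]`. The value equals the key, as in `SUM(inc_col) ... ORDER BY inc_col`. The sketch yields `8, 20, 17, 32, 50, 30`, and only two rows remain buffered before `finish` is called.

For simplicity, each frame's sum is recomputed by scanning the buffer. An incremental accumulator, which adds rows entering the frame and subtracts rows leaving it, would avoid that rescan. Either way, the memory bound comes from eviction.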
Consider the physical plan of the above query:

```text
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                        |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| physical_plan | ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]             |
|               |   WindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })] |
|               |     SortExec: [inc_col@0 ASC NULLS LAST]                                                                                                                                                    |
|               |       MemoryExec: partitions=1, partition_sizes=[51]                                                                                                                                        |
|               |                                                                                                                                                                                             |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```

If we know that the column `inc_col` in the table is monotonically increasing, we can deduce that the `SortExec: [inc_col@0 ASC NULLS LAST]` step in the physical plan is unnecessary. Hence, we can remove this step from the physical plan (see #4691). Furthermore, we also know that the frame `RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING` describes a bounded range.
Therefore, we can turn the above physical plan into the one below:

```text
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| physical_plan | ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]                    |
|               |   BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })] |
|               |     MemoryExec: partitions=1, partition_sizes=[51]                                                                                                                                                 |
|               |                                                                                                                                                                                                    |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```

## Performance Indicators

We analyzed the new *Bounded Memory Window Executor* and compared it with the existing implementation via:

- Benchmarking (criterion) for CPU-time analysis, and
- Memory profiling of the binary executable (Heaptrack).

The test conditions for both executors are as follows:

- No partitioning.
- A single query (`SUM(x) OVER (ORDER BY a RANGE BETWEEN 10 PRECEDING AND 10 FOLLOWING)`).
- No sorting is included, since the input data is already sorted.
- The input is generated as a stream of `RecordBatch`es whose sizes vary between 0 and 50.

NOTE: We did not include the benchmarking code in this PR.

### Benchmarking

We measure the execution duration of each operator. The input size is `100_000`.
- Average execution time for `WindowAggExec`: **226.72 ms**
- Average execution time for `BoundedWindowAggExec`: **154.71 ms**

This is roughly a 32% reduction in execution time. We attribute this to searching for `RANGE` boundaries within a smaller batch, since the executor maintains only a bounded state.

### Heaptrack

We used a simple test case to measure memory consumption; the input size is `1_000_000`.

**`WindowAggExec` Memory Profiling**

- Peak heap memory consumption: **161.9 MB** after 1 min 23 s
- Peak RSS (including heaptrack overhead): **202.7 MB**

**`BoundedWindowAggExec` Memory Profiling**

- Peak heap memory consumption: **78.6 MB** after 8.633 s
- Peak RSS (including heaptrack overhead): **115.4 MB**

These findings support the claim that the sliding window approach is memory-efficient.

# What changes are included in this PR?

This PR includes a bounded memory variant of the already-existing `WindowAggExec`, named `BoundedWindowAggExec`. We add a rule to choose between `WindowAggExec` and `BoundedWindowAggExec`. If `window_expr` can generate its result without seeing the whole table, we choose `BoundedWindowAggExec`; otherwise, we choose `WindowAggExec`. Please note that it is possible (but not necessarily trivial) to unify these executors. However, we leave this as future work.

In this implementation, we also added bounded execution support for `COUNT`, `SUM`, `MIN` and `MAX` among the `AggregateFunction`s. Among the `BuiltInWindowFunction`s, we added support for `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE` and `NTH_VALUE`. If any other window function is used, we fall back to `WindowAggExec`.

# Are these changes tested?

We have added fuzz tests comparing the results of `WindowAggExec` and `BoundedWindowAggExec`. We also added SQL tests that run on an already-sorted Parquet file. Approximately 700 lines of the changes come from tests and test utilities.

# Are there any user-facing changes?

None.

--
This is an automated message from the Apache Git Service.
