mustafasrepo opened a new pull request, #4777:
URL: https://github.com/apache/arrow-datafusion/pull/4777

   # Which issue does this PR close?
   
   Improves the situation on 
https://github.com/apache/arrow-datafusion/issues/4285. 
   
   # Rationale for this change
   
   NOTE: The discussion below is a simplification of the more detailed exposition in 
[the streaming execution 
proposal](https://synnada.notion.site/Streaming-Window-Design-581082c1fd60442ca7caaff4d6a63cd7).
   
   Unlike how the current implementation works, queries involving window 
expressions can actually be executed without materializing the entire table in 
memory when certain conditions are met. These conditions for a bounded 
implementation are as follows:
   
   - In order to run `WindowExec` with bounded memory (without seeing the whole 
table), window frame boundaries of the given window expression should be 
bounded; i.e. we cannot run queries involving either `UNBOUNDED PRECEDING` or 
`UNBOUNDED FOLLOWING`.
   - We should be able to produce query results as we scan the table 
incrementally. For this to be possible, the input should already be sorted 
according to the `ORDER BY` specification of the window expression. When this 
condition is met, we can remove the sort step (`SortExec`) before 
`WindowExec` and generate results as we scan the table.
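
The two conditions above can be written as a small predicate. This is only a minimal sketch and not the code in this PR: the `Bound` and `Frame` types and both function names are hypothetical stand-ins for the real frame representation.

```rust
/// Hypothetical, simplified model of one window frame boundary.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Bound {
    UnboundedPreceding,
    Preceding(u64),
    CurrentRow,
    Following(u64),
    UnboundedFollowing,
}

/// Hypothetical, simplified model of a window frame.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Frame {
    pub start: Bound,
    pub end: Bound,
}

/// Condition 1: neither side of the frame may be unbounded.
pub fn is_frame_bounded(frame: &Frame) -> bool {
    let unbounded =
        |b: Bound| matches!(b, Bound::UnboundedPreceding | Bound::UnboundedFollowing);
    !unbounded(frame.start) && !unbounded(frame.end)
}

/// Conditions 1 and 2 together: the frame is bounded and the input already
/// arrives in the order required by the `ORDER BY` clause.
pub fn can_run_with_bounded_memory(frame: &Frame, input_already_ordered: bool) -> bool {
    is_frame_bounded(frame) && input_already_ordered
}
```

Under these assumptions, a frame such as `RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING` over pre-sorted input qualifies, while any frame containing `UNBOUNDED PRECEDING` or `UNBOUNDED FOLLOWING` does not.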
   
   If the above conditions are met, we can run a query like the one below
   
   ```sql
   SELECT
       SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
   FROM annotated_data
   ```
   
   with a bounded-memory algorithm. Consider the physical plan of the above 
query:
   
   ```text
   ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]
     WindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })]
       SortExec: [inc_col@0 ASC NULLS LAST]
         MemoryExec: partitions=1, partition_sizes=[51]
   ```
   
   If we know that the column `inc_col` in the table is monotonically 
increasing, we can deduce that the `SortExec: [inc_col@0 ASC NULLS LAST]` step 
in the physical plan is unnecessary. Hence, we can remove this step from the 
physical plan (see #4691). Furthermore, we also know that the frame `RANGE 
BETWEEN 1 PRECEDING AND 10 FOLLOWING` describes a bounded range. Therefore, we 
can turn the above physical plan into the one below:
   
   ```text
   ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]
     BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })]
       MemoryExec: partitions=1, partition_sizes=[51]
   ```
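
To illustrate why a bounded frame over sorted input needs only bounded state, here is a minimal sketch of a streaming `SUM` with a `RANGE` frame. It is a toy under stated assumptions rather than the executor added in this PR: `SlidingSum` and its methods are hypothetical names, it handles a single ascending `i64` column with non-negative frame offsets, it ignores integer overflow, and it recomputes each sum from the buffer instead of updating it incrementally.

```rust
use std::collections::VecDeque;

/// Hypothetical streaming state for
/// `SUM(x) OVER (ORDER BY x ASC RANGE BETWEEN p PRECEDING AND f FOLLOWING)`.
pub struct SlidingSum {
    preceding: i64,
    following: i64,
    /// Rows that some pending or future frame may still need (kept sorted).
    buffer: VecDeque<i64>,
    /// Index in `buffer` of the first row whose result is not emitted yet.
    next: usize,
}

impl SlidingSum {
    pub fn new(preceding: i64, following: i64) -> Self {
        Self { preceding, following, buffer: VecDeque::new(), next: 0 }
    }

    /// Feed one batch (values must continue the ascending order) and
    /// return the results that have become final.
    pub fn push_batch(&mut self, batch: &[i64]) -> Vec<i64> {
        self.buffer.extend(batch.iter().copied());
        self.emit(false)
    }

    /// Signal end of input and flush the remaining results.
    pub fn finish(&mut self) -> Vec<i64> {
        self.emit(true)
    }

    fn emit(&mut self, input_done: bool) -> Vec<i64> {
        let mut out = Vec::new();
        while self.next < self.buffer.len() {
            let v = self.buffer[self.next];
            let upper = v + self.following;
            let last = *self.buffer.back().unwrap();
            // The frame is final only once a row beyond its upper bound
            // has arrived, or the input has ended.
            if !input_done && last <= upper {
                break;
            }
            let lower = v - self.preceding;
            // Lower bounds only grow, so rows below `lower` are never needed again.
            while self.buffer.front().map_or(false, |&x| x < lower) {
                self.buffer.pop_front();
                self.next -= 1;
            }
            // Every remaining row is >= lower; sum those that are <= upper.
            let sum: i64 = self.buffer.iter().take_while(|&&x| x <= upper).sum();
            out.push(sum);
            self.next += 1;
        }
        out
    }
}
```

In this sketch the buffer never holds more than the rows spanned by the pending frames plus the most recent batch, which is the property that lets the `SortExec`-free plan above run without materializing the whole table.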
   
   ## Performance Indicators
   
   We analyzed the new *Bounded Memory Window Executor* and compared it with 
the existing implementation via
   
   - Benchmarking (criterion) for CPU-time analysis, and
   - Memory profiling the binary executable (Heaptrack).
   
   Test conditions for both executors are as follows:
   
   - No partition
   - Single query (`SUM(x) OVER (ORDER BY a RANGE BETWEEN 10 PRECEDING 
AND 10 FOLLOWING)`)
   - No sorting is included since the input data is already sorted.
   - The input is generated as a `RecordBatch` stream. The batches have varying 
sizes in the range of 0 to 50.
   
   NOTE: We did not include the benchmarking code in this PR.
   
   ### Benchmarking
   
   We measure the execution duration of each operator. The input size is 
`100_000`.
   
   - Average execution time for `WindowAggExec`: **226.72 ms**
   - Average execution time for `BoundedWindowAggExec`: **154.71 ms**
   
   This shows that overall performance improves: the average execution time 
drops by roughly 32%. The improvement comes from searching `RANGE` boundaries 
in a smaller batch, since we maintain a bounded state.
   
   ### Heaptrack
   
   We used a simple test case for memory consumption; the input size is 
`1_000_000`.
   
   **WindowAggExec Memory Profiling**
   
   - peak heap memory consumption: **161.9 MB** after 1min23s
   - peak RSS (including heaptrack overhead): **202.7 MB**
   
   ![Memory usage of 
`WindowAggExec`](https://user-images.githubusercontent.com/106137913/209624485-9bdc0fb9-134b-41e0-b241-0026c6b72803.png)
   
   **BoundedWindowAggExec Memory Profiling**
   
   - peak heap memory consumption: **78.6 MB** after 08.633s
   - peak RSS (including heaptrack overhead): **115.4 MB**
   
   These findings support the claim that the sliding-window approach is memory 
efficient: peak heap consumption is roughly halved.
   
   ![Memory usage of 
`BoundedWindowAggExec`](https://user-images.githubusercontent.com/106137913/209624446-ded5262d-683f-482e-988c-7bed9d170c20.png)
   
   # What changes are included in this PR?
   
   This PR adds a bounded-memory variant of the existing 
`WindowAggExec`. We add a rule to choose between `WindowAggExec` and 
`BoundedWindowAggExec`. The strategy is as follows: if `window_expr` can 
generate its result without seeing the whole table, we choose 
`BoundedWindowAggExec`; otherwise, we choose `WindowAggExec`. Please note that 
it is possible (but certainly not trivial) to unify these executors. However, 
we left this as future work.
   
   In this implementation, we also added bounded execution support for `COUNT`, 
`SUM`, `MIN` and `MAX` among `AggregateFunction`s. Among 
`BuiltInWindowFunction`s, we added support for `ROW_NUMBER`, `RANK`, 
`DENSE_RANK`, `LAG`, `LEAD`, `FIRST_VALUE`, `LAST_VALUE` and `NTH_VALUE`. If any 
other window function is used, we fall back to `WindowAggExec`.
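
The fallback described above can be pictured as a simple lookup. The sketch below is illustrative only and does not mirror the actual rule in this PR: the `Executor` enum, the `BOUNDED_FUNCTIONS` table, and `choose_executor` are hypothetical names, and functions are identified by plain strings rather than by the real `AggregateFunction` and `BuiltInWindowFunction` types.

```rust
/// Hypothetical tag for the executor that a planner rule would pick.
#[derive(Debug, PartialEq)]
pub enum Executor {
    WindowAggExec,
    BoundedWindowAggExec,
}

/// Functions listed in this PR as supporting bounded execution.
const BOUNDED_FUNCTIONS: [&str; 12] = [
    "COUNT", "SUM", "MIN", "MAX",
    "ROW_NUMBER", "RANK", "DENSE_RANK", "LAG", "LEAD",
    "FIRST_VALUE", "LAST_VALUE", "NTH_VALUE",
];

/// Pick the bounded executor only if every window function is supported
/// and the frame/ordering conditions hold (passed in as a flag, assumed to
/// be computed elsewhere); otherwise fall back to `WindowAggExec`.
pub fn choose_executor(function_names: &[&str], frame_and_order_ok: bool) -> Executor {
    let all_supported = function_names.iter().all(|name| {
        let upper = name.to_ascii_uppercase();
        BOUNDED_FUNCTIONS.contains(&upper.as_str())
    });
    if all_supported && frame_and_order_ok {
        Executor::BoundedWindowAggExec
    } else {
        Executor::WindowAggExec
    }
}
```

Keeping the decision in one place like this makes the fallback explicit: a single unsupported function, or a failed frame or ordering check, is enough to route the whole window expression to `WindowAggExec`.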
   
   # Are these changes tested?
   
   We have added fuzz tests comparing the results of `WindowAggExec` and 
`BoundedWindowAggExec`. We also added SQL tests that run on an already-sorted 
Parquet file. Approximately 700 lines of the changes come from tests and test 
utilities.
   
   # Are there any user-facing changes?
   
   None.

