mustafasrepo opened a new pull request, #4691: URL: https://github.com/apache/arrow-datafusion/pull/4691
# Which issue does this PR close?

Closes [#4686](https://github.com/apache/arrow-datafusion/issues/4686).

# Rationale for this change

After physical plan construction, we may end up with `SortExec`s that are unnecessary. This happens fairly often in realistic use cases, but let's start with a simple yet unrealistic example to illustrate the issue. Assume that we somehow end up with a physical plan fragment such as the one below:

```sql
"SortExec: [a@0 ASC]",
"  SortExec: [b@1 ASC]",
```

Since the outer `SortExec` (on `a`) overwrites the ordering produced by the inner one (on `b`), the inner `SortExec` is useless, and we can safely remove it.

We propose a rule that analyzes the physical plan and removes unnecessary `SortExec`s, i.e. `SortExec`s followed by layers in the execution plan that neither require an input ordering nor maintain their input ordering. Obviously, this rule solves the toy problem above. Let's discuss more realistic scenarios. Consider the following query from our tests:

```sql
SELECT count(*) as global_count FROM
  (SELECT count(*), c1
   FROM aggregate_test_100
   WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
   GROUP BY c1
   ORDER BY c1 ) AS a
```

Its physical plan is as follows:

```sql
"ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
"  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
"    CoalescePartitionsExec",
"      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
"        RepartitionExec: partitioning=RoundRobinBatch(8)",
"          SortExec: [c1@0 ASC NULLS LAST]",
"            CoalescePartitionsExec",
"              AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
"                CoalesceBatchesExec: target_batch_size=4096",
"                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8)",
"                    AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
"                      CoalesceBatchesExec: target_batch_size=4096",
"                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"                          RepartitionExec: partitioning=RoundRobinBatch(8)",
```

This plan contains a `SortExec` because of the
`ORDER BY` clause in the subquery. However, the `AggregateExec` layer above the sort neither requires any input ordering nor maintains its input ordering. Hence, the `SortExec` in this plan is unnecessary. This basic rule removes such unnecessary `SortExec`s from physical plans.

Furthermore, some operators enable further optimizations that remove the `SortExec`s below them. As an example, consider `WindowAggExec`. Currently, we remove a `SortExec` below a `WindowAggExec` if the input sorting direction and the required sorting direction are the same. However, most window functions can also calculate their results in reverse order. Therefore, we can also remove a `SortExec` whose input is already sorted in the reverse of the required direction, provided we slightly modify the `WindowAggExec` above it so that it computes its results in reverse order. Consider the following query as an example:

```sql
SELECT
  c9,
  SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
  SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
FROM aggregate_test_100
LIMIT 5
```

This PR transforms the physical plan of the query above from

```sql
"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
"  GlobalLimitExec: skip=0, fetch=5",
"    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"      SortExec: [c9@1 ASC NULLS LAST]",
"        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"          SortExec: [c9@0 DESC]",
```

into

```sql
"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@0 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]",
"  GlobalLimitExec: skip=0, fetch=5",
"    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"      WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })]",
"        SortExec: [c9@0 DESC]",
```

# What changes are included in this PR?

In order to utilize this optimization, window functions should implement a `reverse_expr` method. We have added support for the `SUM` and `COUNT` aggregate functions (adding support for other aggregate window functions should be similar). Among built-in window functions, we added support for `FIRST_VALUE`, `LAST_VALUE`, `LEAD`, and `LAG`. Theoretically, `NTH_VALUE` can also produce its result in reverse order; however, support for `NTH_VALUE` is left as future work. AFAICT, the other built-in window functions cannot produce their results in reverse order and thus cannot partake in this optimization.

# Are these changes tested?

We added tests asserting the new optimized physical plans for various cases. Approximately 1000 lines of the changes come from the additional tests.

# Are there any user-facing changes?

None.
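As a toy illustration of the sort-removal rule described in the rationale, here is a minimal Rust sketch. It is not this PR's implementation and does not use DataFusion's `ExecutionPlan` API: `Node`, `remove_unnecessary_sorts`, `names`, and the two boolean flags are hypothetical names chosen for illustration. The sketch walks the plan top-down, tracks whether some ancestor observes the ordering of the current node's output (ordering requirements pass through order-preserving operators), and splices out any sort whose ordering nobody observes.

```rust
/// Hypothetical toy plan node; NOT DataFusion's `ExecutionPlan` trait.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: String,
    is_sort: bool,
    /// Does this operator need its input to arrive in a particular order?
    requires_input_ordering: bool,
    /// Does this operator preserve its input ordering in its output?
    maintains_input_order: bool,
    children: Vec<Node>,
}

impl Node {
    fn new(name: &str, requires: bool, maintains: bool, children: Vec<Node>) -> Self {
        Node {
            name: name.to_string(),
            is_sort: false,
            requires_input_ordering: requires,
            maintains_input_order: maintains,
            children,
        }
    }

    /// A sort re-orders everything: it neither requires nor maintains its input ordering.
    fn sort(name: &str, child: Node) -> Self {
        Node {
            name: name.to_string(),
            is_sort: true,
            requires_input_ordering: false,
            maintains_input_order: false,
            children: vec![child],
        }
    }
}

/// Removes every sort whose output ordering no ancestor observes.
/// `order_needed` tells whether the consumer of `node`'s output cares about its order.
fn remove_unnecessary_sorts(node: Node, order_needed: bool) -> Node {
    let Node { name, is_sort, requires_input_ordering, maintains_input_order, children } = node;
    if is_sort && !order_needed {
        // Nobody above observes this ordering: splice the sort out and
        // continue with its single child under the same requirement.
        let child = children.into_iter().next().expect("a sort has exactly one child");
        return remove_unnecessary_sorts(child, order_needed);
    }
    // A child's ordering matters if this operator requires it, or if this
    // operator passes it through to an ancestor that needs it.
    let child_needed = requires_input_ordering || (maintains_input_order && order_needed);
    let children: Vec<Node> = children
        .into_iter()
        .map(|c| remove_unnecessary_sorts(c, child_needed))
        .collect();
    Node { name, is_sort, requires_input_ordering, maintains_input_order, children }
}

/// Lists operator names in a pre-order, top-down walk.
fn names(node: &Node) -> Vec<String> {
    let mut out = vec![node.name.clone()];
    for child in &node.children {
        out.extend(names(child));
    }
    out
}

fn main() {
    // The toy fragment from the rationale: a sort on `a` directly above a sort on `b`.
    let scan = Node::new("Scan", false, false, vec![]);
    let plan = Node::sort("SortExec: [a@0 ASC]", Node::sort("SortExec: [b@1 ASC]", scan));
    // Conservatively treat the final output order as observable, so a top-level sort is kept.
    let optimized = remove_unnecessary_sorts(plan, true);
    println!("{:?}", names(&optimized)); // ["SortExec: [a@0 ASC]", "Scan"]
}
```

Passing `true` at the root is a conservative assumption: the final output order may be user-visible, so a top-level sort is never removed. The `WindowAggExec` reversal described above is not modeled here; it would additionally need each window expression to provide a reversed counterpart, which is what `reverse_expr` is for in this PR.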
