mustafasrepo opened a new issue, #4686:
URL: https://github.com/apache/arrow-datafusion/issues/4686

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   When we write complex queries, the final physical plan may include 
`SortExec`s that are unnecessary. Consider the query below:
   ```sql
   SELECT count(*) as global_count FROM
                    (SELECT count(*), c1
                     FROM aggregate_test_100
                     WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
                     GROUP BY c1
                     ORDER BY c1 ) AS a
   ```
   Its physical plan is as follows:
   ```
       "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
       "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
       "    CoalescePartitionsExec",
       "      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
       "        RepartitionExec: partitioning=RoundRobinBatch(2)",
       "          SortExec: [c1@0 ASC NULLS LAST]",
       "            CoalescePartitionsExec",
       "              AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
       "                CoalesceBatchesExec: target_batch_size=4096",
       "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
       "                    AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
       "                      CoalesceBatchesExec: target_batch_size=4096",
       "                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
       "                          RepartitionExec: partitioning=RoundRobinBatch(2)",
   ```
   The `SortExec` in this physical plan is unnecessary because the executor 
that consumes its output (`RepartitionExec`) neither requires its input to be 
ordered nor maintains the input ordering. Hence, even if we sort before this 
executor, the ordering is lost in its output. Add support for removing 
`SortExec` in these scenarios.
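
The condition above can be illustrated with a toy model. This is a minimal sketch in Python, not DataFusion code: the `Node` class, its `requires_input_order` and `maintains_input_order` flags, and the `remove_unneeded_sorts` function are hypothetical names invented for this example. For brevity it pushes an "is the ordering still needed?" flag from each parent to its child, which is only one possible way to detect the situation.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy plan node; the two flags are hypothetical, not DataFusion's API."""
    name: str
    requires_input_order: bool = False   # operator needs sorted input
    maintains_input_order: bool = False  # operator passes input ordering through
    children: List["Node"] = field(default_factory=list)


def remove_unneeded_sorts(node: Node, ordering_needed: bool = True) -> Node:
    """Drop every SortExec whose ordering is discarded before anything uses it.

    `ordering_needed` tells whether the consumer of `node` observes the
    ordering of its output. The root defaults to True (conservative choice).
    """
    if node.name == "SortExec":
        if not ordering_needed:
            # Nobody observes this ordering: splice the sort out of the plan.
            return remove_unneeded_sorts(node.children[0], False)
        # A kept sort establishes its own ordering, so it needs none below it.
        child_needed = False
    else:
        child_needed = node.requires_input_order or (
            node.maintains_input_order and ordering_needed
        )
    node.children = [remove_unneeded_sorts(c, child_needed) for c in node.children]
    return node


def chain(*nodes: Node) -> Node:
    """Link nodes into a single-child plan, first node being the root."""
    for parent, child in zip(nodes, nodes[1:]):
        parent.children = [child]
    return nodes[0]


def names(node: Node) -> List[str]:
    """Operator names from the root down a single-child plan."""
    out = [node.name]
    while node.children:
        node = node.children[0]
        out.append(node.name)
    return out


# Abbreviated version of the plan above; the flag values are assumptions.
plan = chain(
    Node("ProjectionExec", maintains_input_order=True),
    Node("AggregateExec(Final)"),
    Node("CoalescePartitionsExec"),
    Node("AggregateExec(Partial)"),
    Node("RepartitionExec"),
    Node("SortExec"),
    Node("CoalescePartitionsExec"),
    Node("AggregateExec(FinalPartitioned)"),
)
optimized = remove_unneeded_sorts(plan)
print(names(optimized))  # the SortExec under RepartitionExec is gone
```

A `SortExec` directly under an order-preserving root is kept in this model, because its ordering is still observable in the final output.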
   
   **Describe the solution you'd like**
   We can traverse the physical plan bottom-up, and if the ordering introduced 
by a `SortExec` is overwritten by another executor before anything uses that 
ordering, we can remove the corresponding `SortExec` from the plan. 
Additionally, some executors can also produce their results in reverse order. 
Consider the query below:
   ```sql
   SELECT count(*) OVER(ORDER BY c9 ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING) as cnt1,
          count(*) OVER(ORDER BY c9 DESC ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING) as cnt2
   FROM aggregate_test_100
   ``` 
   whose physical plan is 
   ```
       "ProjectionExec: expr=[COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@0 as cnt1, COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@1 as cnt2]",
       "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
       "    SortExec: [c9@1 ASC NULLS LAST]",
       "      WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
       "        SortExec: [c9@0 DESC]",
   ```
   This physical plan includes a second `SortExec` whose only purpose is to 
reverse the ordering. However, the count aggregator can also produce its result 
in reverse order. By exploiting this property, we can turn the above physical 
plan into the one below:
   ```
       "ProjectionExec: expr=[COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@0 as cnt1, COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@1 as cnt2]",
       "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
       "    WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
       "      SortExec: [c9@0 DESC]",
   ```
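
Why a reverse-ordered input can be reused may be easier to see on a small example. The sketch below is plain Python with made-up `c9` values, and `window_agg` is a hypothetical helper, not a DataFusion function. It relies on one fact about `ROWS` frames: `ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING` over ascending order covers the same rows as `ROWS BETWEEN 5 PRECEDING AND 3 FOLLOWING` over descending order. Whether the executor would actually be implemented by swapping the bounds like this is an assumption of the sketch.

```python
def window_agg(rows, preceding, following, agg=len):
    """Evaluate `agg` over ROWS BETWEEN `preceding` PRECEDING AND
    `following` FOLLOWING for every row of an already-sorted input."""
    out = []
    for i in range(len(rows)):
        lo = max(0, i - preceding)               # clamp at the first row
        hi = min(len(rows), i + following + 1)   # clamp at the last row
        out.append(agg(rows[lo:hi]))
    return out


# Made-up, distinct values standing in for column c9.
c9 = [28, 3, 17, 42, 8, 35, 11, 50, 21, 5]
desc = sorted(c9, reverse=True)  # ordering already produced by the first sort
asc = sorted(c9)                 # ordering the second sort would produce

# cnt1 computed the straightforward way: sort ascending, then apply the frame.
direct = dict(zip(asc, window_agg(asc, 3, 5)))

# cnt1 computed on the descending input by swapping the frame bounds.
reused = dict(zip(desc, window_agg(desc, 5, 3)))

print(direct == reused)  # every row gets the same count either way
```

The same equality holds for other aggregates that depend only on which rows fall in the frame, for example `agg=sum`.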
   
   
   
   **Describe alternatives you've considered**
   
   **Additional context**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
