Dandandan opened a new issue, #3516:
URL: https://github.com/apache/arrow-datafusion/issues/3516

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   As explained by @alamb in the user defined plan:
   >  TopK Background:
    A "Top K" node is a common query optimization which is used for
    queries such as "find the top 3 customers by revenue". The
   (simplified) SQL for such a query might be:
    ```sql
    CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
     STORED AS CSV location 'tests/customer.csv';
   
    SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
    ```
   > And a naive plan would be:
   
   ```
   > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 
3;
    +--------------+----------------------------------------+
    | plan_type    | plan                                   |
    +--------------+----------------------------------------+
   | logical_plan | Limit: 3                               |
    |              |   Sort: #revenue DESC NULLS FIRST      |
    |              |     Projection: #customer_id, #revenue |
    |              |       TableScan: sales |
    +--------------+----------------------------------------+
    ```
   
   >  While this plan produces the correct answer, the careful reader
    will note it fully sorts the input before discarding everything
    other than the top 3 elements.
   
   In the ClickBench benchmarks we can see many queries that might benefit from 
this optimization, but we can see they are converted to a "naive" plan. See for 
example query 9:
   
   ```
   9. EXPLAIN SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP 
BY "RegionID" ORDER BY u DESC LIMIT 10;
   
   -----------------------------------------
   
   
   DataFusion CLI v12.0.0
   0 rows in set. Query took 0.062 seconds.
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                   |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: skip=0, fetch=10                                    
                                                                                
                   |
   |               |   Sort: #u DESC NULLS FIRST                                
                                                                                
                   |
   |               |     Projection: #hits.RegionID, #COUNT(DISTINCT 
hits.UserID) AS u                                                               
                              |
   |               |       Projection: #group_alias_0 AS RegionID, 
#COUNT(alias1) AS COUNT(DISTINCT hits.UserID)                                   
                                |
   |               |         Aggregate: groupBy=[[#group_alias_0]], 
aggr=[[COUNT(#alias1)]]                                                         
                               |
   |               |           Aggregate: groupBy=[[#hits.RegionID AS 
group_alias_0, #hits.UserID AS alias1]], aggr=[[]]                              
                             |
   |               |             TableScan: hits projection=[RegionID, UserID]  
                                                                                
                   |
   | physical_plan | GlobalLimitExec: skip=0, fetch=10                          
                                                                                
                   |
   |               |   SortExec: [u@1 DESC]                                     
                                                                                
                   |
   |               |     CoalescePartitionsExec                                 
                                                                                
                   |
   |               |       ProjectionExec: expr=[RegionID@0 as RegionID, 
COUNT(DISTINCT hits.UserID)@1 as u]                                             
                          |
   |               |         ProjectionExec: expr=[group_alias_0@0 as RegionID, 
COUNT(alias1)@1 as COUNT(DISTINCT hits.UserID)]                                 
                   |
   |               |           AggregateExec: mode=FinalPartitioned, 
gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                    
                              |
   |               |             CoalesceBatchesExec: target_batch_size=4096    
                                                                                
                   |
   |               |               RepartitionExec: partitioning=Hash([Column { 
name: "group_alias_0", index: 0 }], 16)                                         
                   |
   |               |                 AggregateExec: mode=Partial, 
gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                    
                                 |
   |               |                   AggregateExec: mode=FinalPartitioned, 
gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]             
                      |
   |               |                     CoalesceBatchesExec: 
target_batch_size=4096                                                          
                                     |
   |               |                       RepartitionExec: 
partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: 
"alias1", index: 1 }], 16)               |
   |               |                         AggregateExec: mode=Partial, 
gby=[RegionID@0 as group_alias_0, UserID@1 as alias1], aggr=[]                  
                         |
   |               |                           RepartitionExec: 
partitioning=RoundRobinBatch(16)                                                
                                   |
   |               |                             ParquetExec: limit=None, 
partitions=[home/danielheres/Code/gdd/ClickBench/datafusion/hits.parquet], 
projection=[RegionID, UserID] |
   |               |                                                            
                                                                                
                   |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 rows in set. Query took 0.015 seconds.
   
   ```
   
   **Describe the solution you'd like**
   Implement this feature.
   
   **Describe alternatives you've considered**
   n/a
   
   **Additional context**
   Add any other context or screenshots about the feature request here.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to