Dandandan opened a new issue, #3515:
URL: https://github.com/apache/arrow-datafusion/issues/3515

   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   As explained by @alamb in the user-defined plan:
   > //! # TopK Background:
   //!
   //! A "Top K" node is a common query optimization which is used for
   //! queries such as "find the top 3 customers by revenue". The
   //! (simplified) SQL for such a query might be:
   //!
   //! ```sql
   //! CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
   //!   STORED AS CSV location 'tests/customer.csv';
   //!
   //! SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
   //! ```
   //!
   //! And a naive plan would be:
   //!
   //! ```
   //! > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
   //! +--------------+----------------------------------------+
   //! | plan_type    | plan                                   |
   //! +--------------+----------------------------------------+
   //! | logical_plan | Limit: 3                               |
   //! |              |   Sort: #revenue DESC NULLS FIRST      |
   //! |              |     Projection: #customer_id, #revenue |
   //! |              |       TableScan: sales                 |
   //! +--------------+----------------------------------------+
   //! ```
   //!
   //! While this plan produces the correct answer, the careful reader
   //! will note it fully sorts the input before discarding everything
   //! other than the top 3 elements.
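
The full sort is avoidable because a Top K operator only needs to remember the k best rows seen so far. As a rough illustration of that idea (not DataFusion's actual operator), here is a minimal Rust sketch that keeps a bounded min-heap from the standard library. The function name `top_k_by_revenue` and the `(customer_id, revenue)` tuple layout are assumptions made for this example.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: returns the `k` rows with the largest revenue,
/// highest first, while holding at most `k + 1` rows in memory.
fn top_k_by_revenue(
    rows: impl IntoIterator<Item = (String, i64)>,
    k: usize,
) -> Vec<(String, i64)> {
    // `Reverse` turns the std max-heap into a min-heap keyed on revenue,
    // so the smallest retained row is always the one that `pop` removes.
    let mut heap: BinaryHeap<Reverse<(i64, String)>> = BinaryHeap::with_capacity(k + 1);
    for (customer_id, revenue) in rows {
        heap.push(Reverse((revenue, customer_id)));
        if heap.len() > k {
            heap.pop(); // evict the current smallest row
        }
    }
    // Ascending order of `Reverse<_>` is descending order of revenue.
    heap.into_sorted_vec()
        .into_iter()
        .map(|Reverse((revenue, customer_id))| (customer_id, revenue))
        .collect()
}
```

With n input rows this does O(n log k) work and keeps O(k) rows in memory, compared with O(n log n) work and O(n) memory for sorting everything and then applying the limit.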
   
   In ClickBench, for example, many queries could benefit from this optimization, but they are currently converted to a "naive" plan:
   
   ```
   9. EXPLAIN SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
   
   -----------------------------------------
   
   
   DataFusion CLI v12.0.0
   0 rows in set. Query took 0.062 seconds.
   
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                                          |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: skip=0, fetch=10                                                                                                                                       |
   |               |   Sort: #u DESC NULLS FIRST                                                                                                                                   |
   |               |     Projection: #hits.RegionID, #COUNT(DISTINCT hits.UserID) AS u                                                                                             |
   |               |       Projection: #group_alias_0 AS RegionID, #COUNT(alias1) AS COUNT(DISTINCT hits.UserID)                                                                   |
   |               |         Aggregate: groupBy=[[#group_alias_0]], aggr=[[COUNT(#alias1)]]                                                                                        |
   |               |           Aggregate: groupBy=[[#hits.RegionID AS group_alias_0, #hits.UserID AS alias1]], aggr=[[]]                                                           |
   |               |             TableScan: hits projection=[RegionID, UserID]                                                                                                     |
   | physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                             |
   |               |   SortExec: [u@1 DESC]                                                                                                                                        |
   |               |     CoalescePartitionsExec                                                                                                                                    |
   |               |       ProjectionExec: expr=[RegionID@0 as RegionID, COUNT(DISTINCT hits.UserID)@1 as u]                                                                       |
   |               |         ProjectionExec: expr=[group_alias_0@0 as RegionID, COUNT(alias1)@1 as COUNT(DISTINCT hits.UserID)]                                                    |
   |               |           AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                                                  |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                                       |
   |               |               RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }], 16)                                                            |
   |               |                 AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                                                     |
   |               |                   AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]                                   |
   |               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                               |
   |               |                       RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: "alias1", index: 1 }], 16)               |
   |               |                         AggregateExec: mode=Partial, gby=[RegionID@0 as group_alias_0, UserID@1 as alias1], aggr=[]                                           |
   |               |                           RepartitionExec: partitioning=RoundRobinBatch(16)                                                                                   |
   |               |                             ParquetExec: limit=None, partitions=[home/danielheres/Code/gdd/ClickBench/datafusion/hits.parquet], projection=[RegionID, UserID] |
   |               |                                                                                                                                                               |
   +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 rows in set. Query took 0.015 seconds.
   
   ```
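
In both plans above, the shape that matters is a limit sitting directly on top of a sort (`Limit` over `Sort`, `GlobalLimitExec` over `SortExec`). One way to picture the requested optimization is as a rewrite that fuses that pair into a single node. The sketch below uses a toy plan tree to show the pattern match. The `Plan` enum, the `TopK` variant, and `fuse_limit_sort` are hypothetical names invented for this illustration and are not DataFusion's `LogicalPlan` or optimizer API.

```rust
/// Hypothetical, heavily simplified plan tree (not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Sort { expr: String, input: Box<Plan> },
    Limit { skip: usize, fetch: usize, input: Box<Plan> },
    /// Hypothetical fused node: emit only the first `k` rows in sort order.
    TopK { expr: String, k: usize, input: Box<Plan> },
}

/// Rewrites every `Limit` placed directly over a `Sort` into a `TopK`.
fn fuse_limit_sort(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { skip, fetch, input } => match fuse_limit_sort(*input) {
            Plan::Sort { expr, input } => {
                // Only the first `skip + fetch` sorted rows can reach the output.
                let top_k = Plan::TopK { expr, k: skip + fetch, input };
                if skip == 0 {
                    top_k
                } else {
                    // Keep the limit on top so the skipped rows are still dropped.
                    Plan::Limit { skip, fetch, input: Box::new(top_k) }
                }
            }
            other => Plan::Limit { skip, fetch, input: Box::new(other) },
        },
        Plan::Sort { expr, input } => Plan::Sort {
            expr,
            input: Box::new(fuse_limit_sort(*input)),
        },
        Plan::TopK { expr, k, input } => Plan::TopK {
            expr,
            k,
            input: Box::new(fuse_limit_sort(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}
```

Applied to a toy version of the ClickBench query, `Limit(skip=0, fetch=10)` over `Sort(u DESC)` becomes `TopK(u DESC, k=10)`, and everything below the sort is left untouched.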
   
   **Describe the solution you'd like**
   Implement a Top K optimization so that `ORDER BY ... LIMIT` queries like the one above avoid a full sort.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
