jychen7 opened a new pull request, #6014:
URL: https://github.com/apache/arrow-datafusion/pull/6014

   # Which issue does this PR close?
   
   close https://github.com/apache/arrow-datafusion/issues/6000
   
   # Rationale for this change
   
   reduce memory usage, improve speed
   
   # What changes are included in this PR?
   
   - [x] add `fetch` option to `SortPreservingMergeExec` and 
`SortPreservingMergeStream`
       + notice the change of `SortPreservingMergeExec: fetch=10` below
   - [ ] use `fetch` in `SortPreservingMergeStream`
   
   before
   ```
   ❯ explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), 
AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC 
LIMIT 10;
   
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: skip=0, fetch=10                                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |   Sort: c DESC NULLS FIRST, fetch=10                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |     Projection: hits.WatchID, hits.ClientIP, 
COUNT(UInt8(1)) AS c, SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |       Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], 
aggr=[[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]]        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |         TableScan: hits projection=[WatchID, ClientIP, 
IsRefresh, ResolutionWidth]                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   | physical_plan | GlobalLimitExec: skip=0, fetch=10                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |   SortPreservingMergeExec: [c@2 DESC]                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |     SortExec: fetch=10, expr=[c@2 DESC]                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |       ProjectionExec: expr=[WatchID@0 as WatchID, 
ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as 
SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]  
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |         AggregateExec: mode=FinalPartitioned, 
gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), 
SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |           CoalesceBatchesExec: target_batch_size=8192      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |             RepartitionExec: partitioning=Hash([Column { 
name: "WatchID", index: 0 }, Column { name: "ClientIP", index: 1 }], 12), 
input_partitions=12                                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |               AggregateExec: mode=Partial, gby=[WatchID@0 
as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), 
SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |                 ParquetExec: limit=None, partitions={12 
groups: [[hits.parquet:0..1231664704], [hits.parquet:1231664704..2463329408], 
[hits.parquet:2463329408..3694994112], [hits.parquet:3694994112..4926658816], 
[hits.parquet:4926658816..6158323520], [hits.parquet:6158323520..7389988224], 
[hits.parquet:7389988224..8621652928], [hits.parquet:8621652928..9853317632], 
[hits.parquet:9853317632..11084982336], 
[hits.parquet:11084982336..12316647040], 
[hits.parquet:12316647040..13548311744], 
[hits.parquet:13548311744..14779976446]]}, projection=[WatchID, ClientIP, 
IsRefresh, ResolutionWidth] |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   after
   ```
   ❯ explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), 
AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC 
LIMIT 10;
   
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: skip=0, fetch=10                                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |   Sort: c DESC NULLS FIRST, fetch=10                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |     Projection: hits.WatchID, hits.ClientIP, 
COUNT(UInt8(1)) AS c, SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |       Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], 
aggr=[[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]]        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |         TableScan: hits projection=[WatchID, ClientIP, 
IsRefresh, ResolutionWidth]                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   | physical_plan | GlobalLimitExec: skip=0, fetch=10                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |   SortPreservingMergeExec: fetch=10, [c@2 DESC]            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |     SortExec: fetch=10, expr=[c@2 DESC]                    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |       ProjectionExec: expr=[WatchID@0 as WatchID, 
ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as 
SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]  
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |         AggregateExec: mode=FinalPartitioned, 
gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), 
SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |           CoalesceBatchesExec: target_batch_size=8192      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |             RepartitionExec: partitioning=Hash([Column { 
name: "WatchID", index: 0 }, Column { name: "ClientIP", index: 1 }], 12), 
input_partitions=12                                                             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |               AggregateExec: mode=Partial, gby=[WatchID@0 
as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), 
SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   |               |                 ParquetExec: limit=None, partitions={12 
groups: [[hits.parquet:0..1231664704], [hits.parquet:1231664704..2463329408], 
[hits.parquet:2463329408..3694994112], [hits.parquet:3694994112..4926658816], 
[hits.parquet:4926658816..6158323520], [hits.parquet:6158323520..7389988224], 
[hits.parquet:7389988224..8621652928], [hits.parquet:8621652928..9853317632], 
[hits.parquet:9853317632..11084982336], 
[hits.parquet:11084982336..12316647040], 
[hits.parquet:12316647040..13548311744], 
[hits.parquet:13548311744..14779976446]]}, projection=[WatchID, ClientIP, 
IsRefresh, ResolutionWidth] |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                            |
   
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   # Are these changes tested?
   
   
   # Are there any user-facing changes?
   
   NO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to