mslapek commented on PR #5745:
URL: https://github.com/apache/arrow-datafusion/pull/5745#issuecomment-1485376378

   @mingmwang Thank you for your observation. Let's run an experiment.
   
   ## Experiment
   
   Copied `arrow-datafusion/datafusion/core/tests/data/aggregate_simple.csv` to `aggregate_simple2.csv` in the same directory.
   
   Script:
   
   ```rust
   use datafusion::prelude::*;
   
   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       let ctx = SessionContext::new();
   
       ctx.register_csv(
           "my_table",
           "datafusion/core/tests/data/aggregate_simple.csv",
           CsvReadOptions::default(),
       ).await?;
       ctx.register_csv(
           "my_table2",
           "datafusion/core/tests/data/aggregate_simple2.csv",
           CsvReadOptions::default(),
       ).await?;
   
       let sql = "SELECT COUNT(*) FROM (SELECT * from my_table UNION ALL SELECT * from my_table2)";
       let df = ctx.sql(sql).await?;
       df.clone().explain(true, false)?.show().await?;
       df.show().await?;
   
       Ok(())
   }
   
   ```
   
   Optimized logical and physical plans are below.
   
   ## Without `push_down_aggregate`
   
   *Explain after optimizations*
   
   ```
   ...
   | logical_plan  | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
   |               |   Union
   |               |     TableScan: my_table projection=[c1]
   |               |     Projection: my_table2.c1 AS my_table.c1
   |               |       TableScan: my_table2 projection=[c1]
   ...
   | physical_plan | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
   |               |   CoalescePartitionsExec
   |               |     AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
   |               |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2
   |               |         UnionExec
   |               |           CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple.csv]]}, has_header=true, limit=None, projection=[c1]
   |               |           ProjectionExec: expr=[c1@0 as my_table.c1]
   |               |             CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple2.csv]]}, has_header=true, limit=None, projection=[c1]
   ```
   
   Rows from both CSV files are naively mixed together by `RepartitionExec` with `RoundRobinBatch` partitioning, only to be counted at the end! 😧
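
To make the mixing concrete, here is a toy model in plain Rust of what the plan above does: union both inputs, spread the rows round-robin over `n` partitions, count each partition, and sum. This is only a sketch. The names `round_robin` and `count_after_repartition` are made up for illustration, and the real `RepartitionExec` distributes record batches rather than single rows.

```rust
/// Toy round-robin repartitioning: item `i` goes to partition `i % n`.
/// Assumes `n > 0`.
fn round_robin<T: Clone>(items: &[T], n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one partition");
    let mut partitions = vec![Vec::new(); n];
    for (i, item) in items.iter().enumerate() {
        partitions[i % n].push(item.clone());
    }
    partitions
}

/// Models the plan without push-down: union the inputs, repartition,
/// count every partition (partial aggregate), then sum (final aggregate).
fn count_after_repartition(a: &[&str], b: &[&str], n: usize) -> usize {
    let mut unioned: Vec<&str> = Vec::with_capacity(a.len() + b.len());
    unioned.extend_from_slice(a);
    unioned.extend_from_slice(b);
    round_robin(&unioned, n).iter().map(Vec::len).sum()
}
```

With rows `a1, a2, a3` from one input and `b1, b2, b3` from the other, two partitions end up holding rows from both inputs, although the final count only depends on how many rows there are.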
   
   > actually the partial agg is already pushed down.
   
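   ❗️ No, it isn't: in the physical plan above, `AggregateExec: mode=Partial` sits above `UnionExec` and `RepartitionExec`, so it only runs after the rows from both files have been merged.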
   
   ## With `push_down_aggregate`
   
   *Explain after optimizations*
   
   ```
   ...
   | logical_plan  | Projection: SUM(COUNT(aggr_1)) AS COUNT(UInt8(1))
   |               |   Aggregate: groupBy=[[]], aggr=[[SUM(COUNT(aggr_1))]]
   |               |     Union
   |               |       Aggregate: groupBy=[[]], aggr=[[COUNT(aggr_1)]]
   |               |         Projection: UInt8(1) AS aggr_1
   |               |           TableScan: my_table projection=[c1]
   |               |       Aggregate: groupBy=[[]], aggr=[[COUNT(aggr_1)]]
   |               |         Projection: UInt8(1) AS aggr_1
   |               |           TableScan: my_table2 projection=[c1]
   ...
   | physical_plan | ProjectionExec: expr=[SUM(COUNT(aggr_1))@0 as COUNT(UInt8(1))]
   |               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(COUNT(aggr_1))]
   |               |     CoalescePartitionsExec
   |               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(COUNT(aggr_1))]
   |               |         UnionExec
   |               |           AggregateExec: mode=Final, gby=[], aggr=[COUNT(aggr_1)]
   |               |             CoalescePartitionsExec
   |               |               AggregateExec: mode=Partial, gby=[], aggr=[COUNT(aggr_1)]
   |               |                 ProjectionExec: expr=[1 as aggr_1]
   |               |                   RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
   |               |                     CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple.csv]]}, has_header=true, limit=None, projection=[c1]
   |               |           AggregateExec: mode=Final, gby=[], aggr=[COUNT(aggr_1)]
   |               |             CoalescePartitionsExec
   |               |               AggregateExec: mode=Partial, gby=[], aggr=[COUNT(aggr_1)]
   |               |                 ProjectionExec: expr=[1 as aggr_1]
   |               |                   RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
   |               |                     CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple2.csv]]}, has_header=true, limit=None, projection=[c1]
   ```
   
   The file contents are no longer mixed.
   Each file is counted separately below `UnionExec`, so only two rows (one count per file) reach `AggregateExec: mode=Partial, gby=[], aggr=[SUM(COUNT(aggr_1))]`.
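
The rewrite relies on `COUNT(*)` over a `UNION ALL` being equal to the sum of the per-input counts. A minimal sketch of that identity in plain Rust follows. The function names are hypothetical and are not part of DataFusion or this PR.

```rust
/// Rewritten plan: COUNT each input on its own, then SUM the counts.
/// Only one number per input crosses the union.
fn count_per_input_then_sum(inputs: &[Vec<&str>]) -> u64 {
    inputs.iter().map(|rows| rows.len() as u64).sum()
}

/// Original plan: concatenate all inputs (UNION ALL), then COUNT the rows.
fn count_over_union_all(inputs: &[Vec<&str>]) -> u64 {
    inputs.iter().flatten().count() as u64
}
```

Both functions return the same value for any inputs. The difference is how much data passes through the union: every row in the second function, one count per input in the first.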
   

