alamb commented on code in PR #5770:
URL: https://github.com/apache/arrow-datafusion/pull/5770#discussion_r1152250301


##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly |
-|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]] |
-|               |     Projection: lineitem.l_extendedprice |
-|               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey |
-|               |         Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value |
-|               |           Inner Join: part.p_partkey = __scalar_sq_1.l_partkey |
-|               |             Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey |
-|               |               Inner Join: lineitem.l_partkey = part.p_partkey |
-|               |                 TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] |
-|               |                 Projection: part.p_partkey |
-|               |                   Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") |
-|               |                     TableScan: part projection=[p_partkey, p_brand, p_container] |
-|               |             SubqueryAlias: __scalar_sq_1 |
-|               |               Projection: lineitem.l_partkey, Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS __value |
-|               |                 Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] |
-|               |                   TableScan: lineitem projection=[l_partkey, l_quantity] |
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly] |
-|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)] |
-|               |     CoalescePartitionsExec |
-|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)] |
-|               |         ProjectionExec: expr=[l_extendedprice@2 as l_extendedprice] |
-|               |           CoalesceBatchesExec: target_batch_size=8192 |
-|               |             FilterExec: CAST(l_quantity@1 AS Decimal128(30, 15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0 |
-|               |               ProjectionExec: expr=[l_partkey@0 as l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, l_partkey@4 as l_partkey, __value@5 as __value] |
-|               |                 CoalesceBatchesExec: target_batch_size=8192 |
-|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name: "l_partkey", index: 0 })] |
-|               |                     CoalesceBatchesExec: target_batch_size=8192 |
-|               |                       RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2), input_partitions=2 |
-|               |                         CoalesceBatchesExec: target_batch_size=8192 |
-|               |                           FilterExec: p_partkey@3 = l_partkey@0 AND l_partkey@0 = p_partkey@3 |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2 |
-|               |                               CoalesceBatchesExec: target_batch_size=8192 |
-|               |                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })] |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192 |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=0 |
-|               |                                       MemoryExec: partitions=0, partition_sizes=[] |
-|               |                                   CoalesceBatchesExec: target_batch_size=8192 |
-|               |                                     RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2), input_partitions=2 |
-|               |                                       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-|               |                                         ProjectionExec: expr=[p_partkey@0 as p_partkey] |
-|               |                                           CoalesceBatchesExec: target_batch_size=8192 |
-|               |                                             FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX |
-|               |                                               MemoryExec: partitions=0, partition_sizes=[] |
-|               |                     ProjectionExec: expr=[l_partkey@0 as l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value] |
-|               |                       AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] |
-|               |                         CoalesceBatchesExec: target_batch_size=8192 |
-|               |                           RepartitionExec: partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2), input_partitions=2 |
-|               |                             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=0 |
-|               |                               AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] |
-|               |                                 MemoryExec: partitions=0, partition_sizes=[] |
-|               | |
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly |
+|               |   Aggregate: groupBy=[[]], aggr=[[SUM(lineitem.l_extendedprice)]] |
+|               |     Projection: lineitem.l_extendedprice |
+|               |       Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) |

Review Comment:
   This is a better plan because the redundant
   ```
   Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey
   ```
   is removed, since the earlier join on `lineitem.l_partkey = part.p_partkey` already enforces it, right?
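To spell out why that filter is redundant: after an inner join on `lineitem.l_partkey = part.p_partkey`, every output row already satisfies that equality, so a filter restating it (in either orientation) can never reject a row. A minimal sketch of the idea, using a toy predicate representation; `ColEq`, `normalize` and `remove_redundant` are hypothetical names, not DataFusion's optimizer API:

```rust
use std::collections::HashSet;

/// Hypothetical, simplified column reference: (table, column).
type Col = (&'static str, &'static str);

/// Hypothetical equality predicate `left = right` between two columns.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColEq {
    left: Col,
    right: Col,
}

/// Normalize so that `a = b` and `b = a` map to the same key.
fn normalize(p: ColEq) -> (Col, Col) {
    if p.left <= p.right {
        (p.left, p.right)
    } else {
        (p.right, p.left)
    }
}

/// Drop filter predicates that only restate one of the join's equi-join keys
/// (in either orientation), as well as duplicates (in either orientation)
/// among the remaining predicates. Whatever is returned must still be evaluated.
fn remove_redundant(join_on: &[ColEq], filter: &[ColEq]) -> Vec<ColEq> {
    let implied: HashSet<(Col, Col)> = join_on.iter().map(|p| normalize(*p)).collect();
    let mut seen = HashSet::new();
    filter
        .iter()
        .copied()
        .filter(|p| {
            let key = normalize(*p);
            // Keep only predicates not implied by the join and not seen before.
            !implied.contains(&key) && seen.insert(key)
        })
        .collect()
}
```

With the old q17 join key, both conjuncts of the removed `Filter` normalize to the same key as the join condition and are dropped; an equality between any other pair of columns would be kept.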



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -952,21 +952,21 @@ impl DefaultPhysicalPlanner {
 
                     let join_filter = match filter {
                         Some(expr) => {
-                            // Extract columns from filter expression
+                            // Extract columns from filter expression and saved in a HashSet
                             let cols = expr.to_columns()?;
 
-                            // Collect left & right field indices
+                            // Collect left & right field indices, the field indices are sorted in ascending order

Review Comment:
   If the sort order is important for later stages, can you add a note about the rationale, so the comment explains why the sorting matters in addition to noting that the output is sorted?
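For example, if the point of sorting is determinism (an assumption here; the PR does not say so), the comment could state that explicitly. The sketch below is hypothetical: it works on plain column indices in a concatenated left-then-right schema, and `split_filter_indices` is an illustrative name, whereas the planner code in the diff starts from the `Column`s returned by `expr.to_columns()`. Because a Rust `HashSet` iterates in an unspecified order, sorting is what would keep the resulting field order stable across runs:

```rust
use std::collections::HashSet;

/// Hypothetical sketch: split the column indices referenced by a join filter
/// into left-side and right-side field indices. Indices are assumed to be
/// positions in the concatenated (left ++ right) schema, so anything at or
/// beyond `left_len` belongs to the right input.
fn split_filter_indices(cols: &HashSet<usize>, left_len: usize) -> (Vec<usize>, Vec<usize>) {
    let mut left: Vec<usize> = cols.iter().copied().filter(|&i| i < left_len).collect();
    let mut right: Vec<usize> = cols
        .iter()
        .copied()
        .filter(|&i| i >= left_len)
        .map(|i| i - left_len)
        .collect();
    // HashSet iteration order is unspecified (and, with the default hasher,
    // may change between runs), so sort to get a deterministic field order.
    left.sort_unstable();
    right.sort_unstable();
    (left, right)
}
```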



##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@

Review Comment:
   It also pushes the filter

   ```
   |               |       Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey              |
   ```

   into the join, which seems like a win to me: it avoids generating join output that the filter would only discard afterwards.
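To make the "avoid generating output" point concrete, here is a toy sketch in plain Rust (not DataFusion's `HashJoinExec`; `Row` and `hash_join_with_filter` are hypothetical) of a hash join that evaluates the residual filter on each key match during the probe, so rejected pairs are never materialized. With a separate `Filter` above the join, every key match would be produced first and only then discarded:

```rust
use std::collections::HashMap;

/// Hypothetical toy row: (partkey, value).
type Row = (i64, f64);

/// Hash join on the first field that applies `filter` to every key match
/// during the probe, so pairs rejected by the filter are never emitted.
fn hash_join_with_filter<F>(build: &[Row], probe: &[Row], filter: F) -> Vec<(Row, Row)>
where
    F: Fn(&Row, &Row) -> bool,
{
    // Build phase: index the build side by join key.
    let mut table: HashMap<i64, Vec<Row>> = HashMap::new();
    for row in build {
        table.entry(row.0).or_default().push(*row);
    }
    // Probe phase: emit a pair only if the keys match AND the filter passes.
    let mut out = Vec::new();
    for p in probe {
        if let Some(matches) = table.get(&p.0) {
            for b in matches {
                if filter(b, p) {
                    out.push((*b, *p));
                }
            }
        }
    }
    out
}
```

For example, with build rows `(1, 2.0), (2, 5.0)` (partkey, threshold), probe rows `(1, 1.0), (1, 3.0), (2, 4.0), (3, 1.0)` (partkey, quantity) and the filter `quantity < threshold`, three pairs match on the key but only two are ever emitted.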



-- 
This is an automated message from the Apache Git Service.