helgikrs opened a new issue, #9211:
URL: https://github.com/apache/arrow-datafusion/issues/9211

   ### Describe the bug
   
   The `ProjectionPushdown` physical optimization rule eliminates a projection of 
the form `ProjectionExec: expr=[a@1 as a, a@0 as a]`, causing the order of the 
output columns to change.
   
   ### To Reproduce
   
   There might be a simpler way to reproduce this, but it happens for me when the 
join selection rule swaps the left- and right-hand sides of a hash join (which 
is triggered when we have statistics, as for Parquet tables).
   
   ```sql
   DataFusion CLI v35.0.0
   
   ❯ create table a (
       a bigint
   ) as values
   (1),
   (2),
   (3),
   (4),
   (5),
   (6);
   0 rows in set. Query took 0.010 seconds.
   
   ❯ create table b (
       a bigint
   ) as values
   (3),
   (4),
   (7),
   (9);
   0 rows in set. Query took 0.001 seconds.
   
   ❯ select * from a left join b on a.a = b.a;
   +---+---+
   | a | a |
   +---+---+
   | 3 | 3 |
   | 5 |   |
   | 4 | 4 |
   | 1 |   |
   | 6 |   |
   | 2 |   |
   +---+---+
   6 rows in set. Query took 0.001 seconds. 
   ```
   
   So far everything is fine when using the memory tables. If we export the 
tables to Parquet files and run the same query, the bug is triggered.
   
   ```sql
   ❯ copy (select * from a) to 'a.parquet';
   +-------+
   | count |
   +-------+
   | 6     |
   +-------+
   1 row in set. Query took 0.013 seconds.
   
   ❯ copy (select * from b) to 'b.parquet';
   +-------+
   | count |
   +-------+
   | 4     |
   +-------+
   1 row in set. Query took 0.011 seconds.
   
   
   ❯ select * from 'a.parquet' a left join 'b.parquet' b on a.a = b.a;
   +---+---+
   | a | a |
   +---+---+
   |   | 5 |
   | 4 | 4 |
   |   | 1 |
   | 3 | 3 |
   |   | 6 |
   |   | 2 |
   +---+---+
   6 rows in set. Query took 0.003 seconds.
   ```
   
   With `explain verbose` we can see what's happening (irrelevant parts removed 
for clarity):
   
   ```sql
   ❯ explain verbose select * from 'a.parquet' a left join 'b.parquet' b on a.a = b.a;
   +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type                                                  | plan
   +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   --- snip ---
   | logical_plan                                               | Left Join: a.a = b.a
   |                                                            |   SubqueryAlias: a
   |                                                            |     TableScan: a.parquet projection=[a]
   |                                                            |   SubqueryAlias: b
   |                                                            |     TableScan: b.parquet projection=[a]
   | initial_physical_plan                                      | HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, a@0)]
   |                                                            |   ParquetExec: file_groups={1 group: [[a.parquet]]}, projection=[a]
   |                                                            |   ParquetExec: file_groups={1 group: [[b.parquet]]}, projection=[a]
   |                                                            |
   | initial_physical_plan_with_stats                           | HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, a@0)], statistics=[Rows=Inexact(6), Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(6)) Null=Exact(0)),(Col[1]: Min=Exact(Int64(3)) Max=Exact(Int64(9)) Null=Exact(0))]]
   |                                                            |   ParquetExec: file_groups={1 group: [[a.parquet]]}, projection=[a], statistics=[Rows=Exact(6), Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(6)) Null=Exact(0))]]
   |                                                            |   ParquetExec: file_groups={1 group: [[b.parquet]]}, projection=[a], statistics=[Rows=Exact(4), Bytes=Absent, [(Col[0]: Min=Exact(Int64(3)) Max=Exact(Int64(9)) Null=Exact(0))]]
   |                                                            |
   --- snip ---
   | physical_plan after join_selection                         | OutputRequirementExec
   |                                                            |   ProjectionExec: expr=[a@1 as a, a@0 as a]
   |                                                            |     HashJoinExec: mode=Partitioned, join_type=Right, on=[(a@0, a@0)]
   |                                                            |       ParquetExec: file_groups={1 group: [[b.parquet]]}, projection=[a]
   |                                                            |       ParquetExec: file_groups={1 group: [[a.parquet]]}, projection=[a]
   --- snip ---
   | physical_plan after OutputRequirements                     | ProjectionExec: expr=[a@1 as a, a@0 as a]
   |                                                            |   CoalesceBatchesExec: target_batch_size=8192
   |                                                            |     HashJoinExec: mode=Partitioned, join_type=Right, on=[(a@0, a@0)]
   |                                                            |       CoalesceBatchesExec: target_batch_size=8192
   |                                                            |         RepartitionExec: partitioning=Hash([a@0], 24), input_partitions=1
   |                                                            |           ParquetExec: file_groups={1 group: [[home/helgi/b.parquet]]}, projection=[a]
   |                                                            |       CoalesceBatchesExec: target_batch_size=8192
   |                                                            |         RepartitionExec: partitioning=Hash([a@0], 24), input_partitions=1
   |                                                            |           ParquetExec: file_groups={1 group: [[home/helgi/a.parquet]]}, projection=[a]
   |                                                            |
   | physical_plan after PipelineChecker                        | SAME TEXT AS ABOVE
   | physical_plan after LimitAggregation                       | SAME TEXT AS ABOVE
   | physical_plan after ProjectionPushdown                     | CoalesceBatchesExec: target_batch_size=8192
   |                                                            |   HashJoinExec: mode=Partitioned, join_type=Right, on=[(a@0, a@0)]
   |                                                            |     CoalesceBatchesExec: target_batch_size=8192
   |                                                            |       RepartitionExec: partitioning=Hash([a@0], 24), input_partitions=1
   |                                                            |         ParquetExec: file_groups={1 group: [[b.parquet]]}, projection=[a]
   |                                                            |     CoalesceBatchesExec: target_batch_size=8192
   |                                                            |       RepartitionExec: partitioning=Hash([a@0], 24), input_partitions=1
   |                                                            |         ParquetExec: file_groups={1 group: [[a.parquet]]}, projection=[a]
   ```
   
   Here we see the join selection rule changing the join type to a right join 
and adding an `a@1 as a, a@0 as a` projection to restore the original column 
order, but the `ProjectionPushdown` rule then removes that projection, causing 
the columns to get swapped.
   
   ### Expected behavior
   
   I expected the same output as with the memory table.
   
   ### Additional context
   
   This does not happen if the column names are not ambiguous (the top 
projection is left in place):
   
   ```sql
   ❯ select * from 'a.parquet' a left join (select a as b from 'b.parquet') b on a.a = b.b;
   +---+---+
   | a | b |
   +---+---+
   | 4 | 4 |
   | 1 |   |
   | 6 |   |
   | 3 | 3 |
   | 2 |   |
   | 5 |   |
   +---+---+
   6 rows in set. Query took 0.003 seconds.
   ```
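
To make the failure mode concrete, here is a minimal toy model in Python. It is not DataFusion code: the function names and the tuple representation of a projection are hypothetical, and the idea that the rule treats a projection as redundant based on names alone is only an assumption suggested by the observation that unambiguous names avoid the bug. The sketch shows why `a@1 as a, a@0 as a` looks like a no-op when only names are compared, although it actually swaps the columns.

```python
from typing import List, Optional, Tuple

# Hypothetical representation of one projection expression:
# (input column index, output name), e.g. (1, "a") models `a@1 as a`.
ProjExpr = Tuple[int, str]
Row = Tuple[Optional[int], ...]


def is_identity_by_name(exprs: List[ProjExpr], input_names: List[str]) -> bool:
    """Assumed unsafe check: only compares output names with input names."""
    return len(exprs) == len(input_names) and all(
        name == input_names[pos] for pos, (_, name) in enumerate(exprs)
    )


def is_identity_by_index(exprs: List[ProjExpr], input_names: List[str]) -> bool:
    """Safer check: each output must also read the input column at its own position."""
    return len(exprs) == len(input_names) and all(
        idx == pos and name == input_names[pos]
        for pos, (idx, name) in enumerate(exprs)
    )


def apply_projection(exprs: List[ProjExpr], row: Row) -> Row:
    """Evaluate the projection on a single row."""
    return tuple(row[idx] for idx, _ in exprs)


# ProjectionExec: expr=[a@1 as a, a@0 as a] on top of the swapped (right) join,
# whose output columns are (b.a, a.a), both named "a".
swap = [(1, "a"), (0, "a")]
ambiguous_names = ["a", "a"]

# Same shape of plan when the right-hand column is renamed to "b".
swap_renamed = [(1, "a"), (0, "b")]
distinct_names = ["b", "a"]
```

Under these assumptions, `is_identity_by_name(swap, ambiguous_names)` is true while `is_identity_by_index(swap, ambiguous_names)` is false, and `apply_projection(swap, (None, 5))` yields `(5, None)`, which matches the expected row `| 5 |   |` rather than the observed `|   | 5 |`. With distinct names, both checks reject the removal.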


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
