erratic-pattern opened a new issue, #19049:
URL: https://github.com/apache/datafusion/issues/19049

   ### Describe the bug
   
   DataFusion fails with a schema mismatch error when processing a `UNION ALL` query on Parquet files with field metadata.
   
   `Error while planning query: Internal error: Physical input schema should be the same as the one converted from logical input schema. Differences: .`
   
   
   
   
   
   ### To Reproduce
   
   Parquet: [union_all_repo.zip](https://github.com/user-attachments/files/23885649/union_all_repo.zip)
   
   Query:
   ```sql
   SET datafusion.execution.parquet.skip_metadata = false;
   
   SELECT AVG(usage_idle), AVG(usage_system)
   FROM (
       SELECT time, usage_idle, NULL::DOUBLE as usage_system FROM 'union_all_repro.parquet'
       UNION ALL
       SELECT time, NULL::DOUBLE as usage_idle, usage_system FROM 'union_all_repro.parquet'
   );
   ```
   
   
   ### Expected behavior
   
   The query should succeed, with matching logical and physical schemas.
   
   ### Additional context
   
   ## Sequence of Events
   
   ### 1. Projection Optimization
   
   
   [`optimize_projections`](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/optimize_projections/mod.rs) sees that the `time` column is not used by the outer query and removes it from the inputs of the `Union`.
   
   ```
   +------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------+
   | plan_type                                                  | plan                                                                                                                          |
   +------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------+
   | initial_logical_plan                                       | Projection: avg(usage_idle), avg(usage_system)                                                                                |
   |                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                        |
   |                                                            |     Union                                                                                                                     |
   |                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, CAST(NULL AS Float64) AS usage_system |
   |                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
   |                                                            |       Projection: ./union_all_repro.parquet.time, CAST(NULL AS Float64) AS usage_idle, ./union_all_repro.parquet.usage_system |
   |                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
   | logical_plan after resolve_grouping_function               | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after type_coercion                           | SAME TEXT AS ABOVE                                                                                                            |
   | analyzed_logical_plan                                      | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_nested_union                  | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after simplify_expressions                    | Projection: avg(usage_idle), avg(usage_system)                                                                                |
   |                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                        |
   |                                                            |     Union                                                                                                                     |
   |                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system         |
   |                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
   |                                                            |       Projection: ./union_all_repro.parquet.time, Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system         |
   |                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
   | logical_plan after replace_distinct_aggregate              | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_join                          | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after decorrelate_predicate_subquery          | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after scalar_subquery_to_join                 | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after decorrelate_lateral_join                | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after extract_equijoin_predicate              | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_duplicated_expr               | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_filter                        | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_cross_join                    | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_limit                         | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after propagate_empty_relation                | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_one_union                     | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after filter_null_join_keys                   | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_outer_join                    | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after push_down_limit                         | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after push_down_filter                        | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after eliminate_group_by_constant             | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after common_sub_expression_eliminate         | SAME TEXT AS ABOVE                                                                                                            |
   | logical_plan after optimize_projections                    | Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                          |
   |                                                            |   Union                                                                                                                       |
   |                                                            |     Projection: ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system                                           |
   |                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_idle]                                                            |
   |                                                            |     Projection: Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system                                           |
   |                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_system]                                                          |
   ```
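
As a rough illustration of the pruning in step 1, here is a toy Python model. It is not DataFusion's optimizer code: the function name `prune_union_inputs` and the list-of-column-names representation are assumptions made only for this sketch.

```python
def prune_union_inputs(inputs, required):
    """Keep only the columns that the parent operator actually reads."""
    return [[col for col in columns if col in required] for columns in inputs]


# Both UNION ALL branches expose the same three output columns.
union_inputs = [
    ["time", "usage_idle", "usage_system"],
    ["time", "usage_idle", "usage_system"],
]

# AVG(usage_idle) and AVG(usage_system) never read `time`.
required_by_aggregate = {"usage_idle", "usage_system"}

pruned_inputs = prune_union_inputs(union_inputs, required_by_aggregate)
print(pruned_inputs)
```

Each branch ends up with two columns instead of three, and that change in field count is what triggers the schema recomputation in the next step.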
   
   ### 2. Schema Recomputation
   
   
   [`optimize_projections`](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/optimize_projections/mod.rs) [calls](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/optimizer/src/optimize_projections/mod.rs#L468) [`recompute_schema`](https://github.com/apache/arrow-datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/expr/src/logical_plan/plan.rs#L624-L756) because the plan has changed. `recompute_schema` sees that the number of fields has changed and [creates a new schema](https://github.com/influxdata/arrow-datafusion/blob/82cd7f3cdb8dbe0b63b8b62f54543641598655a0/datafusion/expr/src/logical_plan/plan.rs#L718) with [`Union::try_new`](https://github.com/influxdata/arrow-datafusion/blob/82cd7f3cdb8dbe0b63b8b62f54543641598655a0/datafusion/expr/src/logical_plan/plan.rs#L718).
   
   [`Union::try_new`](https://github.com/influxdata/arrow-datafusion/blob/82cd7f3cdb8dbe0b63b8b62f54543641598655a0/datafusion/expr/src/logical_plan/plan.rs#L718) calls [`Union::derive_schema_from_inputs`](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/expr/src/logical_plan/plan.rs#L2867-L2881) to rebuild the `Union` schema from the plan inputs. For each field in the schema, it calls [`intersect_metadata_for_union`](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/expr/src/expr.rs#L506-L520), which keeps metadata only when the field has the same metadata across all plan inputs. Because the `NULL` literal does not carry the same metadata as the column read from Parquet, the metadata is removed from the logical schema.
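
The effect of the intersection can be sketched in plain Python. This is a simplified model that assumes a key/value-wise intersection, not the Rust implementation. The function name `intersect_metadata` and the metadata key and value are hypothetical; the real keys are whatever the attached Parquet file carries.

```python
def intersect_metadata(metadatas):
    """Keep only key/value pairs that appear, with equal values, in every input."""
    metadatas = list(metadatas)
    if not metadatas:
        return {}
    intersected = dict(metadatas[0])
    for metadata in metadatas[1:]:
        intersected = {
            key: value
            for key, value in intersected.items()
            if metadata.get(key) == value
        }
    return intersected


# Hypothetical field metadata on the Parquet column (placeholder key and value).
parquet_field_metadata = {"example::key": "example-value"}
# Assumption for this sketch: the NULL literal carries no field metadata.
null_literal_metadata = {}

# usage_idle: one branch reads the column, the other supplies NULL.
usage_idle_metadata = intersect_metadata(
    [parquet_field_metadata, null_literal_metadata]
)
# For comparison: if both branches read the column, the metadata survives.
same_column_metadata = intersect_metadata(
    [parquet_field_metadata, parquet_field_metadata]
)
```

Under these assumptions `usage_idle_metadata` is empty while `same_column_metadata` keeps the original entry, which matches the behaviour described above: a single metadata-free input is enough to strip the field's metadata from the `Union` schema.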
   
   ### 3. Physical/Logical Schema Mismatch
   
   During physical planning, we call [`DefaultPhysicalPlanner::map_logical_node_to_physical`](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/core/src/physical_planner.rs#L447-L1525), which [checks](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/core/src/physical_planner.rs#L683-L724) the input of the `Aggregate` node (in this case `Union`) to see whether its logical schema matches its physical schema. However, because the metadata was previously removed from the `Union` schema, the [metadata no longer matches](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/core/src/schema_equivalence.rs#L42) when the two schemas are compared.
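
A minimal Python sketch of why the comparison fails, assuming (as described above) that the check also compares field metadata. The `Field` class, the `schemas_match` helper, and the metadata values are hypothetical stand-ins, not DataFusion or Arrow APIs.

```python
from dataclasses import dataclass, field


@dataclass
class Field:
    name: str
    data_type: str
    metadata: dict = field(default_factory=dict)


def schemas_match(logical, physical):
    """Strict comparison: names, types, and field metadata must all agree."""
    return len(logical) == len(physical) and all(
        lf.name == pf.name
        and lf.data_type == pf.data_type
        and lf.metadata == pf.metadata
        for lf, pf in zip(logical, physical)
    )


# Placeholder metadata; the real keys come from the Parquet file.
parquet_metadata = {"example::key": "example-value"}

# Logical Union schema after recomputation: metadata has been dropped.
logical_schema = [Field("usage_idle", "Float64"), Field("usage_system", "Float64")]
# Physical schema in this model: fields still carry the Parquet metadata.
physical_schema = [
    Field("usage_idle", "Float64", parquet_metadata),
    Field("usage_system", "Float64", parquet_metadata),
]

mismatch_detected = not schemas_match(logical_schema, physical_schema)
```

In this model the names and types agree, so the only thing separating the two schemas is the field metadata.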

