This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 72b855bcfc Fix invalid schema for unions in ViewTables (#15135)
72b855bcfc is described below

commit 72b855bcfca84799982a66f719a7f8d0b6f25faa
Author: Matt Friede <[email protected]>
AuthorDate: Sun Mar 16 06:32:33 2025 -0400

    Fix invalid schema for unions in ViewTables (#15135)
    
    * Add test for coerce_union_schema
    
    * coerce_union to use its own schema instead of that of the first plan
    
    * Generate unique field names for union schema instead of using table 
qualifiers
    
    * Review feedback: avoid cloning schema
    
    * start from union schema when coercing
    
    * cargo fmt
    
    * dont use wildcard in test
    
    * Dont strip qualifiers for sorts over unions
---
 datafusion/expr/src/logical_plan/builder.rs        |  1 +
 datafusion/expr/src/logical_plan/plan.rs           | 20 +++++--
 datafusion/optimizer/src/analyzer/type_coercion.rs | 62 ++++++++++++++++++---
 .../optimizer/src/propagate_empty_relation.rs      |  4 +-
 datafusion/sqllogictest/test_files/limit.slt       |  2 +-
 datafusion/sqllogictest/test_files/order.slt       |  4 +-
 .../sqllogictest/test_files/type_coercion.slt      |  2 +-
 datafusion/sqllogictest/test_files/union.slt       | 65 ++++++++++++++++++++--
 .../sqllogictest/test_files/union_by_name.slt      | 12 ++--
 9 files changed, 142 insertions(+), 30 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index f506c0671b..605db26553 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -776,6 +776,7 @@ impl LogicalPlanBuilder {
             &missing_cols,
             is_distinct,
         )?;
+
         let sort_plan = LogicalPlan::Sort(Sort {
             expr: normalize_sorts(sorts, &plan)?,
             input: Arc::new(plan),
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 4196ebf28f..641489b5d9 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -2817,6 +2817,7 @@ impl Union {
             }
         }
 
+        let mut name_counts: HashMap<String, usize> = HashMap::new();
         let union_fields = (0..fields_count)
             .map(|i| {
                 let fields = inputs
@@ -2824,7 +2825,8 @@ impl Union {
                     .map(|input| input.schema().field(i))
                     .collect::<Vec<_>>();
                 let first_field = fields[0];
-                let name = first_field.name();
+                let base_name = first_field.name().to_string();
+
                 let data_type = if loose_types {
                     // TODO apply type coercion here, or document why it's 
better to defer
                     // temporarily use the data type from the left input and 
later rely on the analyzer to
@@ -2847,13 +2849,21 @@ impl Union {
                     )?
                 };
                 let nullable = fields.iter().any(|field| field.is_nullable());
-                let mut field = Field::new(name, data_type.clone(), nullable);
+
+                // Generate unique field name
+                let name = if let Some(count) = 
name_counts.get_mut(&base_name) {
+                    *count += 1;
+                    format!("{}_{}", base_name, count)
+                } else {
+                    name_counts.insert(base_name.clone(), 0);
+                    base_name
+                };
+
+                let mut field = Field::new(&name, data_type.clone(), nullable);
                 let field_metadata =
                     intersect_maps(fields.iter().map(|field| 
field.metadata()));
                 field.set_metadata(field_metadata);
-                // TODO reusing table reference from the first schema is 
probably wrong
-                let table_reference = 
first_schema.qualified_field(i).0.cloned();
-                Ok((table_reference, Arc::new(field)))
+                Ok((None, Arc::new(field)))
             })
             .collect::<Result<_>>()?;
         let union_schema_metadata =
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index c9c0b7a3b7..5d6b226ff1 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -214,7 +214,10 @@ impl<'a> TypeCoercionRewriter<'a> {
     /// Coerce the union’s inputs to a common schema compatible with all 
inputs.
     /// This occurs after wildcard expansion and the coercion of the input 
expressions.
     pub fn coerce_union(union_plan: Union) -> Result<LogicalPlan> {
-        let union_schema = Arc::new(coerce_union_schema(&union_plan.inputs)?);
+        let union_schema = Arc::new(coerce_union_schema_with_schema(
+            &union_plan.inputs,
+            &union_plan.schema,
+        )?);
         let new_inputs = union_plan
             .inputs
             .into_iter()
@@ -934,7 +937,12 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) 
-> Result<Case> {
 /// This method presumes that the wildcard expansion is unneeded, or has 
already
 /// been applied.
 pub fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> Result<DFSchema> {
-    let base_schema = inputs[0].schema();
+    coerce_union_schema_with_schema(&inputs[1..], inputs[0].schema())
+}
+fn coerce_union_schema_with_schema(
+    inputs: &[Arc<LogicalPlan>],
+    base_schema: &DFSchemaRef,
+) -> Result<DFSchema> {
     let mut union_datatypes = base_schema
         .fields()
         .iter()
@@ -953,7 +961,7 @@ pub fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> 
Result<DFSchema> {
 
     let mut metadata = base_schema.metadata().clone();
 
-    for (i, plan) in inputs.iter().enumerate().skip(1) {
+    for (i, plan) in inputs.iter().enumerate() {
         let plan_schema = plan.schema();
         metadata.extend(plan_schema.metadata().clone());
 
@@ -993,15 +1001,15 @@ pub fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) 
-> Result<DFSchema> {
         }
     }
     let union_qualified_fields = izip!(
-        base_schema.iter(),
+        base_schema.fields(),
         union_datatypes.into_iter(),
         union_nullabilities,
         union_field_meta.into_iter()
     )
-    .map(|((qualifier, field), datatype, nullable, metadata)| {
+    .map(|(field, datatype, nullable, metadata)| {
         let mut field = Field::new(field.name().clone(), datatype, nullable);
         field.set_metadata(metadata);
-        (qualifier.cloned(), field.into())
+        (None, field.into())
     })
     .collect::<Vec<_>>();
 
@@ -1045,11 +1053,12 @@ mod test {
     use std::sync::Arc;
 
     use arrow::datatypes::DataType::Utf8;
-    use arrow::datatypes::{DataType, Field, TimeUnit};
+    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 
     use crate::analyzer::type_coercion::{
         coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
     };
+    use crate::analyzer::Analyzer;
     use crate::test::{assert_analyzed_plan_eq, 
assert_analyzed_plan_with_config_eq};
     use datafusion_common::config::ConfigOptions;
     use datafusion_common::tree_node::{TransformedResult, TreeNode};
@@ -1061,9 +1070,10 @@ mod test {
         cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, 
AggregateUDF,
         BinaryExpr, Case, ColumnarValue, Expr, ExprSchemable, Filter, 
LogicalPlan,
         Operator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
-        SimpleAggregateUDF, Subquery, Volatility,
+        SimpleAggregateUDF, Subquery, Union, Volatility,
     };
     use datafusion_functions_aggregate::average::AvgAccumulator;
+    use datafusion_sql::TableReference;
 
     fn empty() -> Arc<LogicalPlan> {
         Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
@@ -1094,6 +1104,42 @@ mod test {
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)
     }
 
+    #[test]
+    fn test_coerce_union() -> Result<()> {
+        let left_plan = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(
+                DFSchema::try_from_qualified_schema(
+                    TableReference::full("datafusion", "test", "foo"),
+                    &Schema::new(vec![Field::new("a", DataType::Int32, 
false)]),
+                )
+                .unwrap(),
+            ),
+        }));
+        let right_plan = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(
+                DFSchema::try_from_qualified_schema(
+                    TableReference::full("datafusion", "test", "foo"),
+                    &Schema::new(vec![Field::new("a", DataType::Int64, 
false)]),
+                )
+                .unwrap(),
+            ),
+        }));
+        let union = LogicalPlan::Union(Union::try_new_with_loose_types(vec![
+            left_plan, right_plan,
+        ])?);
+        let analyzed_union = 
Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())])
+            .execute_and_check(union, &ConfigOptions::default(), |_, _| {})?;
+        let top_level_plan = LogicalPlan::Projection(Projection::try_new(
+            vec![col("a")],
+            Arc::new(analyzed_union),
+        )?);
+
+        let expected = "Projection: a\n  Union\n    Projection: 
CAST(datafusion.test.foo.a AS Int64) AS a\n      EmptyRelation\n    
EmptyRelation";
+        assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), top_level_plan, 
expected)
+    }
+
     fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> 
Result<()> {
         let mut options = ConfigOptions::default();
         options.optimizer.expand_views_at_output = true;
diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs 
b/datafusion/optimizer/src/propagate_empty_relation.rs
index d26df073dc..344707ae8d 100644
--- a/datafusion/optimizer/src/propagate_empty_relation.rs
+++ b/datafusion/optimizer/src/propagate_empty_relation.rs
@@ -316,7 +316,7 @@ mod tests {
 
         let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
 
-        let expected = "TableScan: test";
+        let expected = "Projection: a, b, c\n  TableScan: test";
         assert_together_optimized_plan(plan, expected, true)
     }
 
@@ -406,7 +406,7 @@ mod tests {
 
         let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
 
-        let expected = "TableScan: test";
+        let expected = "Projection: a, b, c\n  TableScan: test";
         assert_together_optimized_plan(plan, expected, true)
     }
 
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index 067b23ac2f..93ffa313b8 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -654,7 +654,7 @@ explain select * FROM (
 ----
 logical_plan
 01)Limit: skip=4, fetch=10
-02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
+02)--Sort: c DESC NULLS FIRST, fetch=14
 03)----Union
 04)------Projection: CAST(ordered_table.c AS Int64) AS c
 05)--------TableScan: ordered_table projection=[c]
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index f088e071d7..4e8be56f33 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -774,7 +774,7 @@ SELECT * FROM v
 ORDER BY 1, 2;
 ----
 logical_plan
-01)Sort: u.m ASC NULLS LAST, u.t ASC NULLS LAST
+01)Sort: m ASC NULLS LAST, t ASC NULLS LAST
 02)--Union
 03)----SubqueryAlias: u
 04)------Projection: Int64(0) AS m, m0.t
@@ -1248,7 +1248,7 @@ order by d, c, a, a0, b
 limit 2;
 ----
 logical_plan
-01)Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 
ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2
+01)Sort: d ASC NULLS LAST, c ASC NULLS LAST, a ASC NULLS LAST, a0 ASC NULLS 
LAST, b ASC NULLS LAST, fetch=2
 02)--Union
 03)----SubqueryAlias: t1
 04)------Projection: ordered_table.b, ordered_table.c, ordered_table.a, 
Int32(NULL) AS a0, ordered_table.d
diff --git a/datafusion/sqllogictest/test_files/type_coercion.slt 
b/datafusion/sqllogictest/test_files/type_coercion.slt
index 0900c88c15..2c6079bc70 100644
--- a/datafusion/sqllogictest/test_files/type_coercion.slt
+++ b/datafusion/sqllogictest/test_files/type_coercion.slt
@@ -187,7 +187,7 @@ EXPLAIN SELECT a FROM (select 1 a) x GROUP BY 1
 (SELECT a FROM (select 1.1 a) x GROUP BY 1) ORDER BY 1
 ----
 logical_plan
-01)Sort: x.a ASC NULLS LAST
+01)Sort: a ASC NULLS LAST
 02)--Union
 03)----Projection: CAST(x.a AS Float64) AS a
 04)------Aggregate: groupBy=[[x.a]], aggr=[[]]
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index 918c6e2811..654bccfab5 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -226,7 +226,7 @@ query TT
 EXPLAIN SELECT name FROM t1 UNION (SELECT name from t2 UNION SELECT name || 
'_new' from t2)
 ----
 logical_plan
-01)Aggregate: groupBy=[[t1.name]], aggr=[[]]
+01)Aggregate: groupBy=[[name]], aggr=[[]]
 02)--Union
 03)----TableScan: t1 projection=[name]
 04)----TableScan: t2 projection=[name]
@@ -411,7 +411,7 @@ query TT
 explain SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM 
aggregate_test_100 ORDER BY c9 DESC LIMIT 5
 ----
 logical_plan
-01)Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+01)Sort: c9 DESC NULLS FIRST, fetch=5
 02)--Union
 03)----Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS 
Decimal128(20, 0)) AS c9
 04)------TableScan: aggregate_test_100 projection=[c1, c9]
@@ -449,7 +449,7 @@ SELECT count(*) FROM (
 ----
 logical_plan
 01)Projection: count(Int64(1)) AS count(*)
-02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(Int64(1))]]
+02)--Aggregate: groupBy=[[name]], aggr=[[count(Int64(1))]]
 03)----Union
 04)------Aggregate: groupBy=[[t1.name]], aggr=[[]]
 05)--------TableScan: t1 projection=[name]
@@ -601,7 +601,7 @@ UNION ALL
 ORDER BY c1
 ----
 logical_plan
-01)Sort: t1.c1 ASC NULLS LAST
+01)Sort: c1 ASC NULLS LAST
 02)--Union
 03)----TableScan: t1 projection=[c1]
 04)----Projection: t2.c1a AS c1
@@ -709,6 +709,25 @@ SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1
 SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1 WHERE (t1.v2 IS NULL);
 ----
 
+query IR
+SELECT t1.v0, t2.v0 FROM t1,t2
+    UNION ALL
+SELECT t1.v0, t2.v0 FROM t1,t2
+ORDER BY v0;
+----
+-1493773377 0.280145772929
+-1493773377 0.280145772929
+-1229445667 0.280145772929
+-1229445667 0.280145772929
+1541512604 0.280145772929
+1541512604 0.280145772929
+NULL 0.280145772929
+NULL 0.280145772929
+NULL 0.280145772929
+NULL 0.280145772929
+NULL 0.280145772929
+NULL 0.280145772929
+
 statement ok
 CREATE TABLE t3 (
   id INT
@@ -814,7 +833,7 @@ UNION ALL
 ORDER BY c1
 ----
 logical_plan
-01)Sort: aggregate_test_100.c1 ASC NULLS LAST
+01)Sort: c1 ASC NULLS LAST
 02)--Union
 03)----Filter: aggregate_test_100.c1 = Utf8("a")
 04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
@@ -860,3 +879,39 @@ FROM (
 GROUP BY combined
 ----
 AB
+
+
+# Test union in view
+statement ok
+CREATE TABLE u1 (x INT, y INT);
+
+statement ok
+INSERT INTO u1 VALUES (3, 3), (3, 3), (1, 1);
+
+statement ok
+CREATE TABLE u2 (y BIGINT, z BIGINT);
+
+statement ok
+INSERT INTO u2 VALUES (20, 20), (40, 40);
+
+statement ok
+CREATE VIEW v1 AS
+SELECT y FROM u1 UNION ALL SELECT y FROM u2  ORDER BY y;
+
+query I
+SELECT * FROM (SELECT y FROM u1 UNION ALL SELECT y FROM u2) ORDER BY y;
+----
+1
+3
+3
+20
+40
+
+query I
+SELECT * FROM v1;
+----
+1
+3
+3
+20
+40
diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt 
b/datafusion/sqllogictest/test_files/union_by_name.slt
index 63a43a36ff..3844dba680 100644
--- a/datafusion/sqllogictest/test_files/union_by_name.slt
+++ b/datafusion/sqllogictest/test_files/union_by_name.slt
@@ -54,13 +54,13 @@ INSERT INTO t2 VALUES (2, 2), (4, 4);
 
 # Test binding
 query I
-SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x;
+SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY x;
 ----
 1
 3
 
 query I
-SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x;
+SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x;
 ----
 1
 1
@@ -70,13 +70,13 @@ SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 
ORDER BY t1.x;
 3
 
 query I
-SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x;
+SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY x;
 ----
 1
 3
 
 query I
-SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x;
+SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x;
 ----
 1
 1
@@ -124,8 +124,8 @@ NULL 3
 
 # Ambiguous name
 
-statement error DataFusion error: Schema error: No field named t1.x. Valid 
fields are a, b.
-SELECT x AS a FROM t1 UNION BY NAME SELECT x AS b FROM t1 ORDER BY t1.x;
+statement error DataFusion error: Schema error: No field named x. Valid fields 
are a, b.
+SELECT x AS a FROM t1 UNION BY NAME SELECT x AS b FROM t1 ORDER BY x;
 
 query II
 (SELECT y FROM t1 UNION ALL SELECT x FROM t1) UNION BY NAME (SELECT z FROM t2 
UNION ALL SELECT y FROM t2) ORDER BY y, z;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to