This is an automated email from the ASF dual-hosted git repository.

findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d006a287c Core: Fix UNION field nullability tracking (#14356)
8d006a287c is described below

commit 8d006a287c653e8305827ad909c407baa7ed59fd
Author: Piotr Findeisen <[email protected]>
AuthorDate: Fri Jan 31 14:08:26 2025 +0100

    Core: Fix UNION field nullability tracking (#14356)
    
    * Fix UNION field nullability tracking
    
    This commit fixes two bugs related to UNION handling
    
    - when constructing a union plan, the nullability of the other union
      branch was ignored, so the resulting field could easily have incorrect
      nullability
    - when pruning/simplifying projects, the `recompute_schema` function
      contained similar logic, thus losing nullability information even for
      a correctly constructed Union plan node
    
    As a result, other optimizer logic (e.g. `expr_simplifier.rs`) could
    draw incorrect conclusions and thus lead to incorrect query results, as
    demonstrated with the attached SLT test.
    
    * Fix formatting
    
    * Skip no-op check
    
    * Fix formatting
    
    * add todo + link
    
    * fixup! add todo + link
---
 datafusion/expr/src/logical_plan/builder.rs  |  28 ++-----
 datafusion/expr/src/logical_plan/plan.rs     | 115 +++++++++++++++++++++++++--
 datafusion/sqllogictest/test_files/union.slt |  15 ++++
 3 files changed, 127 insertions(+), 31 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index c7cff3ac26..b0c28e1455 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
     plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, 
DataFusionError,
-    FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
-    UnnestOptions,
+    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
 };
 use datafusion_expr_common::type_coercion::binary::type_union_resolution;
 
@@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>(
 /// [`TypeCoercionRewriter::coerce_union`]: 
https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
 /// [`coerce_union_schema`]: 
https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
 pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> 
Result<LogicalPlan> {
-    if left_plan.schema().fields().len() != right_plan.schema().fields().len() 
{
-        return plan_err!(
-            "UNION queries have different number of columns: \
-            left has {} columns whereas right has {} columns",
-            left_plan.schema().fields().len(),
-            right_plan.schema().fields().len()
-        );
-    }
-
-    // Temporarily use the schema from the left input and later rely on the 
analyzer to
-    // coerce the two schemas into a common one.
-
-    // Functional Dependencies doesn't preserve after UNION operation
-    let schema = (**left_plan.schema()).clone();
-    let schema =
-        
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
-
-    Ok(LogicalPlan::Union(Union {
-        inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
-        schema,
-    }))
+    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
+        Arc::new(left_plan),
+        Arc::new(right_plan),
+    ])?))
 }
 
 /// Create Projection
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 7e9c0cb75e..446ae94108 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -699,15 +699,13 @@ impl LogicalPlan {
                 }))
             }
             LogicalPlan::Union(Union { inputs, schema }) => {
-                let input_schema = inputs[0].schema();
-                // If inputs are not pruned do not change schema
-                // TODO this seems wrong (shouldn't we always use the schema 
of the input?)
-                let schema = if schema.fields().len() == 
input_schema.fields().len() {
-                    Arc::clone(&schema)
+                let first_input_schema = inputs[0].schema();
+                if schema.fields().len() == first_input_schema.fields().len() {
+                    // If inputs are not pruned do not change schema
+                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                 } else {
-                    Arc::clone(input_schema)
-                };
-                Ok(LogicalPlan::Union(Union { inputs, schema }))
+                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
+                }
             }
             LogicalPlan::Distinct(distinct) => {
                 let distinct = match distinct {
@@ -2645,6 +2643,107 @@ pub struct Union {
     pub schema: DFSchemaRef,
 }
 
+impl Union {
+    /// Constructs new Union instance deriving schema from inputs.
+    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
+        let schema = Self::derive_schema_from_inputs(&inputs, false)?;
+        Ok(Union { inputs, schema })
+    }
+
+    /// Constructs new Union instance deriving schema from inputs.
+    /// Inputs do not have to have matching types and produced schema will
+    /// take type from the first input.
+    // TODO (https://github.com/apache/datafusion/issues/14380): Avoid 
creating uncoerced union at all.
+    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> 
Result<Self> {
+        let schema = Self::derive_schema_from_inputs(&inputs, true)?;
+        Ok(Union { inputs, schema })
+    }
+
+    /// Constructs new Union instance deriving schema from inputs.
+    ///
+    /// `loose_types` if true, inputs do not have to have matching types and 
produced schema will
+    /// take type from the first input. TODO 
(<https://github.com/apache/datafusion/issues/14380>) this is not necessarily 
reasonable behavior.
+    fn derive_schema_from_inputs(
+        inputs: &[Arc<LogicalPlan>],
+        loose_types: bool,
+    ) -> Result<DFSchemaRef> {
+        if inputs.len() < 2 {
+            return plan_err!("UNION requires at least two inputs");
+        }
+        let first_schema = inputs[0].schema();
+        let fields_count = first_schema.fields().len();
+        for input in inputs.iter().skip(1) {
+            if fields_count != input.schema().fields().len() {
+                return plan_err!(
+                    "UNION queries have different number of columns: \
+                    left has {} columns whereas right has {} columns",
+                    fields_count,
+                    input.schema().fields().len()
+                );
+            }
+        }
+
+        let union_fields = (0..fields_count)
+            .map(|i| {
+                let fields = inputs
+                    .iter()
+                    .map(|input| input.schema().field(i))
+                    .collect::<Vec<_>>();
+                let first_field = fields[0];
+                let name = first_field.name();
+                let data_type = if loose_types {
+                    // TODO apply type coercion here, or document why it's 
better to defer
+                    // temporarily use the data type from the left input and 
later rely on the analyzer to
+                    // coerce the two schemas into a common one.
+                    first_field.data_type()
+                } else {
+                    fields.iter().skip(1).try_fold(
+                        first_field.data_type(),
+                        |acc, field| {
+                            if acc != field.data_type() {
+                                return plan_err!(
+                                    "UNION field {i} have different type in 
inputs: \
+                                    left has {} whereas right has {}",
+                                    first_field.data_type(),
+                                    field.data_type()
+                                );
+                            }
+                            Ok(acc)
+                        },
+                    )?
+                };
+                let nullable = fields.iter().any(|field| field.is_nullable());
+                let mut field = Field::new(name, data_type.clone(), nullable);
+                let field_metadata =
+                    intersect_maps(fields.iter().map(|field| 
field.metadata()));
+                field.set_metadata(field_metadata);
+                // TODO reusing table reference from the first schema is 
probably wrong
+                let table_reference = 
first_schema.qualified_field(i).0.cloned();
+                Ok((table_reference, Arc::new(field)))
+            })
+            .collect::<Result<_>>()?;
+        let union_schema_metadata =
+            intersect_maps(inputs.iter().map(|input| 
input.schema().metadata()));
+
+        // Functional Dependencies doesn't preserve after UNION operation
+        let schema = DFSchema::new_with_metadata(union_fields, 
union_schema_metadata)?;
+        let schema = Arc::new(schema);
+
+        Ok(schema)
+    }
+}
+
+fn intersect_maps<'a>(
+    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
+) -> HashMap<String, String> {
+    let mut inputs = inputs.into_iter();
+    let mut merged: HashMap<String, String> = 
inputs.next().cloned().unwrap_or_default();
+    for input in inputs {
+        merged.retain(|k, v| input.get(k) == Some(v));
+    }
+    merged
+}
+
 // Manual implementation needed because of `schema` field. Comparison excludes 
this field.
 impl PartialOrd for Union {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index 352c01ca29..cbd19bf380 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -836,3 +836,18 @@ physical_plan
 # Clean up after the test
 statement ok
 drop table aggregate_test_100;
+
+# test for https://github.com/apache/datafusion/issues/14352
+query TB rowsort
+SELECT
+    a,
+    a IS NOT NULL
+FROM (
+    -- second column, even though it's not selected, was necessary to 
reproduce the bug linked above
+    SELECT 'foo' AS a, 3 AS b
+    UNION ALL
+    SELECT NULL AS a, 4 AS b
+)
+----
+NULL false
+foo true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to