This is an automated email from the ASF dual-hosted git repository.
findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8d006a287c Core: Fix UNION field nullability tracking (#14356)
8d006a287c is described below
commit 8d006a287c653e8305827ad909c407baa7ed59fd
Author: Piotr Findeisen <[email protected]>
AuthorDate: Fri Jan 31 14:08:26 2025 +0100
Core: Fix UNION field nullability tracking (#14356)
* Fix UNION field nullability tracking
This commit fixes two bugs related to UNION handling:
- when constructing a union plan, the nullability of the other union branch was
ignored, so the resulting field could easily have incorrect nullability
- when pruning/simplifying projects, the `recompute_schema` function
contained similar logic, thus losing nullability information even for
a correctly constructed Union plan node
As a result, other optimizer logic (e.g. `expr_simplifier.rs`) could
draw incorrect conclusions and thus lead to incorrect query results, as
demonstrated with the attached SLT test.
* Fix formatting
* Skip no-op check
* Fix formatting
* add todo + link
* fixup! add todo + link
---
datafusion/expr/src/logical_plan/builder.rs | 28 ++-----
datafusion/expr/src/logical_plan/plan.rs | 115 +++++++++++++++++++++++++--
datafusion/sqllogictest/test_files/union.slt | 15 ++++
3 files changed, 127 insertions(+), 31 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index c7cff3ac26..b0c28e1455 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef,
DataFusionError,
- FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
- UnnestOptions,
+ Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;
@@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>(
/// [`TypeCoercionRewriter::coerce_union`]:
https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
/// [`coerce_union_schema`]:
https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) ->
Result<LogicalPlan> {
- if left_plan.schema().fields().len() != right_plan.schema().fields().len()
{
- return plan_err!(
- "UNION queries have different number of columns: \
- left has {} columns whereas right has {} columns",
- left_plan.schema().fields().len(),
- right_plan.schema().fields().len()
- );
- }
-
- // Temporarily use the schema from the left input and later rely on the
analyzer to
- // coerce the two schemas into a common one.
-
- // Functional Dependencies doesn't preserve after UNION operation
- let schema = (**left_plan.schema()).clone();
- let schema =
-
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
-
- Ok(LogicalPlan::Union(Union {
- inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
- schema,
- }))
+ Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
+ Arc::new(left_plan),
+ Arc::new(right_plan),
+ ])?))
}
/// Create Projection
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 7e9c0cb75e..446ae94108 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -699,15 +699,13 @@ impl LogicalPlan {
}))
}
LogicalPlan::Union(Union { inputs, schema }) => {
- let input_schema = inputs[0].schema();
- // If inputs are not pruned do not change schema
- // TODO this seems wrong (shouldn't we always use the schema
of the input?)
- let schema = if schema.fields().len() ==
input_schema.fields().len() {
- Arc::clone(&schema)
+ let first_input_schema = inputs[0].schema();
+ if schema.fields().len() == first_input_schema.fields().len() {
+ // If inputs are not pruned do not change schema
+ Ok(LogicalPlan::Union(Union { inputs, schema }))
} else {
- Arc::clone(input_schema)
- };
- Ok(LogicalPlan::Union(Union { inputs, schema }))
+ Ok(LogicalPlan::Union(Union::try_new(inputs)?))
+ }
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
@@ -2645,6 +2643,107 @@ pub struct Union {
pub schema: DFSchemaRef,
}
+impl Union {
+ /// Constructs new Union instance deriving schema from inputs.
+ fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
+ let schema = Self::derive_schema_from_inputs(&inputs, false)?;
+ Ok(Union { inputs, schema })
+ }
+
+ /// Constructs new Union instance deriving schema from inputs.
+ /// Inputs do not have to have matching types and produced schema will
+ /// take type from the first input.
+ // TODO (https://github.com/apache/datafusion/issues/14380): Avoid
creating uncoerced union at all.
+ pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) ->
Result<Self> {
+ let schema = Self::derive_schema_from_inputs(&inputs, true)?;
+ Ok(Union { inputs, schema })
+ }
+
+ /// Constructs new Union instance deriving schema from inputs.
+ ///
+ /// `loose_types` if true, inputs do not have to have matching types and
produced schema will
+ /// take type from the first input. TODO
(<https://github.com/apache/datafusion/issues/14380>) this is not necessarily
reasonable behavior.
+ fn derive_schema_from_inputs(
+ inputs: &[Arc<LogicalPlan>],
+ loose_types: bool,
+ ) -> Result<DFSchemaRef> {
+ if inputs.len() < 2 {
+ return plan_err!("UNION requires at least two inputs");
+ }
+ let first_schema = inputs[0].schema();
+ let fields_count = first_schema.fields().len();
+ for input in inputs.iter().skip(1) {
+ if fields_count != input.schema().fields().len() {
+ return plan_err!(
+ "UNION queries have different number of columns: \
+ left has {} columns whereas right has {} columns",
+ fields_count,
+ input.schema().fields().len()
+ );
+ }
+ }
+
+ let union_fields = (0..fields_count)
+ .map(|i| {
+ let fields = inputs
+ .iter()
+ .map(|input| input.schema().field(i))
+ .collect::<Vec<_>>();
+ let first_field = fields[0];
+ let name = first_field.name();
+ let data_type = if loose_types {
+ // TODO apply type coercion here, or document why it's
better to defer
+ // temporarily use the data type from the left input and
later rely on the analyzer to
+ // coerce the two schemas into a common one.
+ first_field.data_type()
+ } else {
+ fields.iter().skip(1).try_fold(
+ first_field.data_type(),
+ |acc, field| {
+ if acc != field.data_type() {
+ return plan_err!(
+ "UNION field {i} have different type in
inputs: \
+ left has {} whereas right has {}",
+ first_field.data_type(),
+ field.data_type()
+ );
+ }
+ Ok(acc)
+ },
+ )?
+ };
+ let nullable = fields.iter().any(|field| field.is_nullable());
+ let mut field = Field::new(name, data_type.clone(), nullable);
+ let field_metadata =
+ intersect_maps(fields.iter().map(|field|
field.metadata()));
+ field.set_metadata(field_metadata);
+ // TODO reusing table reference from the first schema is
probably wrong
+ let table_reference =
first_schema.qualified_field(i).0.cloned();
+ Ok((table_reference, Arc::new(field)))
+ })
+ .collect::<Result<_>>()?;
+ let union_schema_metadata =
+ intersect_maps(inputs.iter().map(|input|
input.schema().metadata()));
+
+ // Functional Dependencies doesn't preserve after UNION operation
+ let schema = DFSchema::new_with_metadata(union_fields,
union_schema_metadata)?;
+ let schema = Arc::new(schema);
+
+ Ok(schema)
+ }
+}
+
+fn intersect_maps<'a>(
+ inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
+) -> HashMap<String, String> {
+ let mut inputs = inputs.into_iter();
+ let mut merged: HashMap<String, String> =
inputs.next().cloned().unwrap_or_default();
+ for input in inputs {
+ merged.retain(|k, v| input.get(k) == Some(v));
+ }
+ merged
+}
+
// Manual implementation needed because of `schema` field. Comparison excludes
this field.
impl PartialOrd for Union {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index 352c01ca29..cbd19bf380 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -836,3 +836,18 @@ physical_plan
# Clean up after the test
statement ok
drop table aggregate_test_100;
+
+# test for https://github.com/apache/datafusion/issues/14352
+query TB rowsort
+SELECT
+ a,
+ a IS NOT NULL
+FROM (
+ -- second column, even though it's not selected, was necessary to
reproduce the bug linked above
+ SELECT 'foo' AS a, 3 AS b
+ UNION ALL
+ SELECT NULL AS a, 4 AS b
+)
+----
+NULL false
+foo true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]