This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new bd249ba5b9 Add documentation for UNION schema handling. (#17248) bd249ba5b9 is described below commit bd249ba5b9d9818ce82e515e8dcb1c83c054e32e Author: wiedld <wie...@users.noreply.github.com> AuthorDate: Fri Aug 22 12:50:50 2025 -0700 Add documentation for UNION schema handling. (#17248) * refactor: consolidate and document union intersection of field metadata * chore: document how schemas are unioned in the analyzer * chore: add concrete examples, linking to datafusion-examples * inline and rename function * complete rename --------- Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion/expr/src/expr.rs | 59 ++++++++++++++++++++++ datafusion/expr/src/logical_plan/plan.rs | 37 +++++--------- datafusion/optimizer/src/analyzer/type_coercion.rs | 37 ++++++++++++++ 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 2324ae79c0..0eef8a00a9 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -642,6 +642,65 @@ impl From<&HashMap<String, String>> for FieldMetadata { } } +/// The metadata used in [`Field::metadata`]. +/// +/// This represents the metadata associated with an Arrow [`Field`]. The metadata consists of key-value pairs. +/// +/// # Common Use Cases +/// +/// Field metadata is commonly used to store: +/// - Default values for columns when data is missing +/// - Column descriptions or documentation +/// - Data lineage information +/// - Custom application-specific annotations +/// - Encoding hints or display formatting preferences +/// +/// # Example: Storing Default Values +/// +/// A practical example of using field metadata is storing default values for columns +/// that may be missing in the physical data but present in the logical schema. +/// See the [default_column_values.rs] example implementation. 
+/// +/// [default_column_values.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs +pub type SchemaFieldMetadata = std::collections::HashMap<String, String>; + +/// Intersects multiple metadata instances for UNION operations. +/// +/// This function implements the intersection strategy used by UNION operations, +/// where only metadata keys that exist in ALL inputs with identical values +/// are preserved in the result. +/// +/// # Union Metadata Behavior +/// +/// Union operations require consistent metadata across all branches: +/// - Only metadata keys present in ALL union branches are kept +/// - For each kept key, the value must be identical across all branches +/// - If a key has different values across branches, it is excluded from the result +/// - If any input has no metadata, the result will be empty +/// +/// # Arguments +/// +/// * `metadatas` - An iterator of `SchemaFieldMetadata` instances to intersect +/// +/// # Returns +/// +/// A new `SchemaFieldMetadata` containing only the intersected metadata +pub fn intersect_metadata_for_union<'a>( + metadatas: impl IntoIterator<Item = &'a SchemaFieldMetadata>, +) -> SchemaFieldMetadata { + let mut metadatas = metadatas.into_iter(); + let Some(mut intersected) = metadatas.next().cloned() else { + return Default::default(); + }; + + for metadata in metadatas { + // Only keep keys that exist in both with the same value + intersected.retain(|k, v| metadata.get(k) == Some(v)); + } + + intersected +} + /// UNNEST expression. 
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] pub struct Unnest { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 8df6c22db6..887afd7cde 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -31,7 +31,10 @@ use super::invariants::{ }; use super::DdlStatement; use crate::builder::{change_redundant_column, unnest_with_options}; -use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams}; +use crate::expr::{ + intersect_metadata_for_union, Placeholder, Sort as SortExpr, WindowFunction, + WindowFunctionParams, +}; use crate::expr_rewriter::{ create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver, }; @@ -2799,15 +2802,16 @@ impl Union { let mut field = Field::new(name, data_type.clone(), final_is_nullable); - field.set_metadata(intersect_maps(unmerged_metadata)); + field.set_metadata(intersect_metadata_for_union(unmerged_metadata)); (None, Arc::new(field)) }, ) .collect::<Vec<(Option<TableReference>, _)>>(); - let union_schema_metadata = - intersect_maps(inputs.iter().map(|input| input.schema().metadata())); + let union_schema_metadata = intersect_metadata_for_union( + inputs.iter().map(|input| input.schema().metadata()), + ); // Functional Dependencies are not preserved after UNION operation let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?; @@ -2876,14 +2880,16 @@ impl Union { }; let mut field = Field::new(&name, data_type.clone(), nullable); - let field_metadata = - intersect_maps(fields.iter().map(|field| field.metadata())); + let field_metadata = intersect_metadata_for_union( + fields.iter().map(|field| field.metadata()), + ); field.set_metadata(field_metadata); Ok((None, Arc::new(field))) }) .collect::<Result<_>>()?; - let union_schema_metadata = - intersect_maps(inputs.iter().map(|input| input.schema().metadata())); + let union_schema_metadata = 
intersect_metadata_for_union( + inputs.iter().map(|input| input.schema().metadata()), + ); // Functional Dependencies are not preserved after UNION operation let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?; @@ -2893,21 +2899,6 @@ impl Union { } } -fn intersect_maps<'a>( - inputs: impl IntoIterator<Item = &'a HashMap<String, String>>, -) -> HashMap<String, String> { - let mut inputs = inputs.into_iter(); - let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default(); - for input in inputs { - // The extra dereference below (`&*v`) is a workaround for https://github.com/rkyv/rkyv/issues/434. - // When this crate is used in a workspace that enables the `rkyv-64` feature in the `chrono` crate, - // this triggers a Rust compilation error: - // error[E0277]: can't compare `Option<&std::string::String>` with `Option<&mut std::string::String>`. - merged.retain(|k, v| input.get(k) == Some(&*v)); - } - merged -} - // Manual implementation needed because of `schema` field. Comparison excludes this field. impl PartialOrd for Union { fn partial_cmp(&self, other: &Self) -> Option<Ordering> { diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 7b4726c309..7d4920a6cb 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -957,6 +957,43 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result<Case> { /// /// This method presumes that the wildcard expansion is unneeded, or has already /// been applied. +/// +/// ## Schema and Field Handling in Union Coercion +/// +/// **Processing order**: The function starts with the base schema (first input) and then +/// processes remaining inputs sequentially, with later inputs taking precedence in merging. +/// +/// **Schema-level metadata merging**: Later schemas take precedence for duplicate keys. 
+/// +/// **Field-level metadata merging**: Later fields take precedence for duplicate metadata keys. +/// +/// **Type coercion precedence**: The coerced type is determined by iteratively applying +/// `comparison_coercion()` between the accumulated type and each new input's type. The +/// result depends on type coercion rules, not input order. +/// +/// **Nullability merging**: Nullability is accumulated using logical OR (`||`). +/// Once any input field is nullable, the result field becomes nullable permanently. +/// Later inputs can make a field nullable but cannot make it non-nullable. +/// +/// **Field precedence**: Field names come from the first (base) schema, but the field properties +/// (nullability and field-level metadata) have later schemas taking precedence. +/// +/// **Example**: +/// ```sql +/// SELECT a, b FROM table1 -- a: Int32, metadata {"source": "t1"}, nullable=false +/// UNION +/// SELECT a, b FROM table2 -- a: Int64, metadata {"source": "t2"}, nullable=true +/// UNION +/// SELECT a, b FROM table3 -- a: Int32, metadata {"encoding": "utf8"}, nullable=false +/// -- Result: +/// -- a: Int64 (from type coercion), nullable=true (from table2), +/// -- metadata: {"source": "t2", "encoding": "utf8"} (later inputs take precedence) +/// ``` +/// +/// **Precedence Summary**: +/// - **Datatypes**: Determined by `comparison_coercion()` rules, not input order +/// - **Nullability**: Later inputs can add nullability but cannot remove it (logical OR) +/// - **Metadata**: Later inputs take precedence for same keys (HashMap::extend semantics) pub fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> Result<DFSchema> { coerce_union_schema_with_schema(&inputs[1..], inputs[0].schema()) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org