This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fd97799ddc Make Physical CastExpr Field-aware and unify cast semantics
across physical expressions (#20814)
fd97799ddc is described below
commit fd97799ddc06347e25b0a2285a6b76e1c0d887c6
Author: kosiew <[email protected]>
AuthorDate: Tue Mar 10 09:15:51 2026 +0800
Make Physical CastExpr Field-aware and unify cast semantics across physical
expressions (#20814)
## Which issue does this PR close?
* Part of #20164
## Rationale for this change
Physical `CastExpr` previously stored only a target `DataType`. This
caused field-level semantics (name, nullability, and metadata) to be
lost when casts were represented in the physical layer. In contrast,
logical expressions already carry this information through `FieldRef`.
This mismatch created several issues:
* Physical and logical cast representations diverged in how they
preserve schema semantics.
* Struct casting logic behaved differently depending on whether the cast
was represented as `CastExpr` or `CastColumnExpr`.
* Downstream components (such as schema rewriting and ordering
equivalence analysis) required additional branching and duplicated
logic.
Making `CastExpr` field-aware aligns the physical representation with
logical semantics and enables consistent schema propagation across
execution planning and expression evaluation.
## What changes are included in this PR?
This PR introduces field-aware semantics to `CastExpr` and simplifies
several areas that previously relied on type-only casting.
Key changes include:
1. **Field-aware CastExpr**
* Replace the `cast_type: DataType` field with `target_field: FieldRef`.
* Add `new_with_target_field` constructor to explicitly construct
field-aware casts.
* Keep the existing `new(expr, DataType)` constructor as a compatibility
shim that creates a canonical field.
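As a rough sketch of the constructor split (simplified, hypothetical types standing in for the Arrow/DataFusion ones — not the actual API), the type-only constructor synthesizes a canonical unnamed, nullable field and delegates to the field-aware one:

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

type FieldRef = Arc<Field>;

struct CastExpr {
    target_field: FieldRef,
}

impl CastExpr {
    /// Compatibility shim: only a `DataType` is known, so synthesize a
    /// canonical field (unnamed, nullable, no metadata).
    fn new(cast_type: DataType) -> Self {
        Self::new_with_target_field(Arc::new(Field {
            name: String::new(),
            data_type: cast_type,
            nullable: true,
        }))
    }

    /// Preferred constructor: the caller's field is used verbatim, so its
    /// name and nullability survive into the output schema.
    fn new_with_target_field(target_field: FieldRef) -> Self {
        Self { target_field }
    }

    fn cast_type(&self) -> &DataType {
        &self.target_field.data_type
    }
}

fn main() {
    let legacy = CastExpr::new(DataType::Int64);
    // the shim produces the canonical field shape
    assert_eq!(legacy.cast_type(), &DataType::Int64);
    assert!(legacy.target_field.nullable);
    assert!(legacy.target_field.name.is_empty());
}
```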
2. **Return-field and nullability behavior**
* `return_field` now returns the full `target_field`, preserving name,
nullability, and metadata.
* `nullable()` now derives its result from both the input expression and
the resolved target field, rather than from the input expression alone.
* Add compatibility logic for legacy type-only casts to preserve
previous behavior.
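The resulting nullability rule is deliberately conservative. A minimal sketch (illustrative names, not the actual API):

```rust
/// A cast's result is treated as nullable if either the child expression can
/// produce nulls or the target field itself allows nulls; this keeps
/// optimizers from assuming a non-null output when a null input could still
/// propagate through the cast.
fn cast_is_nullable(child_nullable: bool, target_allows_nulls: bool) -> bool {
    child_nullable || target_allows_nulls
}

fn main() {
    // nullable child, non-nullable target field: still nullable
    assert!(cast_is_nullable(true, false));
    // non-nullable child, nullable target field: nullable
    assert!(cast_is_nullable(false, true));
    // only non-nullable on both sides yields a non-nullable cast
    assert!(!cast_is_nullable(false, false));
}
```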
3. **Struct cast validation improvements**
* Struct-to-struct casting now validates compatibility using field
information before execution.
* Planning-time validation prevents unsupported casts from reaching
execution.
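The name-based compatibility check can be sketched as follows (a simplification with hypothetical types; the real `validate_struct_compatibility` also recurses into nested structs and verifies type castability):

```rust
use std::collections::HashMap;

#[derive(Clone)]
struct Field {
    name: String,
    nullable: bool,
}

/// Name-based struct compatibility: every target field must either be present
/// in the source struct (matched by name) or be nullable so it can be filled
/// with nulls; extra source fields are ignored.
fn validate_struct_compatibility(source: &[Field], target: &[Field]) -> Result<(), String> {
    let source_by_name: HashMap<&str, &Field> =
        source.iter().map(|f| (f.name.as_str(), f)).collect();
    for t in target {
        if !source_by_name.contains_key(t.name.as_str()) && !t.nullable {
            return Err(format!(
                "target field '{}' is non-nullable but missing from source",
                t.name
            ));
        }
    }
    Ok(())
}

fn main() {
    let field = |name: &str, nullable| Field { name: name.to_string(), nullable };
    let source = vec![field("x", true), field("y", true)];
    // reordered target fields pass the name-based check
    assert!(validate_struct_compatibility(&source, &[field("y", true), field("x", true)]).is_ok());
    // a non-nullable target field absent from the source fails at planning time
    assert!(validate_struct_compatibility(&source, &[field("missing", false)]).is_err());
}
```

Running this check at planning time is what lets unsupported struct casts fail fast instead of surfacing mid-execution.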
4. **Shared cast property logic**
* Introduce shared helper functions (`cast_expr_properties`,
`is_order_preserving_cast_family`) for determining ordering
preservation.
* Reuse this logic in both `CastExpr` and `CastColumnExpr` to avoid
duplicated implementations.
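The shared ordering check mirrors the logic visible in the diff below; here is a self-contained sketch using a small stand-in `DataType` enum instead of Arrow's:

```rust
#[derive(Clone, PartialEq)]
enum DataType {
    Boolean,
    Int8,
    Int16,
    Float64,
    Utf8,
    Date32,
    TimestampMillis,
}

fn is_numeric(t: &DataType) -> bool {
    matches!(t, DataType::Int8 | DataType::Int16 | DataType::Float64)
}

fn is_temporal(t: &DataType) -> bool {
    matches!(t, DataType::Date32 | DataType::TimestampMillis)
}

/// A cast preserves ordering when it stays within the same datatype family:
/// numeric/boolean to numeric, temporal to temporal, or an identity cast.
fn is_order_preserving_cast_family(source: &DataType, target: &DataType) -> bool {
    (is_numeric(source) || *source == DataType::Boolean) && is_numeric(target)
        || is_temporal(source) && is_temporal(target)
        || source == target
}

fn main() {
    assert!(is_order_preserving_cast_family(&DataType::Int8, &DataType::Int16));
    assert!(is_order_preserving_cast_family(&DataType::Boolean, &DataType::Float64));
    // crossing families (e.g. string to numeric) does not preserve ordering
    assert!(!is_order_preserving_cast_family(&DataType::Utf8, &DataType::Int16));
    // identity casts trivially preserve ordering
    assert!(is_order_preserving_cast_family(&DataType::Utf8, &DataType::Utf8));
}
```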
5. **Schema rewriter improvements**
* Refactor physical column resolution into `resolve_physical_column`.
* Simplify cast insertion logic when logical and physical fields differ.
* Pass explicit physical and logical fields to cast creation for
improved correctness.
6. **Ordering equivalence simplification**
* Introduce `substitute_cast_like_ordering` helper to unify handling of
`CastExpr` and `CastColumnExpr` in ordering equivalence analysis.
7. **Additional unit tests**
* Validate metadata propagation through `return_field`.
* Verify nullability behavior for field-aware casts.
* Ensure legacy type-only casts preserve existing semantics.
* Test struct-cast validation with nested field semantics.
## Are these changes tested?
Yes.
New unit tests were added in `physical-expr/src/expressions/cast.rs` to
verify:
* Metadata propagation through field-aware casts
* Correct nullability behavior derived from the target field
* Backward compatibility with legacy type-only constructors
* Struct cast compatibility validation using nested fields
Existing tests continue to pass and validate compatibility with the
previous API behavior.
## Are there any user-facing changes?
There are no direct user-facing behavior changes.
This change primarily improves internal schema semantics and consistency
in the physical expression layer. Existing APIs remain compatible
through the legacy constructor that accepts only a `DataType`.
## LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated
content has been manually reviewed and tested.
---
.../physical-expr-adapter/src/schema_rewriter.rs | 122 +++++-----
.../src/equivalence/properties/mod.rs | 65 +++---
datafusion/physical-expr/src/expressions/cast.rs | 260 +++++++++++++++++----
.../physical-expr/src/expressions/cast_column.rs | 16 +-
4 files changed, 319 insertions(+), 144 deletions(-)
diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index e1f45bb0d1..a2a45cbdfe 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::compute::can_cast_types;
-use arrow::datatypes::{DataType, Field, SchemaRef};
+use arrow::datatypes::{DataType, Field, FieldRef, SchemaRef};
use datafusion_common::{
Result, ScalarValue, exec_err,
metadata::FieldMetadata,
@@ -404,71 +404,76 @@ impl DefaultPhysicalExprAdapterRewriter {
}
};
- // Check if the column exists in the physical schema
- let physical_column_index = match self
- .physical_file_schema
- .index_of(column.name())
- {
- Ok(index) => index,
- Err(_) => {
- if !logical_field.is_nullable() {
- return exec_err!(
- "Non-nullable column '{}' is missing from the physical
schema",
- column.name()
- );
- }
- // If the column is missing from the physical schema fill it
in with nulls.
- // For a different behavior, provide a custom
`PhysicalExprAdapter` implementation.
- let null_value =
ScalarValue::Null.cast_to(logical_field.data_type())?;
- return Ok(Transformed::yes(Arc::new(
- expressions::Literal::new_with_metadata(
- null_value,
- Some(FieldMetadata::from(logical_field)),
- ),
- )));
+ let Some((resolved_column, physical_field)) =
+ self.resolve_physical_column(column)?
+ else {
+ if !logical_field.is_nullable() {
+ return exec_err!(
+ "Non-nullable column '{}' is missing from the physical
schema",
+ column.name()
+ );
}
+ // If the column is missing from the physical schema fill it in
with nulls.
+ // For a different behavior, provide a custom
`PhysicalExprAdapter` implementation.
+ let null_value =
ScalarValue::Null.cast_to(logical_field.data_type())?;
+ return Ok(Transformed::yes(Arc::new(
+ expressions::Literal::new_with_metadata(
+ null_value,
+ Some(FieldMetadata::from(logical_field)),
+ ),
+ )));
};
- let physical_field =
self.physical_file_schema.field(physical_column_index);
- if column.index() == physical_column_index && logical_field ==
physical_field {
+ if resolved_column.index() == column.index()
+ && logical_field == physical_field.as_ref()
+ {
return Ok(Transformed::no(expr));
}
- let column = self.resolve_column(column, physical_column_index)?;
-
- if logical_field == physical_field {
+ if logical_field == physical_field.as_ref() {
// If the fields match (including metadata/nullability), we can
use the column as is
- return Ok(Transformed::yes(Arc::new(column)));
+ return Ok(Transformed::yes(Arc::new(resolved_column)));
}
- if logical_field.data_type() == physical_field.data_type() {
- // The data type matches, but the field metadata / nullability
differs.
- // Emit a CastColumnExpr so downstream schema construction uses
the logical field.
- return self.create_cast_column_expr(column, logical_field);
- }
-
- // We need to cast the column to the logical data type
+        // We need a cast expression whenever the logical and physical fields differ,
+        // whether that difference is only metadata/nullability or also data type.
        // TODO: add optimization to move the cast from the column to literal expressions
        // in the case of `col = 123` since that's much cheaper to evaluate.
        // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
- self.create_cast_column_expr(column, logical_field)
+ self.create_cast_column_expr(resolved_column, physical_field,
logical_field)
}
- /// Resolves a column expression, handling index and type mismatches.
- ///
- /// Returns the appropriate Column expression when the column's index or
data type
- /// don't match the physical schema. Assumes that the early-exit case
(both index
- /// and type match) has already been checked by the caller.
- fn resolve_column(
+ /// Resolves a logical column to the corresponding physical column and
field.
+ fn resolve_physical_column(
&self,
column: &Column,
- physical_column_index: usize,
- ) -> Result<Column> {
- if column.index() == physical_column_index {
- Ok(column.clone())
+ ) -> Result<Option<(Column, FieldRef)>> {
+        // Resolve columns by **name first** rather than trusting the incoming
+        // index: the planner might hand us a `Column` whose `index` field is
+        // stale (e.g. after projection/rename rewrites), so resolving by name
+        // ensures we match the correct physical slot. Once we know the proper
+        // index we rebuild the `Column` with `new_with_schema` so callers can
+        // rely on `column.index()` later without re-querying the schema.
+ let Ok(physical_column_index) =
self.physical_file_schema.index_of(column.name())
+ else {
+ return Ok(None);
+ };
+
+ let column = if column.index() == physical_column_index {
+ column.clone()
} else {
- Column::new_with_schema(column.name(),
self.physical_file_schema.as_ref())
- }
+ Column::new_with_schema(column.name(),
self.physical_file_schema.as_ref())?
+ };
+
+ Ok(Some((
+ column,
+ Arc::new(
+ self.physical_file_schema
+ .field(physical_column_index)
+ .clone(),
+ ),
+ )))
}
/// Validates type compatibility and creates a CastColumnExpr if needed.
@@ -479,19 +484,15 @@ impl DefaultPhysicalExprAdapterRewriter {
fn create_cast_column_expr(
&self,
column: Column,
+ physical_field: FieldRef,
logical_field: &Field,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
- // Look up the column index in the physical schema by name to ensure
correctness.
- let physical_column_index =
self.physical_file_schema.index_of(column.name())?;
- let actual_physical_field =
- self.physical_file_schema.field(physical_column_index);
-
// For struct types, use validate_struct_compatibility which handles:
// - Missing fields in source (filled with nulls)
// - Extra fields in source (ignored)
// - Recursive validation of nested structs
// For non-struct types, use Arrow's can_cast_types
- match (actual_physical_field.data_type(), logical_field.data_type()) {
+ match (physical_field.data_type(), logical_field.data_type()) {
(DataType::Struct(physical_fields),
DataType::Struct(logical_fields)) => {
validate_struct_compatibility(
physical_fields.as_ref(),
@@ -499,15 +500,13 @@ impl DefaultPhysicalExprAdapterRewriter {
)?;
}
_ => {
- let is_compatible = can_cast_types(
- actual_physical_field.data_type(),
- logical_field.data_type(),
- );
+ let is_compatible =
+ can_cast_types(physical_field.data_type(),
logical_field.data_type());
if !is_compatible {
return exec_err!(
"Cannot cast column '{}' from '{}' (physical data
type) to '{}' (logical data type)",
column.name(),
- actual_physical_field.data_type(),
+ physical_field.data_type(),
logical_field.data_type()
);
}
@@ -516,7 +515,7 @@ impl DefaultPhysicalExprAdapterRewriter {
let cast_expr = Arc::new(CastColumnExpr::new(
Arc::new(column),
- Arc::new(actual_physical_field.clone()),
+ physical_field,
Arc::new(logical_field.clone()),
None,
));
@@ -1604,6 +1603,7 @@ mod tests {
let transformed = rewriter
.create_cast_column_expr(
Column::new("a", 0),
+
Arc::new(physical_schema.field_with_name("a").unwrap().clone()),
logical_schema.field_with_name("a").unwrap(),
)
.unwrap();
diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs
b/datafusion/physical-expr/src/equivalence/properties/mod.rs
index e9ed0b226f..1ca4ead033 100644
--- a/datafusion/physical-expr/src/equivalence/properties/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -39,7 +39,7 @@ use crate::{
PhysicalSortRequirement,
};
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Constraint, Constraints, HashMap, Result, plan_err};
use datafusion_expr::interval_arithmetic::Interval;
@@ -195,6 +195,39 @@ impl OrderingEquivalenceCache {
}
impl EquivalenceProperties {
+    /// Helper used by the ordering equivalence rule when considering whether a
+    /// cast-bearing expression can replace an existing sort key without
+    /// invalidating the ordering.
+    ///
+    /// This function handles *both* `CastExpr` (generic cast) and
+    /// `CastColumnExpr` (field-aware cast), because the planner may introduce
+    /// either form during rewrite steps; the core logic is the same in both
+    /// cases. The substitution is only allowed when the cast wraps **the very
+    /// same child expression** that the original sort used (an
+    /// exact-child-match invariant), and the cast type must be a
+    /// widening/order-preserving conversion, as verified by
+    /// `CastExpr::check_bigger_cast(...)`. Without those restrictions the
+    /// existing sort order could be violated (e.g. a narrowing cast could
+    /// collapse distinct values together).
+ fn substitute_cast_like_ordering(
+ r_expr: Arc<dyn PhysicalExpr>,
+ sort_expr: &PhysicalSortExpr,
+ expr_type: &DataType,
+ ) -> Option<PhysicalSortExpr> {
+ let (child_expr, cast_type) = if let Some(cast_expr) =
+ r_expr.as_any().downcast_ref::<CastExpr>()
+ {
+ (cast_expr.expr(), cast_expr.cast_type())
+ } else if let Some(cast_expr) =
r_expr.as_any().downcast_ref::<CastColumnExpr>() {
+ (cast_expr.expr(), cast_expr.target_field().data_type())
+ } else {
+ return None;
+ };
+
+ (child_expr.eq(&sort_expr.expr)
+ && CastExpr::check_bigger_cast(cast_type, expr_type))
+ .then(|| PhysicalSortExpr::new(r_expr, sort_expr.options))
+ }
+
/// Creates an empty `EquivalenceProperties` object.
pub fn new(schema: SchemaRef) -> Self {
Self {
@@ -844,32 +877,10 @@ impl EquivalenceProperties {
let expr_type = sort_expr.expr.data_type(schema).unwrap();
// TODO: Add one-to-one analysis for ScalarFunctions.
for r_expr in referring_exprs {
- // We check whether this expression is substitutable.
- if let Some(cast_expr) =
- r_expr.as_any().downcast_ref::<CastExpr>()
- {
- // For casts, we need to know whether the cast
- // expression matches:
- if cast_expr.expr.eq(&sort_expr.expr)
- && cast_expr.is_bigger_cast(&expr_type)
- {
- result.push(PhysicalSortExpr::new(
- r_expr,
- sort_expr.options,
- ));
- }
- } else if let Some(cast_expr) =
- r_expr.as_any().downcast_ref::<CastColumnExpr>()
- {
- let cast_type =
cast_expr.target_field().data_type();
- if cast_expr.expr().eq(&sort_expr.expr)
- && CastExpr::check_bigger_cast(cast_type,
&expr_type)
- {
- result.push(PhysicalSortExpr::new(
- r_expr,
- sort_expr.options,
- ));
- }
+ if let Some(substituted) =
Self::substitute_cast_like_ordering(
+ r_expr, &sort_expr, &expr_type,
+ ) {
+ result.push(substituted);
}
}
result.push(sort_expr);
diff --git a/datafusion/physical-expr/src/expressions/cast.rs
b/datafusion/physical-expr/src/expressions/cast.rs
index 2d44215cf2..5a80daf663 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -25,6 +25,7 @@ use crate::physical_expr::PhysicalExpr;
use arrow::compute::{CastOptions, can_cast_types};
use arrow::datatypes::{DataType, DataType::*, FieldRef, Schema};
use arrow::record_batch::RecordBatch;
+use datafusion_common::datatype::DataTypeExt;
use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
use datafusion_common::nested_struct::validate_struct_compatibility;
use datafusion_common::{Result, not_impl_err};
@@ -63,8 +64,8 @@ fn can_cast_struct_types(source: &DataType, target:
&DataType) -> bool {
pub struct CastExpr {
/// The expression to cast
pub expr: Arc<dyn PhysicalExpr>,
- /// The data type to cast to
- cast_type: DataType,
+ /// Field metadata describing the desired output after casting
+ target_field: FieldRef,
/// Cast options
cast_options: CastOptions<'static>,
}
@@ -73,7 +74,7 @@ pub struct CastExpr {
impl PartialEq for CastExpr {
fn eq(&self, other: &Self) -> bool {
self.expr.eq(&other.expr)
- && self.cast_type.eq(&other.cast_type)
+ && self.target_field.eq(&other.target_field)
&& self.cast_options.eq(&other.cast_options)
}
}
@@ -81,21 +82,55 @@ impl PartialEq for CastExpr {
impl Hash for CastExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.expr.hash(state);
- self.cast_type.hash(state);
+ self.target_field.hash(state);
self.cast_options.hash(state);
}
}
impl CastExpr {
- /// Create a new CastExpr
+ /// Create a new `CastExpr` using only a `DataType`.
+ ///
+ /// This constructor is provided for compatibility with existing call sites
+ /// that only know the target type. It synthesizes a ``Field`` with the
+ /// given type (**nullable by default**) and no name metadata. Callers
that
+ /// already have a `FieldRef` (for example, coming from schema inference
or a
+ /// resolved column) should prefer [`CastExpr::new_with_target_field`],
which
+ /// preserves the field's name, nullability, and other metadata. In other
+ /// words:
+ ///
+ /// * use `new()` when only a `DataType` is available and you want the
legacy
+ /// semantics of a type-only cast
+ /// * use `new_with_target_field()` when you need explicit field
+ /// metadata/name/nullability preserved
pub fn new(
expr: Arc<dyn PhysicalExpr>,
cast_type: DataType,
cast_options: Option<CastOptions<'static>>,
+ ) -> Self {
+ Self::new_with_target_field(
+ expr,
+ cast_type.into_nullable_field_ref(),
+ cast_options,
+ )
+ }
+
+ /// Create a new `CastExpr` with an explicit target `FieldRef`.
+ ///
+ /// The provided `target_field` is used verbatim for the expression's
+ /// return schema, so the field's name, nullability, and other metadata are
+ /// preserved. This is the preferred constructor when the caller already
+ /// has field information (for example, during logical-to-physical
planning).
+ ///
+ /// See [`CastExpr::new`] for the compatibility constructor that only
accepts
+ /// a `DataType`.
+ pub fn new_with_target_field(
+ expr: Arc<dyn PhysicalExpr>,
+ target_field: FieldRef,
+ cast_options: Option<CastOptions<'static>>,
) -> Self {
Self {
expr,
- cast_type,
+ target_field,
cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
}
}
@@ -107,7 +142,12 @@ impl CastExpr {
/// The data type to cast to
pub fn cast_type(&self) -> &DataType {
- &self.cast_type
+ self.target_field.data_type()
+ }
+
+ /// Field metadata describing the output column after casting.
+ pub fn target_field(&self) -> &FieldRef {
+ &self.target_field
}
/// The cast options
@@ -115,6 +155,27 @@ impl CastExpr {
&self.cast_options
}
+ fn is_default_target_field(&self) -> bool {
+ self.target_field.name().is_empty()
+ && self.target_field.is_nullable()
+ && self.target_field.metadata().is_empty()
+ }
+
+ fn resolved_target_field(&self, input_schema: &Schema) -> Result<FieldRef>
{
+ if self.is_default_target_field() {
+ self.expr.return_field(input_schema).map(|field| {
+ Arc::new(
+ field
+ .as_ref()
+ .clone()
+ .with_data_type(self.cast_type().clone()),
+ )
+ })
+ } else {
+ Ok(Arc::clone(&self.target_field))
+ }
+ }
+
/// Check if casting from the specified source type to the target type is a
/// widening cast (e.g. from `Int8` to `Int16`).
pub fn check_bigger_cast(cast_type: &DataType, src: &DataType) -> bool {
@@ -140,13 +201,34 @@ impl CastExpr {
/// Check if the cast is a widening cast (e.g. from `Int8` to `Int16`).
pub fn is_bigger_cast(&self, src: &DataType) -> bool {
- Self::check_bigger_cast(&self.cast_type, src)
+ Self::check_bigger_cast(self.cast_type(), src)
+ }
+}
+
+pub(crate) fn is_order_preserving_cast_family(
+ source_type: &DataType,
+ target_type: &DataType,
+) -> bool {
+ (source_type.is_numeric() || *source_type == Boolean) &&
target_type.is_numeric()
+ || source_type.is_temporal() && target_type.is_temporal()
+ || source_type.eq(target_type)
+}
+
+pub(crate) fn cast_expr_properties(
+ child: &ExprProperties,
+ target_type: &DataType,
+) -> Result<ExprProperties> {
+ let unbounded = Interval::make_unbounded(target_type)?;
+ if is_order_preserving_cast_family(&child.range.data_type(), target_type) {
+ Ok(child.clone().with_range(unbounded))
+ } else {
+ Ok(ExprProperties::new_unknown().with_range(unbounded))
}
}
impl fmt::Display for CastExpr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "CAST({} AS {})", self.expr, self.cast_type)
+ write!(f, "CAST({} AS {})", self.expr, self.cast_type())
}
}
@@ -157,26 +239,27 @@ impl PhysicalExpr for CastExpr {
}
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
- Ok(self.cast_type.clone())
+ Ok(self.cast_type().clone())
}
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
- self.expr.nullable(input_schema)
+ // A cast is nullable if **either** the child is nullable or the
+ // target field allows nulls. This conservative rule prevents
+ // optimizers from assuming a non-null result when a null input could
+ // still propagate. `return_field()` continues to expose the exact
+ // target metadata separately.
+ let child_nullable = self.expr.nullable(input_schema)?;
+        let target_nullable = self.resolved_target_field(input_schema)?.is_nullable();
+ Ok(child_nullable || target_nullable)
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let value = self.expr.evaluate(batch)?;
- value.cast_to(&self.cast_type, Some(&self.cast_options))
+ value.cast_to(self.cast_type(), Some(&self.cast_options))
}
fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
- Ok(self
- .expr
- .return_field(input_schema)?
- .as_ref()
- .clone()
- .with_data_type(self.cast_type.clone())
- .into())
+ self.resolved_target_field(input_schema)
}
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
@@ -187,16 +270,16 @@ impl PhysicalExpr for CastExpr {
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
- Ok(Arc::new(CastExpr::new(
+ Ok(Arc::new(CastExpr::new_with_target_field(
Arc::clone(&children[0]),
- self.cast_type.clone(),
+ Arc::clone(&self.target_field),
Some(self.cast_options.clone()),
)))
}
fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
// Cast current node's interval to the right type:
- children[0].cast_to(&self.cast_type, &self.cast_options)
+ children[0].cast_to(self.cast_type(), &self.cast_options)
}
fn propagate_constraints(
@@ -215,25 +298,13 @@ impl PhysicalExpr for CastExpr {
/// A [`CastExpr`] preserves the ordering of its child if the cast is done
/// under the same datatype family.
fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
- let source_datatype = children[0].range.data_type();
- let target_type = &self.cast_type;
-
- let unbounded = Interval::make_unbounded(target_type)?;
- if (source_datatype.is_numeric() || source_datatype == Boolean)
- && target_type.is_numeric()
- || source_datatype.is_temporal() && target_type.is_temporal()
- || source_datatype.eq(target_type)
- {
- Ok(children[0].clone().with_range(unbounded))
- } else {
- Ok(ExprProperties::new_unknown().with_range(unbounded))
- }
+ cast_expr_properties(&children[0], self.cast_type())
}
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "CAST(")?;
self.expr.fmt_sql(f)?;
- write!(f, " AS {:?}", self.cast_type)?;
+ write!(f, " AS {:?}", self.cast_type())?;
write!(f, ")")
}
@@ -252,14 +323,18 @@ pub fn cast_with_options(
let expr_type = expr.data_type(input_schema)?;
if expr_type == cast_type {
Ok(Arc::clone(&expr))
+ } else if matches!((&expr_type, &cast_type), (Struct(_), Struct(_))) {
+ if can_cast_struct_types(&expr_type, &cast_type) {
+ // Allow struct-to-struct casts that pass name-based compatibility
validation.
+ // This validation is applied at planning time (now) to fail fast,
rather than
+ // deferring errors to execution time. The name-based casting
logic will be
+ // executed at runtime via ColumnarValue::cast_to.
+ Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
+ } else {
+ not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
+ }
} else if can_cast_types(&expr_type, &cast_type) {
Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
- } else if can_cast_struct_types(&expr_type, &cast_type) {
- // Allow struct-to-struct casts that pass name-based compatibility
validation.
- // This validation is applied at planning time (now) to fail fast,
rather than
- // deferring errors to execution time. The name-based casting logic
will be
- // executed at runtime via ColumnarValue::cast_to.
- Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
} else {
not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
}
@@ -293,6 +368,7 @@ mod tests {
};
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use insta::assert_snapshot;
+ use std::collections::HashMap;
// runs an end-to-end test of physical type cast
// 1. construct a record batch with a column "a" of type A
@@ -800,6 +876,106 @@ mod tests {
Ok(())
}
+ #[test]
+ fn field_aware_cast_return_field_preserves_target_metadata() -> Result<()>
{
+ let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+ let expr = CastExpr::new_with_target_field(
+ col("a", &schema)?,
+ Arc::new(Field::new("cast_target", Int64, true).with_metadata(
+ HashMap::from([("target_meta".to_string(), "1".to_string())]),
+ )),
+ None,
+ );
+
+ let field = expr.return_field(&schema)?;
+
+ assert_eq!(field.name(), "cast_target");
+ assert_eq!(field.data_type(), &Int64);
+ assert!(field.is_nullable());
+ assert_eq!(
+ field.metadata().get("target_meta").map(String::as_str),
+ Some("1")
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn field_aware_cast_nullable_prefers_child_nullability() -> Result<()> {
+ // When the child expression is nullable the cast must be treated as
+ // nullable even if the explicitly supplied target field is marked
+ // non-nullable. return_field() still reflects the target metadata.
+ let schema = Schema::new(vec![Field::new("a", Int32, true)]);
+ let expr = CastExpr::new_with_target_field(
+ col("a", &schema)?,
+ Arc::new(Field::new("cast_target", Int64, false)),
+ None,
+ );
+
+ assert!(expr.nullable(&schema)?);
+ assert!(!expr.return_field(&schema)?.is_nullable());
+
+ Ok(())
+ }
+
+ #[test]
+ fn type_only_cast_preserves_legacy_field_name_and_nullability() ->
Result<()> {
+ let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+ let expr = CastExpr::new(col("a", &schema)?, Int64, None);
+
+ let field = expr.return_field(&schema)?;
+
+ assert_eq!(field.name(), "a");
+ assert_eq!(field.data_type(), &Int64);
+ assert!(!field.is_nullable());
+ assert!(!expr.nullable(&schema)?);
+
+ Ok(())
+ }
+
+ #[test]
+ fn field_aware_cast_nullable_child_nonnullable_targets_nullable() ->
Result<()> {
+ // child is non-nullable but the target field is marked nullable; the
+ // nullable() result should still be true because the field allows
nulls.
+ let schema = Schema::new(vec![Field::new("a", Int32, false)]);
+ let expr = CastExpr::new_with_target_field(
+ col("a", &schema)?,
+ Arc::new(Field::new("cast_target", Int64, true)),
+ None,
+ );
+
+ assert!(expr.nullable(&schema)?);
+ assert!(expr.return_field(&schema)?.is_nullable());
+
+ Ok(())
+ }
+
+ #[test]
+ fn struct_cast_validation_uses_nested_target_fields() -> Result<()> {
+ let source_type = Struct(Fields::from(vec![
+ Arc::new(Field::new("x", Int32, true)),
+ Arc::new(Field::new("y", Utf8, true)),
+ ]));
+ let schema = Schema::new(vec![Field::new("a", source_type.clone(),
true)]);
+
+ let valid_target = Struct(Fields::from(vec![
+ Arc::new(Field::new("y", Utf8, true)),
+ Arc::new(Field::new("x", Int64, true)),
+ ]));
+ cast_with_options(col("a", &schema)?, &schema, valid_target, None)?;
+
+ let invalid_target = Struct(Fields::from(vec![
+ Arc::new(Field::new("y", Utf8, true)),
+ Arc::new(Field::new("missing", Int64, false)),
+ ]));
+ let err = cast_with_options(col("a", &schema)?, &schema,
invalid_target, None)
+ .expect_err("missing required struct field should fail");
+
+ assert!(err.to_string().contains("Unsupported CAST"));
+
+ Ok(())
+ }
+
#[test]
#[ignore] // TODO: https://github.com/apache/datafusion/issues/5396
fn test_cast_decimal() -> Result<()> {
diff --git a/datafusion/physical-expr/src/expressions/cast_column.rs
b/datafusion/physical-expr/src/expressions/cast_column.rs
index f6c4d080fc..a99953abdb 100644
--- a/datafusion/physical-expr/src/expressions/cast_column.rs
+++ b/datafusion/physical-expr/src/expressions/cast_column.rs
@@ -17,6 +17,7 @@
//! Physical expression for struct-aware casting of columns.
+use super::cast::cast_expr_properties;
use crate::physical_expr::PhysicalExpr;
use arrow::{
compute::CastOptions,
@@ -27,7 +28,6 @@ use datafusion_common::{
Result, ScalarValue, format::DEFAULT_CAST_OPTIONS,
nested_struct::cast_column,
};
use datafusion_expr_common::columnar_value::ColumnarValue;
-use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::ExprProperties;
use std::{
any::Any,
@@ -182,19 +182,7 @@ impl PhysicalExpr for CastColumnExpr {
/// A [`CastColumnExpr`] preserves the ordering of its child if the cast
is done
/// under the same datatype family.
fn get_properties(&self, children: &[ExprProperties]) ->
Result<ExprProperties> {
- let source_datatype = children[0].range.data_type();
- let target_type = self.target_field.data_type();
-
- let unbounded = Interval::make_unbounded(target_type)?;
- if (source_datatype.is_numeric() || source_datatype ==
DataType::Boolean)
- && target_type.is_numeric()
- || source_datatype.is_temporal() && target_type.is_temporal()
- || source_datatype.eq(target_type)
- {
- Ok(children[0].clone().with_range(unbounded))
- } else {
- Ok(ExprProperties::new_unknown().with_range(unbounded))
- }
+ cast_expr_properties(&children[0], self.target_field.data_type())
}
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]