This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ee52f85fd Add `CastColumnExpr` for struct-aware column casting (#17773)
3ee52f85fd is described below

commit 3ee52f85fdb94544da04f6a67f0c7fc03c714843
Author: kosiew <[email protected]>
AuthorDate: Thu Oct 2 13:30:58 2025 +0800

    Add `CastColumnExpr` for struct-aware column casting (#17773)
    
    * Add CastColumnExpr for struct-aware column casting
    
    * Refactor imports in CastColumnExpr for improved organization and readability
    
    * fix: clarify reference to `arrow::datatypes::Field` in CastColumnExpr documentation
---
 .../physical-expr/src/expressions/cast_column.rs   | 409 +++++++++++++++++++++
 datafusion/physical-expr/src/expressions/mod.rs    |   2 +
 2 files changed, 411 insertions(+)

diff --git a/datafusion/physical-expr/src/expressions/cast_column.rs b/datafusion/physical-expr/src/expressions/cast_column.rs
new file mode 100644
index 0000000000..80d71c3def
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/cast_column.rs
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical expression for struct-aware casting of columns.
+
+use crate::physical_expr::PhysicalExpr;
+use arrow::{
+    compute::CastOptions,
+    datatypes::{DataType, FieldRef, Schema},
+    record_batch::RecordBatch,
+};
+use datafusion_common::{
+    format::DEFAULT_CAST_OPTIONS, nested_struct::cast_column, Result, ScalarValue,
+};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use std::{
+    any::Any,
+    fmt::{self, Display},
+    hash::Hash,
+    sync::Arc,
+};
+/// A physical expression that applies [`cast_column`] to its input.
+///
+/// [`CastColumnExpr`] extends the regular [`CastExpr`](super::CastExpr) by
+/// retaining schema metadata for both the input and output fields. This allows
+/// the evaluator to perform struct-aware casts that honour nested field
+/// ordering, preserve nullability, and fill missing fields with null values.
+///
+/// This expression is intended for schema rewriting scenarios where the
+/// planner already resolved the input column but needs to adapt its physical
+/// representation to a new [`arrow::datatypes::Field`]. It mirrors the behaviour of the
+/// [`datafusion_common::nested_struct::cast_column`] helper while integrating
+/// with the `PhysicalExpr` trait so it can participate in the execution plan
+/// like any other column expression.
+#[derive(Debug, Clone, Eq)]
+pub struct CastColumnExpr {
+    /// The physical expression producing the value to cast.
+    expr: Arc<dyn PhysicalExpr>,
+    /// The logical field of the input column.
+    input_field: FieldRef,
+    /// The field metadata describing the desired output column.
+    target_field: FieldRef,
+    /// Options forwarded to [`cast_column`].
+    cast_options: CastOptions<'static>,
+}
+
+// Manually derive `PartialEq`/`Hash` as `Arc<dyn PhysicalExpr>` does not
+// implement these traits by default for the trait object.
+impl PartialEq for CastColumnExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.expr.eq(&other.expr)
+            && self.input_field.eq(&other.input_field)
+            && self.target_field.eq(&other.target_field)
+            && self.cast_options.eq(&other.cast_options)
+    }
+}
+
+impl Hash for CastColumnExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.expr.hash(state);
+        self.input_field.hash(state);
+        self.target_field.hash(state);
+        self.cast_options.hash(state);
+    }
+}
+
+impl CastColumnExpr {
+    /// Create a new [`CastColumnExpr`].
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        input_field: FieldRef,
+        target_field: FieldRef,
+        cast_options: Option<CastOptions<'static>>,
+    ) -> Self {
+        Self {
+            expr,
+            input_field,
+            target_field,
+            cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
+        }
+    }
+
+    /// The expression that produces the value to be cast.
+    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.expr
+    }
+
+    /// Field metadata describing the resolved input column.
+    pub fn input_field(&self) -> &FieldRef {
+        &self.input_field
+    }
+
+    /// Field metadata describing the output column after casting.
+    pub fn target_field(&self) -> &FieldRef {
+        &self.target_field
+    }
+}
+
+impl Display for CastColumnExpr {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "CAST_COLUMN({} AS {:?})",
+            self.expr,
+            self.target_field.data_type()
+        )
+    }
+}
+
+impl PhysicalExpr for CastColumnExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(self.target_field.data_type().clone())
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(self.target_field.is_nullable())
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = self.expr.evaluate(batch)?;
+        match value {
+            ColumnarValue::Array(array) => {
+                let casted =
+                    cast_column(&array, self.target_field.as_ref(), &self.cast_options)?;
+                Ok(ColumnarValue::Array(casted))
+            }
+            ColumnarValue::Scalar(scalar) => {
+                let as_array = scalar.to_array_of_size(1)?;
+                let casted = cast_column(
+                    &as_array,
+                    self.target_field.as_ref(),
+                    &self.cast_options,
+                )?;
+                let result = ScalarValue::try_from_array(casted.as_ref(), 0)?;
+                Ok(ColumnarValue::Scalar(result))
+            }
+        }
+    }
+
+    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
+        Ok(Arc::clone(&self.target_field))
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        assert_eq!(children.len(), 1);
+        let child = children.pop().expect("CastColumnExpr child");
+        Ok(Arc::new(Self::new(
+            child,
+            Arc::clone(&self.input_field),
+            Arc::clone(&self.target_field),
+            Some(self.cast_options.clone()),
+        )))
+    }
+
+    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        Display::fmt(self, f)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::expressions::{Column, Literal};
+    use arrow::{
+        array::{Array, ArrayRef, BooleanArray, Int32Array, StringArray, StructArray},
+        datatypes::{DataType, Field, Fields, SchemaRef},
+    };
+    use datafusion_common::{
+        cast::{as_int64_array, as_string_array, as_struct_array, as_uint8_array},
+        Result as DFResult, ScalarValue,
+    };
+
+    fn make_schema(field: &Field) -> SchemaRef {
+        Arc::new(Schema::new(vec![field.clone()]))
+    }
+
+    fn make_struct_array(fields: Fields, arrays: Vec<ArrayRef>) -> StructArray {
+        StructArray::new(fields, arrays, None)
+    }
+
+    #[test]
+    fn cast_primitive_array() -> DFResult<()> {
+        let input_field = Field::new("a", DataType::Int32, true);
+        let target_field = Field::new("a", DataType::Int64, true);
+        let schema = make_schema(&input_field);
+
+        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![values])?;
+
+        let column = Arc::new(Column::new_with_schema("a", schema.as_ref())?);
+        let expr = CastColumnExpr::new(
+            column,
+            Arc::new(input_field.clone()),
+            Arc::new(target_field.clone()),
+            None,
+        );
+
+        let result = expr.evaluate(&batch)?;
+        let ColumnarValue::Array(array) = result else {
+            panic!("expected array");
+        };
+        let casted = as_int64_array(array.as_ref())?;
+        assert_eq!(casted.value(0), 1);
+        assert!(casted.is_null(1));
+        assert_eq!(casted.value(2), 3);
+        Ok(())
+    }
+
+    #[test]
+    fn cast_struct_array_missing_child() -> DFResult<()> {
+        let source_a = Field::new("a", DataType::Int32, true);
+        let source_b = Field::new("b", DataType::Utf8, true);
+        let input_field = Field::new(
+            "s",
+            DataType::Struct(
+                vec![Arc::new(source_a.clone()), Arc::new(source_b.clone())].into(),
+            ),
+            true,
+        );
+        let target_a = Field::new("a", DataType::Int64, true);
+        let target_c = Field::new("c", DataType::Utf8, true);
+        let target_field = Field::new(
+            "s",
+            DataType::Struct(
+                vec![Arc::new(target_a.clone()), Arc::new(target_c.clone())].into(),
+            ),
+            true,
+        );
+
+        let schema = make_schema(&input_field);
+        let struct_array = make_struct_array(
+            vec![Arc::new(source_a.clone()), Arc::new(source_b.clone())].into(),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef,
+                Arc::new(StringArray::from(vec![Some("alpha"), Some("beta")]))
+                    as ArrayRef,
+            ],
+        );
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(struct_array) as Arc<_>],
+        )?;
+
+        let column = Arc::new(Column::new_with_schema("s", schema.as_ref())?);
+        let expr = CastColumnExpr::new(
+            column,
+            Arc::new(input_field.clone()),
+            Arc::new(target_field.clone()),
+            None,
+        );
+
+        let result = expr.evaluate(&batch)?;
+        let ColumnarValue::Array(array) = result else {
+            panic!("expected array");
+        };
+        let struct_array = as_struct_array(array.as_ref())?;
+        let cast_a = as_int64_array(struct_array.column_by_name("a").unwrap().as_ref())?;
+        assert_eq!(cast_a.value(0), 1);
+        assert!(cast_a.is_null(1));
+
+        let cast_c = as_string_array(struct_array.column_by_name("c").unwrap().as_ref())?;
+        assert!(cast_c.is_null(0));
+        assert!(cast_c.is_null(1));
+        Ok(())
+    }
+
+    #[test]
+    fn cast_nested_struct_array() -> DFResult<()> {
+        let inner_source = Field::new(
+            "inner",
+            DataType::Struct(
+                vec![Arc::new(Field::new("x", DataType::Int32, true))].into(),
+            ),
+            true,
+        );
+        let outer_field = Field::new(
+            "root",
+            DataType::Struct(vec![Arc::new(inner_source.clone())].into()),
+            true,
+        );
+
+        let inner_target = Field::new(
+            "inner",
+            DataType::Struct(
+                vec![
+                    Arc::new(Field::new("x", DataType::Int64, true)),
+                    Arc::new(Field::new("y", DataType::Boolean, true)),
+                ]
+                .into(),
+            ),
+            true,
+        );
+        let target_field = Field::new(
+            "root",
+            DataType::Struct(vec![Arc::new(inner_target.clone())].into()),
+            true,
+        );
+
+        let schema = make_schema(&outer_field);
+
+        let inner_struct = make_struct_array(
+            vec![Arc::new(Field::new("x", DataType::Int32, true))].into(),
+            vec![Arc::new(Int32Array::from(vec![Some(7), None])) as ArrayRef],
+        );
+        let outer_struct = make_struct_array(
+            vec![Arc::new(inner_source.clone())].into(),
+            vec![Arc::new(inner_struct) as ArrayRef],
+        );
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(outer_struct) as ArrayRef],
+        )?;
+
+        let column = Arc::new(Column::new_with_schema("root", schema.as_ref())?);
+        let expr = CastColumnExpr::new(
+            column,
+            Arc::new(outer_field.clone()),
+            Arc::new(target_field.clone()),
+            None,
+        );
+
+        let result = expr.evaluate(&batch)?;
+        let ColumnarValue::Array(array) = result else {
+            panic!("expected array");
+        };
+        let struct_array = as_struct_array(array.as_ref())?;
+        let inner =
+            as_struct_array(struct_array.column_by_name("inner").unwrap().as_ref())?;
+        let x = as_int64_array(inner.column_by_name("x").unwrap().as_ref())?;
+        assert_eq!(x.value(0), 7);
+        assert!(x.is_null(1));
+        let y = inner.column_by_name("y").unwrap();
+        let y = y
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .expect("boolean array");
+        assert!(y.is_null(0));
+        assert!(y.is_null(1));
+        Ok(())
+    }
+
+    #[test]
+    fn cast_struct_scalar() -> DFResult<()> {
+        let source_field = Field::new("a", DataType::Int32, true);
+        let input_field = Field::new(
+            "s",
+            DataType::Struct(vec![Arc::new(source_field.clone())].into()),
+            true,
+        );
+        let target_field = Field::new(
+            "s",
+            DataType::Struct(
+                vec![Arc::new(Field::new("a", DataType::UInt8, true))].into(),
+            ),
+            true,
+        );
+
+        let schema = make_schema(&input_field);
+        let scalar_struct = StructArray::new(
+            vec![Arc::new(source_field.clone())].into(),
+            vec![Arc::new(Int32Array::from(vec![Some(9)])) as ArrayRef],
+            None,
+        );
+        let literal =
+            Arc::new(Literal::new(ScalarValue::Struct(Arc::new(scalar_struct))));
+        let expr = CastColumnExpr::new(
+            literal,
+            Arc::new(input_field.clone()),
+            Arc::new(target_field.clone()),
+            None,
+        );
+
+        let batch = RecordBatch::new_empty(Arc::clone(&schema));
+        let result = expr.evaluate(&batch)?;
+        let ColumnarValue::Scalar(ScalarValue::Struct(array)) = result else {
+            panic!("expected struct scalar");
+        };
+        let casted = array.column_by_name("a").unwrap();
+        let casted = as_uint8_array(casted.as_ref())?;
+        assert_eq!(casted.value(0), 9);
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 8f46133ed0..59d675753d 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -21,6 +21,7 @@
 mod binary;
 mod case;
 mod cast;
+mod cast_column;
 mod column;
 mod dynamic_filters;
 mod in_list;
@@ -41,6 +42,7 @@ pub use crate::PhysicalSortExpr;
 pub use binary::{binary, similar_to, BinaryExpr};
 pub use case::{case, CaseExpr};
 pub use cast::{cast, CastExpr};
+pub use cast_column::CastColumnExpr;
 pub use column::{col, with_new_schema, Column};
 pub use datafusion_expr::utils::format_state_name;
 pub use dynamic_filters::DynamicFilterPhysicalExpr;
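
For reference, the new expression can be driven directly through the API shown above. The following stand-alone sketch mirrors the `cast_primitive_array` test from this commit; it is an illustrative sketch rather than part of the patch, and the import paths (e.g. `datafusion_physical_expr::expressions::{CastColumnExpr, Column}` and `datafusion_physical_expr::PhysicalExpr`) are assumed from the re-exports added to `mod.rs`:

// Hypothetical usage sketch: cast an Int32 column to Int64 with CastColumnExpr.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_physical_expr::expressions::{CastColumnExpr, Column};
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    // Input column "a" is Int32; the target field asks for Int64.
    let input_field = Field::new("a", DataType::Int32, true);
    let target_field = Field::new("a", DataType::Int64, true);
    let schema = Arc::new(Schema::new(vec![input_field.clone()]));

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef],
    )?;

    // Resolve the column against the schema and wrap it in the new expression;
    // passing `None` falls back to DEFAULT_CAST_OPTIONS.
    let column = Arc::new(Column::new_with_schema("a", schema.as_ref())?);
    let expr = CastColumnExpr::new(
        column,
        Arc::new(input_field),
        Arc::new(target_field),
        None,
    );

    // Evaluation returns the casted Int64 column (or a scalar for scalar inputs).
    match expr.evaluate(&batch)? {
        ColumnarValue::Array(array) => println!("casted: {array:?}"),
        ColumnarValue::Scalar(scalar) => println!("casted scalar: {scalar}"),
    }
    Ok(())
}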


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
