This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git

The following commit(s) were added to refs/heads/main by this push:
     new 0b4d75e37 fix: cast_struct_to_struct aligns to Spark behavior (#1879)
0b4d75e37 is described below

commit 0b4d75e377339d9a36dca564ae78d63073bc40d9
Author: Matt Butrovich <mbutrov...@users.noreply.github.com>
AuthorDate: Fri Jun 13 09:46:16 2025 -0400

    fix: cast_struct_to_struct aligns to Spark behavior (#1879)
---
 native/core/src/parquet/parquet_support.rs      | 36 +++++++++++++---------
 native/spark-expr/src/conversion_funcs/cast.rs  | 34 ++++++++++----------
 .../scala/org/apache/comet/CometCastSuite.scala | 18 +++++++++++
 3 files changed, 56 insertions(+), 32 deletions(-)

diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 3430592c0..00961d33c 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -44,6 +44,10 @@ use url::Url;
 
 use super::objectstore;
 
+// This file originates from cast.rs. While developing native scan support and implementing
+// SparkSchemaAdapter we observed that Spark's type conversion logic on Parquet reads does not
+// always align to the CAST expression's logic, so it was duplicated here to adapt its behavior.
+
 static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
 
 static PARQUET_OPTIONS: CastOptions = CastOptions {
@@ -53,7 +57,7 @@ static PARQUET_OPTIONS: CastOptions = CastOptions {
         .with_timestamp_format(TIMESTAMP_FORMAT),
 };
 
-/// Spark cast options
+/// Spark Parquet type conversion options
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub struct SparkParquetOptions {
     /// Spark evaluation mode
@@ -109,7 +113,7 @@ pub fn spark_parquet_convert(
     parquet_options: &SparkParquetOptions,
 ) -> DataFusionResult<ColumnarValue> {
     match arg {
-        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(cast_array(
+        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(parquet_convert_array(
             array,
             data_type,
             parquet_options,
@@ -119,14 +123,16 @@ pub fn spark_parquet_convert(
             // some cases e.g., scalar subquery, Spark will not fold it, so we need to handle it
             // here.
             let array = scalar.to_array()?;
-            let scalar =
-                ScalarValue::try_from_array(&cast_array(array, data_type, parquet_options)?, 0)?;
+            let scalar = ScalarValue::try_from_array(
+                &parquet_convert_array(array, data_type, parquet_options)?,
+                0,
+            )?;
             Ok(ColumnarValue::Scalar(scalar))
         }
     }
 }
 
-fn cast_array(
+fn parquet_convert_array(
     array: ArrayRef,
     to_type: &DataType,
     parquet_options: &SparkParquetOptions,
@@ -146,7 +152,7 @@
 
         let casted_dictionary = DictionaryArray::<Int32Type>::new(
             dict_array.keys().clone(),
-            cast_array(Arc::clone(dict_array.values()), to_type, parquet_options)?,
+            parquet_convert_array(Arc::clone(dict_array.values()), to_type, parquet_options)?,
         );
 
         let casted_result = match to_type {
@@ -162,7 +168,7 @@
     // Try Comet specific handlers first, then arrow-rs cast if supported,
     // return uncasted data otherwise
     match (from_type, to_type) {
-        (Struct(_), Struct(_)) => Ok(cast_struct_to_struct(
+        (Struct(_), Struct(_)) => Ok(parquet_convert_struct_to_struct(
             array.as_struct(),
             from_type,
             to_type,
@@ -170,7 +176,7 @@
             parquet_options,
         )?),
         (List(_), List(to_inner_type)) => {
             let list_arr: &ListArray = array.as_list();
-            let cast_field = cast_array(
+            let cast_field = parquet_convert_array(
                 Arc::clone(list_arr.values()),
                 to_inner_type.data_type(),
                 parquet_options,
@@ -192,7 +198,7 @@
             ))
         }
         (Map(_, ordered_from), Map(_, ordered_to)) if ordered_from == ordered_to =>
-            cast_map_values(array.as_map(), to_type, parquet_options, *ordered_to)
+            parquet_convert_map_to_map(array.as_map(), to_type, parquet_options, *ordered_to)
         ,
         // If Arrow cast supports the cast, delegate the cast to Arrow
         _ if can_cast_types(from_type, to_type) => {
@@ -204,7 +210,7 @@
 
 /// Cast between struct types based on logic in
 /// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`.
-fn cast_struct_to_struct(
+fn parquet_convert_struct_to_struct(
     array: &StructArray,
     from_type: &DataType,
     to_type: &DataType,
@@ -236,7 +242,7 @@
         };
         if field_name_to_index_map.contains_key(&key) {
             let from_index = field_name_to_index_map[&key];
-            let cast_field = cast_array(
+            let cast_field = parquet_convert_array(
                 Arc::clone(array.column(from_index)),
                 to_fields[i].data_type(),
                 parquet_options,
@@ -267,8 +273,8 @@
 }
 
 /// Cast a map type to another map type. The same as arrow-cast except we recursively call our own
-/// cast_array
-fn cast_map_values(
+/// parquet_convert_array
+fn parquet_convert_map_to_map(
     from: &MapArray,
     to_data_type: &DataType,
     parquet_options: &SparkParquetOptions,
@@ -283,12 +289,12 @@
             "map is missing value field".to_string(),
         ))?;
 
-    let key_array = cast_array(
+    let key_array = parquet_convert_array(
         Arc::clone(from.keys()),
         key_field.data_type(),
         parquet_options,
     )?;
-    let value_array = cast_array(
+    let value_array = parquet_convert_array(
         Arc::clone(from.values()),
         value_field.data_type(),
         parquet_options,
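
A note on the behavior preserved above: parquet_convert_struct_to_struct resolves target struct fields by source field name (via field_name_to_index_map), not by position. A minimal sketch of that name-based resolution, assuming arrow-rs; the convert_struct_by_name helper and its null-fill fallback for names missing from the source are illustrative assumptions, and the recursive per-field conversion the real code performs is omitted:

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::array::{new_null_array, Array, ArrayRef, StructArray};
    use arrow::datatypes::Fields;

    fn convert_struct_by_name(array: &StructArray, to_fields: &Fields) -> StructArray {
        // Index the source columns by field name, as the Parquet path does.
        let by_name: HashMap<&str, usize> = array
            .fields()
            .iter()
            .enumerate()
            .map(|(i, f)| (f.name().as_str(), i))
            .collect();

        // Resolve each target field by name; names absent from the source become
        // all-null columns (assumed fallback, shown without the recursive
        // per-field conversion the committed code performs).
        let columns: Vec<ArrayRef> = to_fields
            .iter()
            .map(|to| match by_name.get(to.name().as_str()) {
                Some(&i) => Arc::clone(array.column(i)),
                None => new_null_array(to.data_type(), array.len()),
            })
            .collect();

        StructArray::new(to_fields.clone(), columns, array.nulls().cloned())
    }
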
diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index ba8433a36..b8402a8e6 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -49,7 +49,6 @@ use num::{
     ToPrimitive,
 };
 use regex::Regex;
-use std::collections::HashMap;
 use std::str::FromStr;
 use std::{
     any::Any,
@@ -1081,22 +1080,23 @@
 ) -> DataFusionResult<ArrayRef> {
     match (from_type, to_type) {
         (DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
-            // TODO some of this logic may be specific to converting Parquet to Spark
-            let mut field_name_to_index_map = HashMap::new();
-            for (i, field) in from_fields.iter().enumerate() {
-                field_name_to_index_map.insert(field.name(), i);
-            }
-            assert_eq!(field_name_to_index_map.len(), from_fields.len());
-            let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
-            for i in 0..to_fields.len() {
-                let from_index = field_name_to_index_map[to_fields[i].name()];
-                let cast_field = cast_array(
-                    Arc::clone(array.column(from_index)),
-                    to_fields[i].data_type(),
-                    cast_options,
-                )?;
-                cast_fields.push(cast_field);
-            }
+            let cast_fields: Vec<ArrayRef> = from_fields
+                .iter()
+                .enumerate()
+                .zip(to_fields.iter())
+                .map(|((idx, _from), to)| {
+                    let from_field = Arc::clone(array.column(idx));
+                    let array_length = from_field.len();
+                    let cast_result = spark_cast(
+                        ColumnarValue::from(from_field),
+                        to.data_type(),
+                        cast_options,
+                    )
+                    .unwrap();
+                    cast_result.to_array(array_length).unwrap()
+                })
+                .collect();
+
             Ok(Arc::new(StructArray::new(
                 to_fields.clone(),
                 cast_fields,
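
The rewrite of cast_struct_to_struct above is what aligns CAST with Spark's Cast#castStruct: source columns are zipped with target fields by position, so field names never participate in the match. A minimal sketch of those positional semantics, assuming arrow-rs, with arrow's cast as a stand-in for the recursive spark_cast call; cast_struct_by_position is illustrative, not the committed code:

    use arrow::array::{Array, ArrayRef, StructArray};
    use arrow::compute::cast; // stand-in for the recursive Comet cast
    use arrow::datatypes::Fields;
    use arrow::error::Result;

    fn cast_struct_by_position(array: &StructArray, to_fields: &Fields) -> Result<StructArray> {
        // Pair source columns with target fields by position, never by name.
        let columns: Vec<ArrayRef> = array
            .columns()
            .iter()
            .zip(to_fields.iter())
            .map(|(col, to)| cast(col.as_ref(), to.data_type()))
            .collect::<Result<_>>()?;
        Ok(StructArray::new(
            to_fields.clone(),
            columns,
            array.nulls().cloned(),
        ))
    }

This positional rule is what the new CometCastSuite test below exercises: a column stored as struct<a, b> casts to struct<field1, field2> because only field positions matter.
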
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 11e44251b..cb12dde15 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -972,6 +972,24 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("cast StructType to StructType with different names") {
+    withTable("tab1") {
+      sql("""
+          |CREATE TABLE tab1 (s struct<a: string, b: string>)
+          |USING parquet
+        """.stripMargin)
+      sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
+      if (usingDataSourceExec) {
+        checkSparkAnswerAndOperator(
+          "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
+      } else {
+        // Should just fall back to Spark since non-DataSourceExec scan does not support nested types.
+        checkSparkAnswer(
+          "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
+      }
+    }
+  }
+
   test("cast between decimals with different precision and scale") {
     val rowData = Seq(
       Row(BigDecimal("12345.6789")),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org