This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b4d75e37 fix: cast_struct_to_struct aligns to Spark behavior (#1879)
0b4d75e37 is described below

commit 0b4d75e377339d9a36dca564ae78d63073bc40d9
Author: Matt Butrovich <mbutrov...@users.noreply.github.com>
AuthorDate: Fri Jun 13 09:46:16 2025 -0400

    fix: cast_struct_to_struct aligns to Spark behavior (#1879)
---
 native/core/src/parquet/parquet_support.rs         | 36 +++++++++++++---------
 native/spark-expr/src/conversion_funcs/cast.rs     | 34 ++++++++++----------
 .../scala/org/apache/comet/CometCastSuite.scala    | 18 +++++++++++
 3 files changed, 56 insertions(+), 32 deletions(-)

diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 3430592c0..00961d33c 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -44,6 +44,10 @@ use url::Url;
 
 use super::objectstore;
 
+// This file originates from cast.rs. While developing native scan support and implementing
+// SparkSchemaAdapter we observed that Spark's type conversion logic on Parquet reads does not
+// always align to the CAST expression's logic, so it was duplicated here to adapt its behavior.
+
 static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
 
 static PARQUET_OPTIONS: CastOptions = CastOptions {
@@ -53,7 +57,7 @@ static PARQUET_OPTIONS: CastOptions = CastOptions {
         .with_timestamp_format(TIMESTAMP_FORMAT),
 };
 
-/// Spark cast options
+/// Spark Parquet type conversion options
 #[derive(Debug, Clone, Hash, PartialEq, Eq)]
 pub struct SparkParquetOptions {
     /// Spark evaluation mode
@@ -109,7 +113,7 @@ pub fn spark_parquet_convert(
     parquet_options: &SparkParquetOptions,
 ) -> DataFusionResult<ColumnarValue> {
     match arg {
-        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(cast_array(
+        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(parquet_convert_array(
             array,
             data_type,
             parquet_options,
@@ -119,14 +123,16 @@ pub fn spark_parquet_convert(
             // some cases e.g., scalar subquery, Spark will not fold it, so we need to handle it
             // here.
             let array = scalar.to_array()?;
-            let scalar =
-                ScalarValue::try_from_array(&cast_array(array, data_type, parquet_options)?, 0)?;
+            let scalar = ScalarValue::try_from_array(
+                &parquet_convert_array(array, data_type, parquet_options)?,
+                0,
+            )?;
             Ok(ColumnarValue::Scalar(scalar))
         }
     }
 }
 
-fn cast_array(
+fn parquet_convert_array(
     array: ArrayRef,
     to_type: &DataType,
     parquet_options: &SparkParquetOptions,
@@ -146,7 +152,7 @@ fn cast_array(
 
             let casted_dictionary = DictionaryArray::<Int32Type>::new(
                 dict_array.keys().clone(),
-                cast_array(Arc::clone(dict_array.values()), to_type, parquet_options)?,
+                parquet_convert_array(Arc::clone(dict_array.values()), to_type, parquet_options)?,
             );
 
             let casted_result = match to_type {
@@ -162,7 +168,7 @@ fn cast_array(
     // Try Comet specific handlers first, then arrow-rs cast if supported,
     // return uncasted data otherwise
     match (from_type, to_type) {
-        (Struct(_), Struct(_)) => Ok(cast_struct_to_struct(
+        (Struct(_), Struct(_)) => Ok(parquet_convert_struct_to_struct(
             array.as_struct(),
             from_type,
             to_type,
@@ -170,7 +176,7 @@ fn cast_array(
         )?),
         (List(_), List(to_inner_type)) => {
             let list_arr: &ListArray = array.as_list();
-            let cast_field = cast_array(
+            let cast_field = parquet_convert_array(
                 Arc::clone(list_arr.values()),
                 to_inner_type.data_type(),
                 parquet_options,
@@ -192,7 +198,7 @@ fn cast_array(
             ))
         }
         (Map(_, ordered_from), Map(_, ordered_to)) if ordered_from == ordered_to =>
-            cast_map_values(array.as_map(), to_type, parquet_options, *ordered_to)
+            parquet_convert_map_to_map(array.as_map(), to_type, parquet_options, *ordered_to)
             ,
         // If Arrow cast supports the cast, delegate the cast to Arrow
         _ if can_cast_types(from_type, to_type) => {
@@ -204,7 +210,7 @@ fn cast_array(
 
 /// Cast between struct types based on logic in
 /// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`.
-fn cast_struct_to_struct(
+fn parquet_convert_struct_to_struct(
     array: &StructArray,
     from_type: &DataType,
     to_type: &DataType,
@@ -236,7 +242,7 @@ fn cast_struct_to_struct(
                 };
                 if field_name_to_index_map.contains_key(&key) {
                     let from_index = field_name_to_index_map[&key];
-                    let cast_field = cast_array(
+                    let cast_field = parquet_convert_array(
                         Arc::clone(array.column(from_index)),
                         to_fields[i].data_type(),
                         parquet_options,
@@ -267,8 +273,8 @@ fn cast_struct_to_struct(
 }
 
 /// Cast a map type to another map type. The same as arrow-cast except we recursively call our own
-/// cast_array
-fn cast_map_values(
+/// parquet_convert_array
+fn parquet_convert_map_to_map(
     from: &MapArray,
     to_data_type: &DataType,
     parquet_options: &SparkParquetOptions,
@@ -283,12 +289,12 @@ fn cast_map_values(
                 "map is missing value field".to_string(),
             ))?;
 
-            let key_array = cast_array(
+            let key_array = parquet_convert_array(
                 Arc::clone(from.keys()),
                 key_field.data_type(),
                 parquet_options,
             )?;
-            let value_array = cast_array(
+            let value_array = parquet_convert_array(
                 Arc::clone(from.values()),
                 value_field.data_type(),
                 parquet_options,
diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index ba8433a36..b8402a8e6 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -49,7 +49,6 @@ use num::{
     ToPrimitive,
 };
 use regex::Regex;
-use std::collections::HashMap;
 use std::str::FromStr;
 use std::{
     any::Any,
@@ -1081,22 +1080,23 @@ fn cast_struct_to_struct(
 ) -> DataFusionResult<ArrayRef> {
     match (from_type, to_type) {
         (DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
-            // TODO some of this logic may be specific to converting Parquet to Spark
-            let mut field_name_to_index_map = HashMap::new();
-            for (i, field) in from_fields.iter().enumerate() {
-                field_name_to_index_map.insert(field.name(), i);
-            }
-            assert_eq!(field_name_to_index_map.len(), from_fields.len());
-            let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
-            for i in 0..to_fields.len() {
-                let from_index = field_name_to_index_map[to_fields[i].name()];
-                let cast_field = cast_array(
-                    Arc::clone(array.column(from_index)),
-                    to_fields[i].data_type(),
-                    cast_options,
-                )?;
-                cast_fields.push(cast_field);
-            }
+            let cast_fields: Vec<ArrayRef> = from_fields
+                .iter()
+                .enumerate()
+                .zip(to_fields.iter())
+                .map(|((idx, _from), to)| {
+                    let from_field = Arc::clone(array.column(idx));
+                    let array_length = from_field.len();
+                    let cast_result = spark_cast(
+                        ColumnarValue::from(from_field),
+                        to.data_type(),
+                        cast_options,
+                    )
+                    .unwrap();
+                    cast_result.to_array(array_length).unwrap()
+                })
+                .collect();
+
             Ok(Arc::new(StructArray::new(
                 to_fields.clone(),
                 cast_fields,
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 11e44251b..cb12dde15 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -972,6 +972,24 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("cast StructType to StructType with different names") {
+    withTable("tab1") {
+      sql("""
+           |CREATE TABLE tab1 (s struct<a: string, b: string>)
+           |USING parquet
+         """.stripMargin)
+      sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
+      if (usingDataSourceExec) {
+        checkSparkAnswerAndOperator(
+          "SELECT CAST(s AS struct<field1:string, field2:string>) AS 
new_struct FROM tab1")
+      } else {
+        // Should just fall back to Spark since non-DataSourceExec scan does not support nested types.
+        checkSparkAnswer(
+          "SELECT CAST(s AS struct<field1:string, field2:string>) AS 
new_struct FROM tab1")
+      }
+    }
+  }
+
   test("cast between decimals with different precision and scale") {
     val rowData = Seq(
       Row(BigDecimal("12345.6789")),
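
Note: the change makes the struct-to-struct conversion follow Spark's positional semantics
(org.apache.spark.sql.catalyst.expressions.Cast#castStruct casts children by position and takes
the field names from the target type, ignoring the source names). A rough spark-shell sketch of
the scenario the new test covers follows; the table, column, and values are taken from that test,
and the exact struct rendering in show() can vary by Spark version:

    spark.sql("CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet")
    spark.sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
    // Target field names replace the source names; values carry over by position.
    spark.sql("SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1").show()
    // Expected output, roughly:
    // +----------+
    // |new_struct|
    // +----------+
    // |    {1, 2}|
    // +----------+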


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
