This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new eb4d4eb5d fix: support `map_values` (#1835)
eb4d4eb5d is described below

commit eb4d4eb5d784b533fe3d5fea5eaa640760886fd1
Author: Oleks V <comph...@users.noreply.github.com>
AuthorDate: Tue Jun 3 16:13:24 2025 -0700

    fix: support `map_values` (#1835)
    
    
    * fix: support `map_values`
---
 native/core/src/execution/planner.rs               |  6 --
 native/core/src/parquet/parquet_support.rs         | 77 +++++++++++-----------
 .../src/array_funcs/get_array_struct_fields.rs     | 44 ++++++++++---
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  8 +--
 .../apache/comet/exec/CometNativeReaderSuite.scala | 29 ++++----
 5 files changed, 90 insertions(+), 74 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 94e79e4e0..8ae8c9c02 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2954,14 +2954,8 @@ mod tests {
             })),
         };
 
-        let a = Int32Array::from(vec![0, 3]);
-        let b = Int32Array::from(vec![1, 4]);
-        let c = Int32Array::from(vec![2, 5]);
-        let input_batch = InputBatch::Batch(vec![Arc::new(a), Arc::new(b), Arc::new(c)], 2);
-
         let (mut scans, datafusion_plan) =
             planner.create_plan(&projection, &mut vec![], 1).unwrap();
-        scans[0].set_input_batch(input_batch);
 
         let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
 
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 4e6f8a172..c77d60135 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -255,53 +255,52 @@ fn cast_struct_to_struct(
 
 /// Cast a map type to another map type. The same as arrow-cast except we recursively call our own
 /// cast_array
-pub(crate) fn cast_map_values(
+fn cast_map_values(
     from: &MapArray,
     to_data_type: &DataType,
     parquet_options: &SparkParquetOptions,
     to_ordered: bool,
 ) -> Result<ArrayRef, DataFusionError> {
-    let entries_field = if let DataType::Map(entries_field, _) = to_data_type {
-        entries_field
-    } else {
-        return Err(DataFusionError::Internal(
-            "Internal Error: to_data_type is not a map type.".to_string(),
-        ));
-    };
+    match to_data_type {
+        DataType::Map(entries_field, _) => {
+            let key_field = key_field(entries_field).ok_or(DataFusionError::Internal(
+                "map is missing key field".to_string(),
+            ))?;
+            let value_field = value_field(entries_field).ok_or(DataFusionError::Internal(
+                "map is missing value field".to_string(),
+            ))?;
+
+            let key_array = cast_array(
+                Arc::<dyn Array>::clone(from.keys()),
+                key_field.data_type(),
+                parquet_options,
+            )?;
+            let value_array = cast_array(
+                Arc::<dyn Array>::clone(from.values()),
+                value_field.data_type(),
+                parquet_options,
+            )?;
 
-    let key_field = key_field(entries_field).ok_or(DataFusionError::Internal(
-        "map is missing key field".to_string(),
-    ))?;
-    let value_field = value_field(entries_field).ok_or(DataFusionError::Internal(
-        "map is missing value field".to_string(),
-    ))?;
-
-    let key_array = cast_array(
-        Arc::<dyn Array>::clone(from.keys()),
-        key_field.data_type(),
-        parquet_options,
-    )?;
-    let value_array = cast_array(
-        Arc::<dyn Array>::clone(from.values()),
-        value_field.data_type(),
-        parquet_options,
-    )?;
-
-    Ok(Arc::new(MapArray::new(
-        Arc::<arrow::datatypes::Field>::clone(entries_field),
-        from.offsets().clone(),
-        StructArray::new(
-            Fields::from(vec![key_field, value_field]),
-            vec![key_array, value_array],
-            from.entries().nulls().cloned(),
-        ),
-        from.nulls().cloned(),
-        to_ordered,
-    )))
+            Ok(Arc::new(MapArray::new(
+                Arc::<arrow::datatypes::Field>::clone(entries_field),
+                from.offsets().clone(),
+                StructArray::new(
+                    Fields::from(vec![key_field, value_field]),
+                    vec![key_array, value_array],
+                    from.entries().nulls().cloned(),
+                ),
+                from.nulls().cloned(),
+                to_ordered,
+            )))
+        }
+        dt => Err(DataFusionError::Internal(format!(
+            "Expected MapType. Got: {dt}"
+        ))),
+    }
 }
 
 /// Gets the key field from the entries of a map.  For all other types returns None.
-pub(crate) fn key_field(entries_field: &FieldRef) -> Option<FieldRef> {
+fn key_field(entries_field: &FieldRef) -> Option<FieldRef> {
     if let DataType::Struct(fields) = entries_field.data_type() {
         fields.first().cloned()
     } else {
@@ -310,7 +309,7 @@ pub(crate) fn key_field(entries_field: &FieldRef) -> Option<FieldRef> {
 }
 
 /// Gets the value field from the entries of a map.  For all other types returns None.
-pub(crate) fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
+fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     if let DataType::Struct(fields) = entries_field.data_type() {
         fields.get(1).cloned()
     } else {
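
A note on the helpers above: key_field and value_field rely on the Arrow
convention that a Map's entries field is a Struct whose first child is the
key and whose second child is the value. A minimal sketch of that layout
(assuming the arrow crate; the map type and field names here are
hypothetical, not taken from the patch):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Fields};

    fn main() {
        // Hypothetical map type: Map<Utf8, Int64>. The entries struct holds
        // the key as its first field and the value as its second.
        let entries = Arc::new(Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int64, true),
            ])),
            false,
        ));
        let map_type = DataType::Map(entries, /* keys sorted */ false);

        // Mirrors what key_field/value_field in the patch return.
        if let DataType::Map(entries_field, _) = &map_type {
            if let DataType::Struct(fields) = entries_field.data_type() {
                assert_eq!(fields.first().unwrap().name(), "key");
                assert_eq!(fields.get(1).unwrap().name(), "value");
            }
        }
    }
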
diff --git a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
index 990327c5d..6571a8081 100644
--- a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
+++ b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{Array, GenericListArray, OffsetSizeTrait, StructArray};
+use arrow::array::{make_array, Array, GenericListArray, OffsetSizeTrait, StructArray};
+use arrow::buffer::NullBuffer;
 use arrow::datatypes::{DataType, FieldRef, Schema};
 use arrow::record_batch::RecordBatch;
 use datafusion::common::{
@@ -80,10 +81,6 @@ impl PhysicalExpr for GetArrayStructFields {
         self
     }
 
-    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
-        unimplemented!()
-    }
-
     fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
         let struct_field = self.child_field(input_schema)?;
         match self.child.data_type(input_schema)? {
@@ -138,6 +135,10 @@ impl PhysicalExpr for GetArrayStructFields {
             _ => internal_err!("GetArrayStructFields should have exactly one child"),
         }
     }
+
+    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
+        unimplemented!()
+    }
 }
 
 fn get_array_struct_fields<O: OffsetSizeTrait>(
@@ -148,13 +149,36 @@ fn get_array_struct_fields<O: OffsetSizeTrait>(
         .values()
         .as_any()
         .downcast_ref::<StructArray>()
-        .expect("A struct is expected");
+        .expect("A StructType is expected");
 
-    let column = Arc::clone(values.column(ordinal));
     let field = Arc::clone(&values.fields()[ordinal]);
-
-    let offsets = list_array.offsets();
-    let array = GenericListArray::new(field, offsets.clone(), column, list_array.nulls().cloned());
+    // Get struct column by ordinal
+    let extracted_column = values.column(ordinal);
+
+    let data = if values.null_count() == extracted_column.null_count() {
+        Arc::clone(extracted_column)
+    } else {
+        // In some cases the column obtained from the struct by ordinal doesn't
+        // represent all of the nulls imposed by the parent values.
+        // This may be caused by a low-level reader bug and needs more investigation.
+        // For this specific case we patch the column's null buffer by merging
+        // the null buffers from the parent and the column.
+        let merged_nulls = NullBuffer::union(values.nulls(), extracted_column.nulls());
+        make_array(
+            extracted_column
+                .into_data()
+                .into_builder()
+                .nulls(merged_nulls)
+                .build()?,
+        )
+    };
+
+    let array = GenericListArray::new(
+        field,
+        list_array.offsets().clone(),
+        data,
+        list_array.nulls().cloned(),
+    );
 
     Ok(ColumnarValue::Array(Arc::new(array)))
 }
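
The null-merging branch above hinges on NullBuffer::union, which intersects
the validity bitmaps: a slot in the merged buffer is valid only if both the
parent struct and the extracted child column mark it valid. A minimal sketch
of that behavior (assuming the arrow crate; the sample buffers are
hypothetical):

    use arrow::buffer::NullBuffer;

    fn main() {
        // true = valid, false = null. Slot 1 is null in the parent struct,
        // slot 2 is null only in the extracted child column.
        let parent = NullBuffer::from(vec![true, false, true, true]);
        let child = NullBuffer::from(vec![true, true, false, true]);

        // union ANDs validity: a null on either side makes the slot null.
        let merged = NullBuffer::union(Some(&parent), Some(&child)).unwrap();
        assert!(merged.is_valid(0));
        assert!(merged.is_null(1));
        assert!(merged.is_null(2));
        assert!(merged.is_valid(3));
    }
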
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index f7060d8a1..3fedaa7e3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1975,11 +1975,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
       case mk: MapKeys =>
         val childExpr = exprToProtoInternal(mk.child, inputs, binding)
         scalarFunctionExprToProto("map_keys", childExpr)
-//  commented out because of correctness issue
-//  https://github.com/apache/datafusion-comet/issues/1789
-//      case mv: MapValues =>
-//        val childExpr = exprToProtoInternal(mv.child, inputs, binding)
-//        scalarFunctionExprToProto("map_values", childExpr)
+      case mv: MapValues =>
+        val childExpr = exprToProtoInternal(mv.child, inputs, binding)
+        scalarFunctionExprToProto("map_values", childExpr)
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 3d725ee48..6d31d305d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -332,18 +332,19 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       "select map_keys(c0).b from tbl")
   }
 
-//  commented out because of correctness issue https://github.com/apache/datafusion-comet/issues/1789
-//  test("native reader - select nested field from a complex map[struct, struct] using map_values") {
-//    testSingleLineQuery(
-//      """
-//        | select map(str0, str1) c0 from
-//        | (
-//        |   select named_struct('a', cast(1 as long), 'b', cast(2 as long), 'c', cast(3 as long)) str0,
-//        |          named_struct('x', cast(8 as long), 'y', cast(9 as long), 'z', cast(0 as long)) str1 union all
-//        |   select named_struct('a', cast(3 as long), 'b', cast(4 as long), 'c', cast(5 as long)) str0,
-//        |          named_struct('x', cast(6 as long), 'y', cast(7 as long), 'z', cast(8 as long)) str1
-//        | )
-//        |""".stripMargin,
-//      "select map_values(c0).b from tbl")
-//  }
+  test(
+    "native reader - select nested field from a complex map[struct, struct] using map_values") {
+    testSingleLineQuery(
+      """
+        | select map(str0, str1) c0 from
+        | (
+        |   select named_struct('a', cast(1 as long), 'b', cast(2 as long), 'c', cast(3 as long)) str0,
+        |          named_struct('x', cast(8 as long), 'y', cast(9 as long), 'z', cast(0 as long)) str1 union all
+        |   select named_struct('a', cast(3 as long), 'b', cast(4 as long), 'c', cast(5 as long)) str0,
+        |          named_struct('x', cast(6 as long), 'y', cast(7 as long), 'z', cast(8 as long)) str1 union all
+        |   select named_struct('a', cast(31 as long), 'b', cast(41 as long), 'c', cast(51 as long)), null
+        | )
+        |""".stripMargin,
+      "select map_values(c0).y from tbl")
+  }
 }
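
The added null map row looks like the interesting case for the fix above,
presumably related to the correctness issue (#1789) that kept map_values
disabled: a MapArray slot can be null while the flattened entries child stays
dense, so a projection over the values must carry the map's own null buffer
rather than the child's alone. A minimal sketch of such an array (assuming
the arrow crate; the data is hypothetical):

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int64Array, MapArray, StructArray};
    use arrow::buffer::{NullBuffer, OffsetBuffer};
    use arrow::datatypes::{DataType, Field, Fields};

    fn main() {
        let fields = Fields::from(vec![
            Field::new("key", DataType::Int64, false),
            Field::new("value", DataType::Int64, true),
        ]);
        // Two dense entries shared across three map slots.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 2])),
            Arc::new(Int64Array::from(vec![10, 20])),
        ];
        let entries = StructArray::new(fields.clone(), columns, None);
        let entries_field =
            Arc::new(Field::new("entries", DataType::Struct(fields), false));

        // Slots 0 and 1 hold one entry each; slot 2 is empty and marked
        // null, like the null map row added to the test above.
        let offsets = OffsetBuffer::new(vec![0i32, 1, 2, 2].into());
        let nulls = NullBuffer::from(vec![true, true, false]);
        let map = MapArray::new(entries_field, offsets, entries, Some(nulls), false);

        assert_eq!(map.len(), 3);
        assert!(map.is_null(2));
    }
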

