This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ac48ba3fa6 feat(spark): implement Spark `map` function 
`map_from_arrays` (#17456)
ac48ba3fa6 is described below

commit ac48ba3fa666a6181776ca26227a3362a6a4212e
Author: Evgenii Glotov <[email protected]>
AuthorDate: Wed Sep 24 17:50:04 2025 +0300

    feat(spark): implement Spark `map` function `map_from_arrays` (#17456)
    
    * feat(spark): implement Spark `map` function `map_from_arrays`
    
    * chore: add test with nested `map_from_arrays` calls, refactor 
map_deduplicate_keys to remove unnecessary variables and array slices
    
    * fix: clippy warning
    
    * fix: null and different size input lists treatment, chore: move common 
map funcs to utils.rs, add more tests
    
    * fix: typo
    
    * fix: clippy docstring warning
    
    * chore: move more helpers needed for multiple map functions to utils
    
    * chore: add multi-row tests
    
    * fix: null values treatment
    
    * fix: docstring warnings
---
 .../spark/src/function/map/map_from_arrays.rs      | 105 ++++++++++
 datafusion/spark/src/function/map/mod.rs           |  18 +-
 datafusion/spark/src/function/map/utils.rs         | 222 +++++++++++++++++++++
 .../test_files/spark/map/map_from_arrays.slt       | 136 +++++++++++++
 4 files changed, 479 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs 
b/datafusion/spark/src/function/map/map_from_arrays.rs
new file mode 100644
index 0000000000..987548e353
--- /dev/null
+++ b/datafusion/spark/src/function/map/map_from_arrays.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use crate::function::map::utils::{
+    get_element_type, get_list_offsets, get_list_values,
+    map_from_keys_values_offsets_nulls, map_type_from_key_value_types,
+};
+use arrow::array::{Array, ArrayRef, NullArray};
+use arrow::compute::kernels::cast;
+use arrow::datatypes::DataType;
+use datafusion_common::utils::take_function_args;
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use datafusion_functions::utils::make_scalar_function;
+
+/// Spark-compatible `map_from_arrays` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#map_from_arrays>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapFromArrays {
+    signature: Signature,
+}
+
+impl Default for MapFromArrays {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapFromArrays {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::any(2, Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for MapFromArrays {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "map_from_arrays"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let [key_type, value_type] = take_function_args("map_from_arrays", 
arg_types)?;
+        Ok(map_type_from_key_value_types(
+            get_element_type(key_type)?,
+            get_element_type(value_type)?,
+        ))
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        make_scalar_function(map_from_arrays_inner, vec![])(&args.args)
+    }
+}
+
+fn map_from_arrays_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [keys, values] = take_function_args("map_from_arrays", args)?;
+
+    if matches!(keys.data_type(), DataType::Null)
+        || matches!(values.data_type(), DataType::Null)
+    {
+        return Ok(cast(
+            &NullArray::new(keys.len()),
+            &map_type_from_key_value_types(
+                get_element_type(keys.data_type())?,
+                get_element_type(values.data_type())?,
+            ),
+        )?);
+    }
+
+    map_from_keys_values_offsets_nulls(
+        get_list_values(keys)?,
+        get_list_values(values)?,
+        &get_list_offsets(keys)?,
+        &get_list_offsets(values)?,
+        keys.nulls(),
+        values.nulls(),
+    )
+}
diff --git a/datafusion/spark/src/function/map/mod.rs 
b/datafusion/spark/src/function/map/mod.rs
index a87df9a2c8..21d1e0f108 100644
--- a/datafusion/spark/src/function/map/mod.rs
+++ b/datafusion/spark/src/function/map/mod.rs
@@ -15,11 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod map_from_arrays;
+mod utils;
+
 use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
-pub mod expr_fn {}
+make_udf_function!(map_from_arrays::MapFromArrays, map_from_arrays);
+
+pub mod expr_fn {
+    use datafusion_functions::export_functions;
+
+    export_functions!((
+        map_from_arrays,
+        "Creates a map from arrays of keys and values.",
+        keys values
+    ));
+}
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![]
+    vec![map_from_arrays()]
 }
diff --git a/datafusion/spark/src/function/map/utils.rs 
b/datafusion/spark/src/function/map/utils.rs
new file mode 100644
index 0000000000..fa4fc5fae4
--- /dev/null
+++ b/datafusion/spark/src/function/map/utils.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, AsArray, BooleanBuilder, MapArray, 
StructArray};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::filter;
+use arrow::datatypes::{DataType, Field, Fields};
+use datafusion_common::{exec_err, Result, ScalarValue};
+
+/// Helper function to get element [`DataType`]
+/// from 
[`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)<br>
+/// [`Null`](DataType::Null) can be coerced to 
`ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is 
returned<br>
+/// For all other types [`exec_err`] is raised
+pub fn get_element_type(data_type: &DataType) -> Result<&DataType> {
+    match data_type {
+        DataType::Null => Ok(data_type),
+        DataType::List(element)
+        | DataType::LargeList(element)
+        | DataType::FixedSizeList(element, _) => Ok(element.data_type()),
+        _ => exec_err!(
+            "get_element_type expects List/LargeList/FixedSizeList/Null as 
argument, got {data_type:?}"
+        ),
+    }
+}
+
+/// Helper function to get [`values`](arrow::array::ListArray::values)
+/// from 
[`ListArray`](arrow::array::ListArray)/[`LargeListArray`](arrow::array::LargeListArray)/[`FixedSizeListArray`](arrow::array::FixedSizeListArray)<br>
+/// [`NullArray`](arrow::array::NullArray) can be coerced to 
`ListType`([`Null`](DataType::Null)), so [`NullArray`](arrow::array::NullArray) 
is returned<br>
+/// For all other types [`exec_err`] is raised
+pub fn get_list_values(array: &ArrayRef) -> Result<&ArrayRef> {
+    match array.data_type() {
+        DataType::Null => Ok(array),
+        DataType::List(_) => Ok(array.as_list::<i32>().values()),
+        DataType::LargeList(_) => Ok(array.as_list::<i64>().values()),
+        DataType::FixedSizeList(..) => Ok(array.as_fixed_size_list().values()),
+        wrong_type => exec_err!(
+            "get_list_values expects List/LargeList/FixedSizeList/Null as 
argument, got {wrong_type:?}"
+        ),
+    }
+}
+
+/// Helper function to get [`offsets`](arrow::array::ListArray::offsets)
+/// from 
[`ListArray`](arrow::array::ListArray)/[`LargeListArray`](arrow::array::LargeListArray)/[`FixedSizeListArray`](arrow::array::FixedSizeListArray)<br>
+/// For all other types [`exec_err`] is raised
+pub fn get_list_offsets(array: &ArrayRef) -> Result<Cow<'_, [i32]>> {
+    match array.data_type() {
+        DataType::List(_) => 
Ok(Cow::Borrowed(array.as_list::<i32>().offsets().as_ref())),
+        DataType::LargeList(_) => Ok(Cow::Owned(
+            array.as_list::<i64>()
+                .offsets()
+                .iter()
+                .map(|i| *i as i32)
+                .collect::<Vec<_>>(),
+        )),
+        DataType::FixedSizeList(_, size) => Ok(Cow::Owned(
+             (0..=array.len() as i32).map(|i| size * i).collect()
+        )),
+        wrong_type => exec_err!(
+            "get_list_offsets expects List/LargeList/FixedSizeList as 
argument, got {wrong_type:?}"
+        ),
+    }
+}
+
+/// Helper function to construct [`MapType<K, V>`](DataType::Map) given K and 
V DataTypes for keys and values
+/// - Map keys are unsorted
+/// - Map keys are non-nullable
+/// - Map entries are non-nullable
+/// - Map values can be null
+pub fn map_type_from_key_value_types(
+    key_type: &DataType,
+    value_type: &DataType,
+) -> DataType {
+    DataType::Map(
+        Arc::new(Field::new(
+            "entries",
+            DataType::Struct(Fields::from(vec![
+                // the key must not be nullable
+                Field::new("key", key_type.clone(), false),
+                Field::new("value", value_type.clone(), true),
+            ])),
+            false, // the entry is not nullable
+        )),
+        false, // the keys are not sorted
+    )
+}
+
+/// Helper function to construct MapArray from flattened ListArrays and 
OffsetBuffer
+///
+/// Logic is close to 
`datafusion_functions_nested::map::make_map_array_internal`<br>
+/// But there are some core differences:
+/// 1. Input arrays are not [`ListArrays`](arrow::array::ListArray) themselves, 
but their flattened [`values`](arrow::array::ListArray::values)<br>
+///    So the inputs can be 
[`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)<br>
+///    To preserve the row info, [`offsets`](arrow::array::ListArray::offsets) 
and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to 
be provided<br>
+///    [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no 
`offsets`, so they can be generated as a cumulative sum of its `Size`
+/// 2. Spark provides 
[spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961)
+///    to handle duplicate keys<br>
+///    For now, configurable functions are not supported by Datafusion<br>
+///    So the more permissive `LAST_WIN` option is used in this implementation 
(instead of `EXCEPTION`)<br>
+///    `EXCEPTION` behaviour can still be achieved externally at the cost of 
performance:<br>
+///    `when(array_length(array_distinct(keys)) == array_length(keys), 
constructed_map)`<br>
+///    `.otherwise(raise_error("duplicate keys occurred during map 
construction"))`
+pub fn map_from_keys_values_offsets_nulls(
+    flat_keys: &ArrayRef,
+    flat_values: &ArrayRef,
+    keys_offsets: &[i32],
+    values_offsets: &[i32],
+    keys_nulls: Option<&NullBuffer>,
+    values_nulls: Option<&NullBuffer>,
+) -> Result<ArrayRef> {
+    let (keys, values, offsets) = map_deduplicate_keys(
+        flat_keys,
+        flat_values,
+        keys_offsets,
+        values_offsets,
+        keys_nulls,
+        values_nulls,
+    )?;
+    let nulls = NullBuffer::union(keys_nulls, values_nulls);
+
+    let fields = Fields::from(vec![
+        Field::new("key", flat_keys.data_type().clone(), false),
+        Field::new("value", flat_values.data_type().clone(), true),
+    ]);
+    let entries = StructArray::try_new(fields.clone(), vec![keys, values], 
None)?;
+    let field = Arc::new(Field::new("entries", DataType::Struct(fields), 
false));
+    Ok(Arc::new(MapArray::try_new(
+        field, offsets, entries, nulls, false,
+    )?))
+}
+
+fn map_deduplicate_keys(
+    flat_keys: &ArrayRef,
+    flat_values: &ArrayRef,
+    keys_offsets: &[i32],
+    values_offsets: &[i32],
+    keys_nulls: Option<&NullBuffer>,
+    values_nulls: Option<&NullBuffer>,
+) -> Result<(ArrayRef, ArrayRef, OffsetBuffer<i32>)> {
+    let offsets_len = keys_offsets.len();
+    let mut new_offsets = Vec::with_capacity(offsets_len);
+
+    let mut cur_keys_offset = 0;
+    let mut cur_values_offset = 0;
+    let mut new_last_offset = 0;
+    new_offsets.push(new_last_offset);
+
+    let mut keys_mask_builder = BooleanBuilder::new();
+    let mut values_mask_builder = BooleanBuilder::new();
+    for (row_idx, (next_keys_offset, next_values_offset)) in keys_offsets
+        .iter()
+        .zip(values_offsets.iter())
+        .skip(1)
+        .enumerate()
+    {
+        let num_keys_entries = *next_keys_offset as usize - cur_keys_offset;
+        let num_values_entries = *next_values_offset as usize - 
cur_values_offset;
+
+        let mut keys_mask_one = [false].repeat(num_keys_entries);
+        let mut values_mask_one = [false].repeat(num_values_entries);
+
+        if num_keys_entries != num_values_entries {
+            let key_is_valid = keys_nulls.is_none_or(|buf| 
buf.is_valid(row_idx));
+            let value_is_valid = values_nulls.is_none_or(|buf| 
buf.is_valid(row_idx));
+            if key_is_valid && value_is_valid {
+                return exec_err!("map_deduplicate_keys: keys and values lists 
in the same row must have equal lengths");
+            }
+            // else the result entry is NULL
+            // both current row offsets are skipped
+            // keys or values in the current row are marked false in the masks
+        } else if num_keys_entries != 0 {
+            let mut seen_keys = HashSet::new();
+
+            for cur_entry_idx in (0..num_keys_entries).rev() {
+                let key = ScalarValue::try_from_array(
+                    &flat_keys,
+                    cur_keys_offset + cur_entry_idx,
+                )?
+                .compacted();
+                if seen_keys.contains(&key) {
+                    // TODO: implement configuration and logic for 
spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
+                    // exec_err!("invalid argument: duplicate keys in map")
+                    // 
https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
+                } else {
+                    // This code implements deduplication logic for 
spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
+                    keys_mask_one[cur_entry_idx] = true;
+                    values_mask_one[cur_entry_idx] = true;
+                    seen_keys.insert(key);
+                    new_last_offset += 1;
+                }
+            }
+        }
+        keys_mask_builder.append_array(&keys_mask_one.into());
+        values_mask_builder.append_array(&values_mask_one.into());
+        new_offsets.push(new_last_offset);
+        cur_keys_offset += num_keys_entries;
+        cur_values_offset += num_values_entries;
+    }
+    let keys_mask = keys_mask_builder.finish();
+    let values_mask = values_mask_builder.finish();
+    let needed_keys = filter(&flat_keys, &keys_mask)?;
+    let needed_values = filter(&flat_values, &values_mask)?;
+    let offsets = OffsetBuffer::new(new_offsets.into());
+    Ok((needed_keys, needed_values, offsets))
+}
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt 
b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
new file mode 100644
index 0000000000..a26b0435c9
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Spark doctests
+query ?
+SELECT map_from_arrays(array(1.0, 3.0), array('2', '4'));
+----
+{1.0: 2, 3.0: 4}
+
+query ?
+SELECT map_from_arrays(array(2, 5), array('a', 'b'));
+----
+{2: a, 5: b}
+
+query ?
+SELECT map_from_arrays(array(1, 2), array('a', NULL));
+----
+{1: a, 2: NULL}
+
+query ?
+SELECT map_from_arrays(cast(array() as array<int>), cast(array() as 
array<string>));
+----
+{}
+
+# Tests with DataType:Null input arrays
+query ?
+SELECT map_from_arrays(NULL, NULL);
+----
+NULL
+
+query ?
+SELECT map_from_arrays(array(1), NULL);
+----
+NULL
+
+query ?
+SELECT map_from_arrays(NULL, array(1));
+----
+NULL
+
+# Tests with different inner lists lengths
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and 
values lists in the same row must have equal lengths
+SELECT map_from_arrays(array(1, 2, 3), array('a', 'b'));
+
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and 
values lists in the same row must have equal lengths
+SELECT map_from_arrays(array(), array('a', 'b'));
+
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and 
values lists in the same row must have equal lengths
+SELECT map_from_arrays(array(1, 2, 3), array());
+
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and 
values lists in the same row must have equal lengths
+select map_from_arrays(a, b)
+from values 
+    (array[1], array[1]),
+    (array[2, 3, 4], array[2, 3]),
+    (array[5], array[4])
+as tab(a, b);
+
+#Test with multiple rows: good, empty and nullable
+query ?
+select map_from_arrays(a, b)
+from values 
+    (array[1], array['a']), 
+    (NULL, NULL),
+    (array[1,2,3], NULL),
+    (NULL, array['b', 'c']), 
+    (array[4, 5], array['d', 'e']), 
+    (array[], array[]),
+    (array[6, 7, 8], array['f', 'g', 'h']) 
+as tab(a, b);
+----
+{1: a}
+NULL
+NULL
+NULL
+{4: d, 5: e}
+{}
+{6: f, 7: g, 8: h}
+
+# Test with complex types
+query ?
+SELECT map_from_arrays(array(array('a', 'b'), array('c', 'd')), 
array(struct(1, 2, 3), struct(4, 5, 6)));
+----
+{[a, b]: {c0: 1, c1: 2, c2: 3}, [c, d]: {c0: 4, c1: 5, c2: 6}}
+
+# Test with nested function calls
+query ?
+SELECT
+    map_from_arrays(
+        array['outer_key1', 'outer_key2'],
+        array[
+            -- value for outer_key1: a map itself
+            map_from_arrays(
+                array['inner_a', 'inner_b'],
+                array[1, 2]
+            ),
+            -- value for outer_key2: another map
+            map_from_arrays(
+                array['inner_x', 'inner_y', 'inner_z'],
+                array[10, 20, 30]
+            )
+        ]
+    ) AS nested_map;
+----
+{outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, 
inner_z: 30}}
+
+# Test with duplicate keys
+query ?
+SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b'));
+----
+{false: NULL, true: b}
+
+# Tests with different list types
+query ?
+SELECT map_from_arrays(arrow_cast(array(2, 5), 'LargeList(Int32)'), 
arrow_cast(array('a', 'b'), 'FixedSizeList(2, Utf8)'));
+----
+{2: a, 5: b}
+
+query ?
+SELECT map_from_arrays(arrow_cast(array('a', 'b', 'c'), 'FixedSizeList(3, 
Utf8)'), arrow_cast(array(1, 2, 3), 'LargeList(Int32)'));
+----
+{a: 1, b: 2, c: 3}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to