This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ac48ba3fa6 feat(spark): implement Spark `map` function
`map_from_arrays` (#17456)
ac48ba3fa6 is described below
commit ac48ba3fa666a6181776ca26227a3362a6a4212e
Author: Evgenii Glotov <[email protected]>
AuthorDate: Wed Sep 24 17:50:04 2025 +0300
feat(spark): implement Spark `map` function `map_from_arrays` (#17456)
* feat(spark): implement Spark `map` function `map_from_arrays`
* chore: add test with nested `map_from_arrays` calls, refactor
map_deduplicate_keys to remove unnecessary variables and array slices
* fix: clippy warning
* fix: null and different size input lists treatment, chore: move common
map funcs to utils.rs, add more tests
* fix: typo
* fix: clippy docstring warning
* chore: move more helpers needed for multiple map functions to utils
* chore: add multi-row tests
* fix: null values treatment
* fix: docstring warnings
---
.../spark/src/function/map/map_from_arrays.rs | 105 ++++++++++
datafusion/spark/src/function/map/mod.rs | 18 +-
datafusion/spark/src/function/map/utils.rs | 222 +++++++++++++++++++++
.../test_files/spark/map/map_from_arrays.slt | 136 +++++++++++++
4 files changed, 479 insertions(+), 2 deletions(-)
diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs
b/datafusion/spark/src/function/map/map_from_arrays.rs
new file mode 100644
index 0000000000..987548e353
--- /dev/null
+++ b/datafusion/spark/src/function/map/map_from_arrays.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use crate::function::map::utils::{
+ get_element_type, get_list_offsets, get_list_values,
+ map_from_keys_values_offsets_nulls, map_type_from_key_value_types,
+};
+use arrow::array::{Array, ArrayRef, NullArray};
+use arrow::compute::kernels::cast;
+use arrow::datatypes::DataType;
+use datafusion_common::utils::take_function_args;
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use datafusion_functions::utils::make_scalar_function;
+
+/// Spark-compatible `map_from_arrays` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#map_from_arrays>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct MapFromArrays {
+ signature: Signature,
+}
+
+impl Default for MapFromArrays {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MapFromArrays {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::any(2, Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for MapFromArrays {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "map_from_arrays"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ let [key_type, value_type] = take_function_args("map_from_arrays",
arg_types)?;
+ Ok(map_type_from_key_value_types(
+ get_element_type(key_type)?,
+ get_element_type(value_type)?,
+ ))
+ }
+
+ fn invoke_with_args(
+ &self,
+ args: datafusion_expr::ScalarFunctionArgs,
+ ) -> Result<ColumnarValue> {
+ make_scalar_function(map_from_arrays_inner, vec![])(&args.args)
+ }
+}
+
+fn map_from_arrays_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let [keys, values] = take_function_args("map_from_arrays", args)?;
+
+ if matches!(keys.data_type(), DataType::Null)
+ || matches!(values.data_type(), DataType::Null)
+ {
+ return Ok(cast(
+ &NullArray::new(keys.len()),
+ &map_type_from_key_value_types(
+ get_element_type(keys.data_type())?,
+ get_element_type(values.data_type())?,
+ ),
+ )?);
+ }
+
+ map_from_keys_values_offsets_nulls(
+ get_list_values(keys)?,
+ get_list_values(values)?,
+ &get_list_offsets(keys)?,
+ &get_list_offsets(values)?,
+ keys.nulls(),
+ values.nulls(),
+ )
+}
diff --git a/datafusion/spark/src/function/map/mod.rs
b/datafusion/spark/src/function/map/mod.rs
index a87df9a2c8..21d1e0f108 100644
--- a/datafusion/spark/src/function/map/mod.rs
+++ b/datafusion/spark/src/function/map/mod.rs
@@ -15,11 +15,25 @@
// specific language governing permissions and limitations
// under the License.
+pub mod map_from_arrays;
+mod utils;
+
use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function;
use std::sync::Arc;
-pub mod expr_fn {}
+make_udf_function!(map_from_arrays::MapFromArrays, map_from_arrays);
+
+pub mod expr_fn {
+ use datafusion_functions::export_functions;
+
+ export_functions!((
+ map_from_arrays,
+ "Creates a map from arrays of keys and values.",
+ keys values
+ ));
+}
pub fn functions() -> Vec<Arc<ScalarUDF>> {
- vec![]
+ vec![map_from_arrays()]
}
diff --git a/datafusion/spark/src/function/map/utils.rs
b/datafusion/spark/src/function/map/utils.rs
new file mode 100644
index 0000000000..fa4fc5fae4
--- /dev/null
+++ b/datafusion/spark/src/function/map/utils.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, AsArray, BooleanBuilder, MapArray,
StructArray};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::filter;
+use arrow::datatypes::{DataType, Field, Fields};
+use datafusion_common::{exec_err, Result, ScalarValue};
+
+/// Helper function to get element [`DataType`]
+/// from
[`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)<br>
+/// [`Null`](DataType::Null) can be coerced to
`ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is
returned<br>
+/// For all other types [`exec_err`] is raised
+pub fn get_element_type(data_type: &DataType) -> Result<&DataType> {
+ match data_type {
+ DataType::Null => Ok(data_type),
+ DataType::List(element)
+ | DataType::LargeList(element)
+ | DataType::FixedSizeList(element, _) => Ok(element.data_type()),
+ _ => exec_err!(
+ "get_element_type expects List/LargeList/FixedSizeList/Null as
argument, got {data_type:?}"
+ ),
+ }
+}
+
+/// Helper function to get [`values`](arrow::array::ListArray::values)
+/// from
[`ListArray`](arrow::array::ListArray)/[`LargeListArray`](arrow::array::LargeListArray)/[`FixedSizeListArray`](arrow::array::FixedSizeListArray)<br>
+/// [`NullArray`](arrow::array::NullArray) can be coerced to
`ListType`([`Null`](DataType::Null)), so [`NullArray`](arrow::array::NullArray)
is returned<br>
+/// For all other types [`exec_err`] is raised
+pub fn get_list_values(array: &ArrayRef) -> Result<&ArrayRef> {
+ match array.data_type() {
+ DataType::Null => Ok(array),
+ DataType::List(_) => Ok(array.as_list::<i32>().values()),
+ DataType::LargeList(_) => Ok(array.as_list::<i64>().values()),
+ DataType::FixedSizeList(..) => Ok(array.as_fixed_size_list().values()),
+ wrong_type => exec_err!(
+ "get_list_values expects List/LargeList/FixedSizeList/Null as
argument, got {wrong_type:?}"
+ ),
+ }
+}
+
+/// Helper function to get [`offsets`](arrow::array::ListArray::offsets)
+/// from
[`ListArray`](arrow::array::ListArray)/[`LargeListArray`](arrow::array::LargeListArray)/[`FixedSizeListArray`](arrow::array::FixedSizeListArray)<br>
+/// For all other types [`exec_err`] is raised
+pub fn get_list_offsets(array: &ArrayRef) -> Result<Cow<'_, [i32]>> {
+ match array.data_type() {
+ DataType::List(_) =>
Ok(Cow::Borrowed(array.as_list::<i32>().offsets().as_ref())),
+ DataType::LargeList(_) => Ok(Cow::Owned(
+ array.as_list::<i64>()
+ .offsets()
+ .iter()
+ .map(|i| *i as i32)
+ .collect::<Vec<_>>(),
+ )),
+ DataType::FixedSizeList(_, size) => Ok(Cow::Owned(
+ (0..=array.len() as i32).map(|i| size * i).collect()
+ )),
+ wrong_type => exec_err!(
+ "get_list_offsets expects List/LargeList/FixedSizeList as
argument, got {wrong_type:?}"
+ ),
+ }
+}
+
+/// Helper function to construct [`MapType<K, V>`](DataType::Map) given K and
V DataTypes for keys and values
+/// - Map keys are unsorted
+/// - Map keys are non-nullable
+/// - Map entries are non-nullable
+/// - Map values can be null
+pub fn map_type_from_key_value_types(
+ key_type: &DataType,
+ value_type: &DataType,
+) -> DataType {
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ // the key must not be nullable
+ Field::new("key", key_type.clone(), false),
+ Field::new("value", value_type.clone(), true),
+ ])),
+ false, // the entry is not nullable
+ )),
+ false, // the keys are not sorted
+ )
+}
+
+/// Helper function to construct MapArray from flattened ListArrays and
OffsetBuffer
+///
+/// Logic is close to
`datafusion_functions_nested::map::make_map_array_internal`<br>
+/// But there are some core differences:
+/// 1. Input arrays are not [`ListArrays`](arrow::array::ListArray) itself,
but their flattened [`values`](arrow::array::ListArray::values)<br>
+/// So the inputs can be
[`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)<br>
+/// To preserve the row info, [`offsets`](arrow::array::ListArray::offsets)
and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to
be provided<br>
+/// [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no
`offsets`, so they can be generated as a cumulative sum of its `Size`
+/// 2. Spark provides
[spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961)
+/// to handle duplicate keys<br>
+/// For now, configurable functions are not supported by DataFusion<br>
+/// So more permissive `LAST_WIN` option is used in this implementation
(instead of `EXCEPTION`)<br>
+/// `EXCEPTION` behaviour can still be achieved externally at the cost of
performance:<br>
+/// `when(array_length(array_distinct(keys)) == array_length(keys),
constructed_map)`<br>
+/// `.otherwise(raise_error("duplicate keys occurred during map
construction"))`
+pub fn map_from_keys_values_offsets_nulls(
+ flat_keys: &ArrayRef,
+ flat_values: &ArrayRef,
+ keys_offsets: &[i32],
+ values_offsets: &[i32],
+ keys_nulls: Option<&NullBuffer>,
+ values_nulls: Option<&NullBuffer>,
+) -> Result<ArrayRef> {
+ let (keys, values, offsets) = map_deduplicate_keys(
+ flat_keys,
+ flat_values,
+ keys_offsets,
+ values_offsets,
+ keys_nulls,
+ values_nulls,
+ )?;
+ let nulls = NullBuffer::union(keys_nulls, values_nulls);
+
+ let fields = Fields::from(vec![
+ Field::new("key", flat_keys.data_type().clone(), false),
+ Field::new("value", flat_values.data_type().clone(), true),
+ ]);
+ let entries = StructArray::try_new(fields.clone(), vec![keys, values],
None)?;
+ let field = Arc::new(Field::new("entries", DataType::Struct(fields),
false));
+ Ok(Arc::new(MapArray::try_new(
+ field, offsets, entries, nulls, false,
+ )?))
+}
+
+fn map_deduplicate_keys(
+ flat_keys: &ArrayRef,
+ flat_values: &ArrayRef,
+ keys_offsets: &[i32],
+ values_offsets: &[i32],
+ keys_nulls: Option<&NullBuffer>,
+ values_nulls: Option<&NullBuffer>,
+) -> Result<(ArrayRef, ArrayRef, OffsetBuffer<i32>)> {
+ let offsets_len = keys_offsets.len();
+ let mut new_offsets = Vec::with_capacity(offsets_len);
+
+ let mut cur_keys_offset = 0;
+ let mut cur_values_offset = 0;
+ let mut new_last_offset = 0;
+ new_offsets.push(new_last_offset);
+
+ let mut keys_mask_builder = BooleanBuilder::new();
+ let mut values_mask_builder = BooleanBuilder::new();
+ for (row_idx, (next_keys_offset, next_values_offset)) in keys_offsets
+ .iter()
+ .zip(values_offsets.iter())
+ .skip(1)
+ .enumerate()
+ {
+ let num_keys_entries = *next_keys_offset as usize - cur_keys_offset;
+ let num_values_entries = *next_values_offset as usize -
cur_values_offset;
+
+ let mut keys_mask_one = [false].repeat(num_keys_entries);
+ let mut values_mask_one = [false].repeat(num_values_entries);
+
+ if num_keys_entries != num_values_entries {
+ let key_is_valid = keys_nulls.is_none_or(|buf|
buf.is_valid(row_idx));
+ let value_is_valid = values_nulls.is_none_or(|buf|
buf.is_valid(row_idx));
+ if key_is_valid && value_is_valid {
+ return exec_err!("map_deduplicate_keys: keys and values lists
in the same row must have equal lengths");
+ }
+ // else the result entry is NULL
+ // both current row offsets are skipped
+ // keys or values in the current row are marked false in the masks
+ } else if num_keys_entries != 0 {
+ let mut seen_keys = HashSet::new();
+
+ for cur_entry_idx in (0..num_keys_entries).rev() {
+ let key = ScalarValue::try_from_array(
+ &flat_keys,
+ cur_keys_offset + cur_entry_idx,
+ )?
+ .compacted();
+ if seen_keys.contains(&key) {
+ // TODO: implement configuration and logic for
spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
+ // exec_err!("invalid argument: duplicate keys in map")
+ //
https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
+ } else {
+ // This code implements deduplication logic for
spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
+ keys_mask_one[cur_entry_idx] = true;
+ values_mask_one[cur_entry_idx] = true;
+ seen_keys.insert(key);
+ new_last_offset += 1;
+ }
+ }
+ }
+ keys_mask_builder.append_array(&keys_mask_one.into());
+ values_mask_builder.append_array(&values_mask_one.into());
+ new_offsets.push(new_last_offset);
+ cur_keys_offset += num_keys_entries;
+ cur_values_offset += num_values_entries;
+ }
+ let keys_mask = keys_mask_builder.finish();
+ let values_mask = values_mask_builder.finish();
+ let needed_keys = filter(&flat_keys, &keys_mask)?;
+ let needed_values = filter(&flat_values, &values_mask)?;
+ let offsets = OffsetBuffer::new(new_offsets.into());
+ Ok((needed_keys, needed_values, offsets))
+}
diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
new file mode 100644
index 0000000000..a26b0435c9
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Spark doctests
+query ?
+SELECT map_from_arrays(array(1.0, 3.0), array('2', '4'));
+----
+{1.0: 2, 3.0: 4}
+
+query ?
+SELECT map_from_arrays(array(2, 5), array('a', 'b'));
+----
+{2: a, 5: b}
+
+query ?
+SELECT map_from_arrays(array(1, 2), array('a', NULL));
+----
+{1: a, 2: NULL}
+
+query ?
+SELECT map_from_arrays(cast(array() as array<int>), cast(array() as
array<string>));
+----
+{}
+
+# Tests with DataType::Null input arrays
+query ?
+SELECT map_from_arrays(NULL, NULL);
+----
+NULL
+
+query ?
+SELECT map_from_arrays(array(1), NULL);
+----
+NULL
+
+query ?
+SELECT map_from_arrays(NULL, array(1));
+----
+NULL
+
+# Tests with different inner lists lengths
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and
values lists in the same row must have equal lengths
+SELECT map_from_arrays(array(1, 2, 3), array('a', 'b'));
+
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and
values lists in the same row must have equal lengths
+SELECT map_from_arrays(array(), array('a', 'b'));
+
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and
values lists in the same row must have equal lengths
+SELECT map_from_arrays(array(1, 2, 3), array());
+
+query error DataFusion error: Execution error: map_deduplicate_keys: keys and
values lists in the same row must have equal lengths
+select map_from_arrays(a, b)
+from values
+ (array[1], array[1]),
+ (array[2, 3, 4], array[2, 3]),
+ (array[5], array[4])
+as tab(a, b);
+
+#Test with multiple rows: good, empty and nullable
+query ?
+select map_from_arrays(a, b)
+from values
+ (array[1], array['a']),
+ (NULL, NULL),
+ (array[1,2,3], NULL),
+ (NULL, array['b', 'c']),
+ (array[4, 5], array['d', 'e']),
+ (array[], array[]),
+ (array[6, 7, 8], array['f', 'g', 'h'])
+as tab(a, b);
+----
+{1: a}
+NULL
+NULL
+NULL
+{4: d, 5: e}
+{}
+{6: f, 7: g, 8: h}
+
+# Test with complex types
+query ?
+SELECT map_from_arrays(array(array('a', 'b'), array('c', 'd')),
array(struct(1, 2, 3), struct(4, 5, 6)));
+----
+{[a, b]: {c0: 1, c1: 2, c2: 3}, [c, d]: {c0: 4, c1: 5, c2: 6}}
+
+# Test with nested function calls
+query ?
+SELECT
+ map_from_arrays(
+ array['outer_key1', 'outer_key2'],
+ array[
+ -- value for outer_key1: a map itself
+ map_from_arrays(
+ array['inner_a', 'inner_b'],
+ array[1, 2]
+ ),
+ -- value for outer_key2: another map
+ map_from_arrays(
+ array['inner_x', 'inner_y', 'inner_z'],
+ array[10, 20, 30]
+ )
+ ]
+ ) AS nested_map;
+----
+{outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20,
inner_z: 30}}
+
+# Test with duplicate keys
+query ?
+SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b'));
+----
+{false: NULL, true: b}
+
+# Tests with different list types
+query ?
+SELECT map_from_arrays(arrow_cast(array(2, 5), 'LargeList(Int32)'),
arrow_cast(array('a', 'b'), 'FixedSizeList(2, Utf8)'));
+----
+{2: a, 5: b}
+
+query ?
+SELECT map_from_arrays(arrow_cast(array('a', 'b', 'c'), 'FixedSizeList(3,
Utf8)'), arrow_cast(array(1, 2, 3), 'LargeList(Int32)'));
+----
+{a: 1, b: 2, c: 3}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]