This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5ba634aa4f Implement ScalarFunction `MAKE_MAP` and `MAP` (#11361)
5ba634aa4f is described below
commit 5ba634aa4f6d3d4ed5eefbc15dba5448f4f30923
Author: Jax Liu <[email protected]>
AuthorDate: Fri Jul 12 14:43:49 2024 +0800
Implement ScalarFunction `MAKE_MAP` and `MAP` (#11361)
* tmp
* opt
* modify test
* add another version
* implement make_map function
* implement make_map function
* implement map function
* format and modify the doc
* add benchmark for map function
* add empty end-line
* fix cargo check
* update lock
* update lock
* fix clippy
* fmt and clippy
* support FixedSizeList and LargeList
* check type and handle null array in coerce_types
* make array value throw todo error
* fix clippy
* simplify the error tests
---
datafusion-cli/Cargo.lock | 1 +
datafusion/functions/Cargo.toml | 7 +-
datafusion/functions/benches/map.rs | 101 ++++++++++
datafusion/functions/src/core/map.rs | 312 +++++++++++++++++++++++++++++
datafusion/functions/src/core/mod.rs | 13 ++
datafusion/sqllogictest/test_files/map.slt | 112 +++++++++++
6 files changed, 545 insertions(+), 1 deletion(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 8af42cb439..7da9cc427c 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1278,6 +1278,7 @@ name = "datafusion-functions"
version = "40.0.0"
dependencies = [
"arrow",
+ "arrow-buffer",
"base64 0.22.1",
"blake2",
"blake3",
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index 884a66724c..b143080b19 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -66,6 +66,7 @@ path = "src/lib.rs"
[dependencies]
arrow = { workspace = true }
+arrow-buffer = { workspace = true }
base64 = { version = "0.22", optional = true }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
@@ -86,7 +87,6 @@ uuid = { version = "1.7", features = ["v4"], optional = true }
[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
-arrow-buffer = { workspace = true }
criterion = "0.5"
rand = { workspace = true }
rstest = { workspace = true }
@@ -141,3 +141,8 @@ required-features = ["string_expressions"]
harness = false
name = "upper"
required-features = ["string_expressions"]
+
+[[bench]]
+harness = false
+name = "map"
+required-features = ["core_expressions"]
diff --git a/datafusion/functions/benches/map.rs b/datafusion/functions/benches/map.rs
new file mode 100644
index 0000000000..cd863d0e33
--- /dev/null
+++ b/datafusion/functions/benches/map.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::array::{Int32Array, ListArray, StringArray};
+use arrow::datatypes::{DataType, Field};
+use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_common::ScalarValue;
+use datafusion_expr::ColumnarValue;
+use datafusion_functions::core::{make_map, map};
+use rand::prelude::ThreadRng;
+use rand::Rng;
+use std::sync::Arc;
+
+fn keys(rng: &mut ThreadRng) -> Vec<String> {
+ let mut keys = vec![];
+ for _ in 0..1000 {
+ keys.push(rng.gen_range(0..9999).to_string());
+ }
+ keys
+}
+
+fn values(rng: &mut ThreadRng) -> Vec<i32> {
+ let mut values = vec![];
+ for _ in 0..1000 {
+ values.push(rng.gen_range(0..9999));
+ }
+ values
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ c.bench_function("make_map_1000", |b| {
+ let mut rng = rand::thread_rng();
+ let keys = keys(&mut rng);
+ let values = values(&mut rng);
+ let mut buffer = Vec::new();
+ for i in 0..1000 {
+ buffer.push(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
+ keys[i].clone(),
+ ))));
+ buffer.push(ColumnarValue::Scalar(ScalarValue::Int32(Some(values[i]))));
+ }
+
+ b.iter(|| {
+ black_box(
+ make_map()
+ .invoke(&buffer)
+ .expect("map should work on valid values"),
+ );
+ });
+ });
+
+ c.bench_function("map_1000", |b| {
+ let mut rng = rand::thread_rng();
+ let field = Arc::new(Field::new("item", DataType::Utf8, true));
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+ let key_list = ListArray::new(
+ field,
+ offsets,
+ Arc::new(StringArray::from(keys(&mut rng))),
+ None,
+ );
+ let field = Arc::new(Field::new("item", DataType::Int32, true));
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+ let value_list = ListArray::new(
+ field,
+ offsets,
+ Arc::new(Int32Array::from(values(&mut rng))),
+ None,
+ );
+ let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
+ let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
+
+ b.iter(|| {
+ black_box(
+ map()
+ .invoke(&[keys.clone(), values.clone()])
+ .expect("map should work on valid values"),
+ );
+ });
+ });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions/src/core/map.rs
new file mode 100644
index 0000000000..8a8a19d7af
--- /dev/null
+++ b/datafusion/functions/src/core/map.rs
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
+use arrow::compute::concat;
+use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow_buffer::{Buffer, ToByteSlice};
+
+use datafusion_common::{exec_err, internal_err, ScalarValue};
+use datafusion_common::{not_impl_err, Result};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
+fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let (key, value): (Vec<_>, Vec<_>) = args
+ .chunks_exact(2)
+ .map(|chunk| {
+ if let ColumnarValue::Array(_) = chunk[0] {
+ return not_impl_err!("make_map does not support array keys");
+ }
+ if let ColumnarValue::Array(_) = chunk[1] {
+ return not_impl_err!("make_map does not support array values");
+ }
+ Ok((chunk[0].clone(), chunk[1].clone()))
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .unzip();
+
+ let keys = ColumnarValue::values_to_arrays(&key)?;
+ let values = ColumnarValue::values_to_arrays(&value)?;
+
+ let keys: Vec<_> = keys.iter().map(|k| k.as_ref()).collect();
+ let values: Vec<_> = values.iter().map(|v| v.as_ref()).collect();
+
+ let key = match concat(&keys) {
+ Ok(key) => key,
+ Err(e) => return internal_err!("Error concatenating keys: {}", e),
+ };
+ let value = match concat(&values) {
+ Ok(value) => value,
+ Err(e) => return internal_err!("Error concatenating values: {}", e),
+ };
+ make_map_batch_internal(key, value)
+}
+
+fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ if args.len() != 2 {
+ return exec_err!(
+ "make_map requires exactly 2 arguments, got {} instead",
+ args.len()
+ );
+ }
+ let key = get_first_array_ref(&args[0])?;
+ let value = get_first_array_ref(&args[1])?;
+ make_map_batch_internal(key, value)
+}
+
+fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
+ match columnar_value {
+ ColumnarValue::Scalar(value) => match value {
+ ScalarValue::List(array) => Ok(array.value(0).clone()),
+ ScalarValue::LargeList(array) => Ok(array.value(0).clone()),
+ ScalarValue::FixedSizeList(array) => Ok(array.value(0).clone()),
+ _ => exec_err!("Expected array, got {:?}", value),
+ },
+ ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array),
+ }
+}
+
+fn make_map_batch_internal(keys: ArrayRef, values: ArrayRef) -> Result<ColumnarValue> {
+ if keys.null_count() > 0 {
+ return exec_err!("map key cannot be null");
+ }
+
+ if keys.len() != values.len() {
+ return exec_err!("map requires key and value lists to have the same length");
+ }
+
+ let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
+ let value_field = Arc::new(Field::new("value", values.data_type().clone(), true));
+ let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = VecDeque::new();
+ let mut entry_offsets_buffer = VecDeque::new();
+ entry_offsets_buffer.push_back(0);
+
+ entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(&keys)));
+ entry_struct_buffer.push_back((Arc::clone(&value_field), Arc::clone(&values)));
+ entry_offsets_buffer.push_back(keys.len() as u32);
+
+ let entry_struct: Vec<(Arc<Field>, ArrayRef)> = entry_struct_buffer.into();
+ let entry_struct = StructArray::from(entry_struct);
+
+ let map_data_type = DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ entry_struct.data_type().clone(),
+ false,
+ )),
+ false,
+ );
+
+ let entry_offsets: Vec<u32> = entry_offsets_buffer.into();
+ let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice());
+
+ let map_data = ArrayData::builder(map_data_type)
+ .len(entry_offsets.len() - 1)
+ .add_buffer(entry_offsets_buffer)
+ .add_child_data(entry_struct.to_data())
+ .build()?;
+
+ Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
+}
+
+#[derive(Debug)]
+pub struct MakeMap {
+ signature: Signature,
+}
+
+impl Default for MakeMap {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MakeMap {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::user_defined(Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for MakeMap {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "make_map"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+ if arg_types.is_empty() {
+ return exec_err!(
+ "make_map requires at least one pair of arguments, got 0 instead"
+ );
+ }
+ if arg_types.len() % 2 != 0 {
+ return exec_err!(
+ "make_map requires an even number of arguments, got {} instead",
+ arg_types.len()
+ );
+ }
+
+ let key_type = &arg_types[0];
+ let mut value_type = &arg_types[1];
+
+ for (i, chunk) in arg_types.chunks_exact(2).enumerate() {
+ if chunk[0].is_null() {
+ return exec_err!("make_map key cannot be null at position {}", i);
+ }
+ if &chunk[0] != key_type {
+ return exec_err!(
+ "make_map requires all keys to have the same type {}, got {} instead at position {}",
+ key_type,
+ chunk[0],
+ i
+ );
+ }
+
+ if !chunk[1].is_null() {
+ if value_type.is_null() {
+ value_type = &chunk[1];
+ } else if &chunk[1] != value_type {
+ return exec_err!(
+ "map requires all values to have the same type {}, got {} instead at position {}",
+ value_type,
+ &chunk[1],
+ i
+ );
+ }
+ }
+ }
+
+ let mut result = Vec::new();
+ for _ in 0..arg_types.len() / 2 {
+ result.push(key_type.clone());
+ result.push(value_type.clone());
+ }
+
+ Ok(result)
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ let key_type = &arg_types[0];
+ let mut value_type = &arg_types[1];
+
+ for chunk in arg_types.chunks_exact(2) {
+ if !chunk[1].is_null() && value_type.is_null() {
+ value_type = &chunk[1];
+ }
+ }
+
+ let mut builder = SchemaBuilder::new();
+ builder.push(Field::new("key", key_type.clone(), false));
+ builder.push(Field::new("value", value_type.clone(), true));
+ let fields = builder.finish().fields;
+ Ok(DataType::Map(
+ Arc::new(Field::new("entries", DataType::Struct(fields), false)),
+ false,
+ ))
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_map(args)
+ }
+}
+
+#[derive(Debug)]
+pub struct MapFunc {
+ signature: Signature,
+}
+
+impl Default for MapFunc {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MapFunc {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::variadic_any(Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for MapFunc {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "map"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ if arg_types.len() % 2 != 0 {
+ return exec_err!(
+ "map requires an even number of arguments, got {} instead",
+ arg_types.len()
+ );
+ }
+ let mut builder = SchemaBuilder::new();
+ builder.push(Field::new(
+ "key",
+ get_element_type(&arg_types[0])?.clone(),
+ false,
+ ));
+ builder.push(Field::new(
+ "value",
+ get_element_type(&arg_types[1])?.clone(),
+ true,
+ ));
+ let fields = builder.finish().fields;
+ Ok(DataType::Map(
+ Arc::new(Field::new("entries", DataType::Struct(fields), false)),
+ false,
+ ))
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_map_batch(args)
+ }
+}
+
+fn get_element_type(data_type: &DataType) -> Result<&DataType> {
+ match data_type {
+ DataType::List(element) => Ok(element.data_type()),
+ DataType::LargeList(element) => Ok(element.data_type()),
+ DataType::FixedSizeList(element, _) => Ok(element.data_type()),
+ _ => exec_err!(
+ "Expected list, large_list or fixed_size_list, got {:?}",
+ data_type
+ ),
+ }
+}
diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs
index 062a4a104d..31bce04bee 100644
--- a/datafusion/functions/src/core/mod.rs
+++ b/datafusion/functions/src/core/mod.rs
@@ -25,6 +25,7 @@ pub mod arrowtypeof;
pub mod coalesce;
pub mod expr_ext;
pub mod getfield;
+pub mod map;
pub mod named_struct;
pub mod nullif;
pub mod nvl;
@@ -42,6 +43,8 @@ make_udf_function!(r#struct::StructFunc, STRUCT, r#struct);
make_udf_function!(named_struct::NamedStructFunc, NAMED_STRUCT, named_struct);
make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field);
make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce);
+make_udf_function!(map::MakeMap, MAKE_MAP, make_map);
+make_udf_function!(map::MapFunc, MAP, map);
pub mod expr_fn {
use datafusion_expr::{Expr, Literal};
@@ -78,6 +81,14 @@ pub mod expr_fn {
coalesce,
+ "Returns `coalesce(args...)`, which evaluates to the value of the first expr which is not NULL",
args,
+ ),(
+ make_map,
+ "Returns a map created from the given keys and values pairs. This function isn't efficient for large maps. Use the `map` function instead.",
+ args,
+ ),(
+ map,
+ "Returns a map created from a key list and a value list",
+ args,
));
+ #[doc = "Returns the value of the field with the given name from the struct"]
@@ -96,5 +107,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
named_struct(),
get_field(),
coalesce(),
+ make_map(),
+ map(),
]
}
diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt
index 417947dc6c..abf5b2ebbf 100644
--- a/datafusion/sqllogictest/test_files/map.slt
+++ b/datafusion/sqllogictest/test_files/map.slt
@@ -100,3 +100,115 @@ physical_plan
statement ok
drop table table_with_map;
+
+query ?
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count;
+----
+{POST: 41, HEAD: 33, PATCH: 30, OPTION: 29, GET: 27, PUT: 25, DELETE: 24}
+
+query I
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33)['POST'];
+----
+41
+
+query ?
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', null);
+----
+{POST: 41, HEAD: 33, PATCH: }
+
+query ?
+SELECT MAKE_MAP('POST', null, 'HEAD', 33, 'PATCH', null);
+----
+{POST: , HEAD: 33, PATCH: }
+
+query ?
+SELECT MAKE_MAP(1, null, 2, 33, 3, null);
+----
+{1: , 2: 33, 3: }
+
+query ?
+SELECT MAKE_MAP([1,2], ['a', 'b'], [3,4], ['b']);
+----
+{[1, 2]: [a, b], [3, 4]: [b]}
+
+query error
+SELECT MAKE_MAP('POST', 41, 'HEAD', 'ab', 'PATCH', 30);
+
+query error
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, null, 30);
+
+query error
+SELECT MAKE_MAP('POST', 41, 123, 33,'PATCH', 30);
+
+query error
+SELECT MAKE_MAP()
+
+query error
+SELECT MAKE_MAP('POST', 41, 'HEAD');
+
+query ?
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, 30]);
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]);
+----
+{POST: 41, HEAD: 33, PATCH: }
+
+query ?
+SELECT MAP([[1,2], [3,4]], ['a', 'b']);
+----
+{[1, 2]: a, [3, 4]: b}
+
+query error
+SELECT MAP()
+
+query error DataFusion error: Execution error: map requires an even number of arguments, got 1 instead
+SELECT MAP(['POST', 'HEAD'])
+
+query error DataFusion error: Execution error: Expected list, large_list or fixed_size_list, got Null
+SELECT MAP(null, [41, 33, 30]);
+
+query error DataFusion error: Execution error: map requires key and value lists to have the same length
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33]);
+
+query error DataFusion error: Execution error: map key cannot be null
+SELECT MAP(['POST', 'HEAD', null], [41, 33, 30]);
+
+query ?
+SELECT MAP(make_array('POST', 'HEAD', 'PATCH'), make_array(41, 33, 30));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'FixedSizeList(3, Utf8)'), arrow_cast(make_array(41, 33, 30), 'FixedSizeList(3, Int64)'));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), arrow_cast(make_array(41, 33, 30), 'LargeList(Int64)'));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+statement ok
+create table t as values
+('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2]),
+('b', 2, 'k3', 30, ['k3'], [3]),
+('d', 4, 'k5', 50, ['k5'], [5]);
+
+query error
+SELECT make_map(column1, column2, column3, column4) FROM t;
+# TODO: support array value
+# ----
+# {a: 1, k1: 10}
+# {b: 2, k3: 30}
+# {d: 4, k5: 50}
+
+query error
+SELECT map(column5, column6) FROM t;
+# TODO: support array value
+# ----
+# {k1:1, k2:2}
+# {k3: 3}
+# {k5: 5}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]