This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ba634aa4f Implement ScalarFunction `MAKE_MAP` and `MAP` (#11361)
5ba634aa4f is described below

commit 5ba634aa4f6d3d4ed5eefbc15dba5448f4f30923
Author: Jax Liu <[email protected]>
AuthorDate: Fri Jul 12 14:43:49 2024 +0800

    Implement ScalarFunction `MAKE_MAP` and `MAP` (#11361)
    
    * tmp
    
    * opt
    
    * modify test
    
    * add another version
    
    * implement make_map function
    
    * implement make_map function
    
    * implement map function
    
    * format and modify the doc
    
    * add benchmark for map function
    
    * add empty end-line
    
    * fix cargo check
    
    * update lock
    
    * update lock
    
    * fix clippy
    
    * fmt and clippy
    
    * support FixedSizeList and LargeList
    
    * check type and handle null array in coerce_types
    
    * make array value throw todo error
    
    * fix clippy
    
    * simplify the error tests
---
 datafusion-cli/Cargo.lock                  |   1 +
 datafusion/functions/Cargo.toml            |   7 +-
 datafusion/functions/benches/map.rs        | 101 ++++++++++
 datafusion/functions/src/core/map.rs       | 312 +++++++++++++++++++++++++++++
 datafusion/functions/src/core/mod.rs       |  13 ++
 datafusion/sqllogictest/test_files/map.slt | 112 +++++++++++
 6 files changed, 545 insertions(+), 1 deletion(-)
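
For context, a minimal sketch of the user-facing SQL for the two new functions, mirroring the sqllogictest cases added at the end of this commit (expected results shown as comments):

    -- MAKE_MAP takes alternating key/value scalar arguments
    SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30);
    -- {POST: 41, HEAD: 33, PATCH: 30}

    -- MAP takes a key list and a value list
    SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, 30]);
    -- {POST: 41, HEAD: 33, PATCH: 30}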

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 8af42cb439..7da9cc427c 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1278,6 +1278,7 @@ name = "datafusion-functions"
 version = "40.0.0"
 dependencies = [
  "arrow",
+ "arrow-buffer",
  "base64 0.22.1",
  "blake2",
  "blake3",
diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index 884a66724c..b143080b19 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -66,6 +66,7 @@ path = "src/lib.rs"
 
 [dependencies]
 arrow = { workspace = true }
+arrow-buffer = { workspace = true }
 base64 = { version = "0.22", optional = true }
 blake2 = { version = "^0.10.2", optional = true }
 blake3 = { version = "1.0", optional = true }
@@ -86,7 +87,6 @@ uuid = { version = "1.7", features = ["v4"], optional = true }
 
 [dev-dependencies]
 arrow = { workspace = true, features = ["test_utils"] }
-arrow-buffer = { workspace = true }
 criterion = "0.5"
 rand = { workspace = true }
 rstest = { workspace = true }
@@ -141,3 +141,8 @@ required-features = ["string_expressions"]
 harness = false
 name = "upper"
 required-features = ["string_expressions"]
+
+[[bench]]
+harness = false
+name = "map"
+required-features = ["core_expressions"]
diff --git a/datafusion/functions/benches/map.rs b/datafusion/functions/benches/map.rs
new file mode 100644
index 0000000000..cd863d0e33
--- /dev/null
+++ b/datafusion/functions/benches/map.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::array::{Int32Array, ListArray, StringArray};
+use arrow::datatypes::{DataType, Field};
+use arrow_buffer::{OffsetBuffer, ScalarBuffer};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_common::ScalarValue;
+use datafusion_expr::ColumnarValue;
+use datafusion_functions::core::{make_map, map};
+use rand::prelude::ThreadRng;
+use rand::Rng;
+use std::sync::Arc;
+
+fn keys(rng: &mut ThreadRng) -> Vec<String> {
+    let mut keys = vec![];
+    for _ in 0..1000 {
+        keys.push(rng.gen_range(0..9999).to_string());
+    }
+    keys
+}
+
+fn values(rng: &mut ThreadRng) -> Vec<i32> {
+    let mut values = vec![];
+    for _ in 0..1000 {
+        values.push(rng.gen_range(0..9999));
+    }
+    values
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    c.bench_function("make_map_1000", |b| {
+        let mut rng = rand::thread_rng();
+        let keys = keys(&mut rng);
+        let values = values(&mut rng);
+        let mut buffer = Vec::new();
+        for i in 0..1000 {
+            buffer.push(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
+                keys[i].clone(),
+            ))));
+            buffer.push(ColumnarValue::Scalar(ScalarValue::Int32(Some(values[i]))));
+        }
+
+        b.iter(|| {
+            black_box(
+                make_map()
+                    .invoke(&buffer)
+                    .expect("map should work on valid values"),
+            );
+        });
+    });
+
+    c.bench_function("map_1000", |b| {
+        let mut rng = rand::thread_rng();
+        let field = Arc::new(Field::new("item", DataType::Utf8, true));
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+        let key_list = ListArray::new(
+            field,
+            offsets,
+            Arc::new(StringArray::from(keys(&mut rng))),
+            None,
+        );
+        let field = Arc::new(Field::new("item", DataType::Int32, true));
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
+        let value_list = ListArray::new(
+            field,
+            offsets,
+            Arc::new(Int32Array::from(values(&mut rng))),
+            None,
+        );
+        let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
+        let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
+
+        b.iter(|| {
+            black_box(
+                map()
+                    .invoke(&[keys.clone(), values.clone()])
+                    .expect("map should work on valid values"),
+            );
+        });
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions/src/core/map.rs
new file mode 100644
index 0000000000..8a8a19d7af
--- /dev/null
+++ b/datafusion/functions/src/core/map.rs
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
+use arrow::compute::concat;
+use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow_buffer::{Buffer, ToByteSlice};
+
+use datafusion_common::{exec_err, internal_err, ScalarValue};
+use datafusion_common::{not_impl_err, Result};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
+fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let (key, value): (Vec<_>, Vec<_>) = args
+        .chunks_exact(2)
+        .map(|chunk| {
+            if let ColumnarValue::Array(_) = chunk[0] {
+                return not_impl_err!("make_map does not support array keys");
+            }
+            if let ColumnarValue::Array(_) = chunk[1] {
+                return not_impl_err!("make_map does not support array values");
+            }
+            Ok((chunk[0].clone(), chunk[1].clone()))
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .unzip();
+
+    let keys = ColumnarValue::values_to_arrays(&key)?;
+    let values = ColumnarValue::values_to_arrays(&value)?;
+
+    let keys: Vec<_> = keys.iter().map(|k| k.as_ref()).collect();
+    let values: Vec<_> = values.iter().map(|v| v.as_ref()).collect();
+
+    let key = match concat(&keys) {
+        Ok(key) => key,
+        Err(e) => return internal_err!("Error concatenating keys: {}", e),
+    };
+    let value = match concat(&values) {
+        Ok(value) => value,
+        Err(e) => return internal_err!("Error concatenating values: {}", e),
+    };
+    make_map_batch_internal(key, value)
+}
+
+fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.len() != 2 {
+        return exec_err!(
+            "make_map requires exactly 2 arguments, got {} instead",
+            args.len()
+        );
+    }
+    let key = get_first_array_ref(&args[0])?;
+    let value = get_first_array_ref(&args[1])?;
+    make_map_batch_internal(key, value)
+}
+
+fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
+    match columnar_value {
+        ColumnarValue::Scalar(value) => match value {
+            ScalarValue::List(array) => Ok(array.value(0).clone()),
+            ScalarValue::LargeList(array) => Ok(array.value(0).clone()),
+            ScalarValue::FixedSizeList(array) => Ok(array.value(0).clone()),
+            _ => exec_err!("Expected array, got {:?}", value),
+        },
+        ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array),
+    }
+}
+
+fn make_map_batch_internal(keys: ArrayRef, values: ArrayRef) -> Result<ColumnarValue> {
+    if keys.null_count() > 0 {
+        return exec_err!("map key cannot be null");
+    }
+
+    if keys.len() != values.len() {
+        return exec_err!("map requires key and value lists to have the same 
length");
+    }
+
+    let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false));
+    let value_field = Arc::new(Field::new("value", values.data_type().clone(), true));
+    let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = VecDeque::new();
+    let mut entry_offsets_buffer = VecDeque::new();
+    entry_offsets_buffer.push_back(0);
+
+    entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(&keys)));
+    entry_struct_buffer.push_back((Arc::clone(&value_field), Arc::clone(&values)));
+    entry_offsets_buffer.push_back(keys.len() as u32);
+
+    let entry_struct: Vec<(Arc<Field>, ArrayRef)> = entry_struct_buffer.into();
+    let entry_struct = StructArray::from(entry_struct);
+
+    let map_data_type = DataType::Map(
+        Arc::new(Field::new(
+            "entries",
+            entry_struct.data_type().clone(),
+            false,
+        )),
+        false,
+    );
+
+    let entry_offsets: Vec<u32> = entry_offsets_buffer.into();
+    let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice());
+
+    let map_data = ArrayData::builder(map_data_type)
+        .len(entry_offsets.len() - 1)
+        .add_buffer(entry_offsets_buffer)
+        .add_child_data(entry_struct.to_data())
+        .build()?;
+
+    Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
+}
+
+#[derive(Debug)]
+pub struct MakeMap {
+    signature: Signature,
+}
+
+impl Default for MakeMap {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MakeMap {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for MakeMap {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "make_map"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        if arg_types.is_empty() {
+            return exec_err!(
+                "make_map requires at least one pair of arguments, got 0 
instead"
+            );
+        }
+        if arg_types.len() % 2 != 0 {
+            return exec_err!(
+                "make_map requires an even number of arguments, got {} 
instead",
+                arg_types.len()
+            );
+        }
+
+        let key_type = &arg_types[0];
+        let mut value_type = &arg_types[1];
+
+        for (i, chunk) in arg_types.chunks_exact(2).enumerate() {
+            if chunk[0].is_null() {
+                return exec_err!("make_map key cannot be null at position {}", 
i);
+            }
+            if &chunk[0] != key_type {
+                return exec_err!(
+                    "make_map requires all keys to have the same type {}, got 
{} instead at position {}",
+                    key_type,
+                    chunk[0],
+                    i
+                );
+            }
+
+            if !chunk[1].is_null() {
+                if value_type.is_null() {
+                    value_type = &chunk[1];
+                } else if &chunk[1] != value_type {
+                    return exec_err!(
+                        "map requires all values to have the same type {}, got 
{} instead at position {}",
+                        value_type,
+                        &chunk[1],
+                        i
+                    );
+                }
+            }
+        }
+
+        let mut result = Vec::new();
+        for _ in 0..arg_types.len() / 2 {
+            result.push(key_type.clone());
+            result.push(value_type.clone());
+        }
+
+        Ok(result)
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let key_type = &arg_types[0];
+        let mut value_type = &arg_types[1];
+
+        for chunk in arg_types.chunks_exact(2) {
+            if !chunk[1].is_null() && value_type.is_null() {
+                value_type = &chunk[1];
+            }
+        }
+
+        let mut builder = SchemaBuilder::new();
+        builder.push(Field::new("key", key_type.clone(), false));
+        builder.push(Field::new("value", value_type.clone(), true));
+        let fields = builder.finish().fields;
+        Ok(DataType::Map(
+            Arc::new(Field::new("entries", DataType::Struct(fields), false)),
+            false,
+        ))
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_map(args)
+    }
+}
+
+#[derive(Debug)]
+pub struct MapFunc {
+    signature: Signature,
+}
+
+impl Default for MapFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MapFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for MapFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "map"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.len() % 2 != 0 {
+            return exec_err!(
+                "map requires an even number of arguments, got {} instead",
+                arg_types.len()
+            );
+        }
+        let mut builder = SchemaBuilder::new();
+        builder.push(Field::new(
+            "key",
+            get_element_type(&arg_types[0])?.clone(),
+            false,
+        ));
+        builder.push(Field::new(
+            "value",
+            get_element_type(&arg_types[1])?.clone(),
+            true,
+        ));
+        let fields = builder.finish().fields;
+        Ok(DataType::Map(
+            Arc::new(Field::new("entries", DataType::Struct(fields), false)),
+            false,
+        ))
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_map_batch(args)
+    }
+}
+
+fn get_element_type(data_type: &DataType) -> Result<&DataType> {
+    match data_type {
+        DataType::List(element) => Ok(element.data_type()),
+        DataType::LargeList(element) => Ok(element.data_type()),
+        DataType::FixedSizeList(element, _) => Ok(element.data_type()),
+        _ => exec_err!(
+            "Expected list, large_list or fixed_size_list, got {:?}",
+            data_type
+        ),
+    }
+}
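
Note: the validation in make_map_batch_internal above is what surfaces as the user-visible errors; a brief SQL sketch mirroring the error cases in the sqllogictest file added later in this commit:

    -- key and value lists must have the same length
    SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33]);
    -- Execution error: map requires key and value lists to have the same length

    -- map keys cannot be null
    SELECT MAP(['POST', 'HEAD', null], [41, 33, 30]);
    -- Execution error: map key cannot be null
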
diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs
index 062a4a104d..31bce04bee 100644
--- a/datafusion/functions/src/core/mod.rs
+++ b/datafusion/functions/src/core/mod.rs
@@ -25,6 +25,7 @@ pub mod arrowtypeof;
 pub mod coalesce;
 pub mod expr_ext;
 pub mod getfield;
+pub mod map;
 pub mod named_struct;
 pub mod nullif;
 pub mod nvl;
@@ -42,6 +43,8 @@ make_udf_function!(r#struct::StructFunc, STRUCT, r#struct);
 make_udf_function!(named_struct::NamedStructFunc, NAMED_STRUCT, named_struct);
 make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field);
 make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce);
+make_udf_function!(map::MakeMap, MAKE_MAP, make_map);
+make_udf_function!(map::MapFunc, MAP, map);
 
 pub mod expr_fn {
     use datafusion_expr::{Expr, Literal};
@@ -78,6 +81,14 @@ pub mod expr_fn {
         coalesce,
         "Returns `coalesce(args...)`, which evaluates to the value of the 
first expr which is not NULL",
         args,
+    ),(
+        make_map,
+        "Returns a map created from the given keys and values pairs. This 
function isn't efficient for large maps. Use the `map` function instead.",
+        args,
+    ),(
+        map,
+        "Returns a map created from a key list and a value list",
+        args,
     ));
 
     #[doc = "Returns the value of the field with the given name from the 
struct"]
@@ -96,5 +107,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
         named_struct(),
         get_field(),
         coalesce(),
+        make_map(),
+        map(),
     ]
 }
diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt
index 417947dc6c..abf5b2ebbf 100644
--- a/datafusion/sqllogictest/test_files/map.slt
+++ b/datafusion/sqllogictest/test_files/map.slt
@@ -100,3 +100,115 @@ physical_plan
 
 statement ok
 drop table table_with_map;
+
+query ?
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count;
+----
+{POST: 41, HEAD: 33, PATCH: 30, OPTION: 29, GET: 27, PUT: 25, DELETE: 24}
+
+query I
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33)['POST'];
+----
+41
+
+query ?
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', null);
+----
+{POST: 41, HEAD: 33, PATCH: }
+
+query ?
+SELECT MAKE_MAP('POST', null, 'HEAD', 33, 'PATCH', null);
+----
+{POST: , HEAD: 33, PATCH: }
+
+query ?
+SELECT MAKE_MAP(1, null, 2, 33, 3, null);
+----
+{1: , 2: 33, 3: }
+
+query ?
+SELECT MAKE_MAP([1,2], ['a', 'b'], [3,4], ['b']);
+----
+{[1, 2]: [a, b], [3, 4]: [b]}
+
+query error
+SELECT MAKE_MAP('POST', 41, 'HEAD', 'ab', 'PATCH', 30);
+
+query error
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, null, 30);
+
+query error
+SELECT MAKE_MAP('POST', 41, 123, 33,'PATCH', 30);
+
+query error
+SELECT MAKE_MAP()
+
+query error
+SELECT MAKE_MAP('POST', 41, 'HEAD');
+
+query ?
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, 30]);
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]);
+----
+{POST: 41, HEAD: 33, PATCH: }
+
+query ?
+SELECT MAP([[1,2], [3,4]], ['a', 'b']);
+----
+{[1, 2]: a, [3, 4]: b}
+
+query error
+SELECT MAP()
+
+query error DataFusion error: Execution error: map requires an even number of arguments, got 1 instead
+SELECT MAP(['POST', 'HEAD'])
+
+query error DataFusion error: Execution error: Expected list, large_list or fixed_size_list, got Null
+SELECT MAP(null, [41, 33, 30]);
+
+query error DataFusion error: Execution error: map requires key and value lists to have the same length
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33]);
+
+query error DataFusion error: Execution error: map key cannot be null
+SELECT MAP(['POST', 'HEAD', null], [41, 33, 30]);
+
+query ?
+SELECT MAP(make_array('POST', 'HEAD', 'PATCH'), make_array(41, 33, 30));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'FixedSizeList(3, Utf8)'), arrow_cast(make_array(41, 33, 30), 'FixedSizeList(3, Int64)'));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), arrow_cast(make_array(41, 33, 30), 'LargeList(Int64)'));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+statement ok
+create table t as values
+('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2]),
+('b', 2, 'k3', 30, ['k3'], [3]),
+('d', 4, 'k5', 50, ['k5'], [5]);
+
+query error
+SELECT make_map(column1, column2, column3, column4) FROM t;
+# TODO: support array value
+# ----
+# {a: 1, k1: 10}
+# {b: 2, k3: 30}
+# {d: 4, k5: 50}
+
+query error
+SELECT map(column5, column6) FROM t;
+# TODO: support array value
+# ----
+# {k1:1, k2:2}
+# {k3: 3}
+# {k5: 5}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
