alamb commented on code in PR #11361: URL: https://github.com/apache/datafusion/pull/11361#discussion_r1672721834
########## datafusion/functions/src/core/map.rs: ########## @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, StructArray}; +use arrow::compute::concat; +use arrow::datatypes::{DataType, Field, SchemaBuilder}; +use arrow_buffer::{Buffer, ToByteSlice}; +use datafusion_common::Result; +use datafusion_common::{exec_err, internal_err, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> { + if args.is_empty() { Review Comment: these conditions should have been checked by the planner, so it would probably be ok to panic here or return an internal error. A real error is ok too, but I suspect it would be impossible to actually hit ########## datafusion/functions/src/core/map.rs: ########## @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, StructArray}; +use arrow::compute::concat; +use arrow::datatypes::{DataType, Field, SchemaBuilder}; +use arrow_buffer::{Buffer, ToByteSlice}; +use datafusion_common::Result; +use datafusion_common::{exec_err, internal_err, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> { + if args.is_empty() { + return exec_err!("map requires at least one pair of arguments, got 0 instead"); + } + + if args.len() % 2 != 0 { + return exec_err!( + "map requires an even number of arguments, got {} instead", + args.len() + ); + } + + let mut value_type = args[1].data_type(); + + let (key, value): (Vec<_>, Vec<_>) = args.chunks_exact(2) + .enumerate() + .map(|(i, chunk)| { + if chunk[0].data_type().is_null() { + return exec_err!("map key cannot be null"); + } + if !chunk[1].data_type().is_null() { + if value_type.is_null() { + value_type = chunk[1].data_type(); + } + else if chunk[1].data_type() != value_type { + return exec_err!( + "map requires all values to have the same type {}, got {} instead at position {}", + value_type, + chunk[1].data_type(), + i + ); + } + } + Ok((chunk[0].clone(), chunk[1].clone())) + }) + 
.collect::<Result<Vec<_>>>()?.into_iter().unzip(); + + let key = ColumnarValue::values_to_arrays(&key)?; + let value = ColumnarValue::values_to_arrays(&value)?; + + let mut keys = Vec::new(); + let mut values = Vec::new(); + + // Since a null scalar value be transformed into [NullArray] by ColumnarValue::values_to_arrays, + // `arrow_select::concat` will panic if we pass a [NullArray] and a non-null array to it. + // So we need to create a [NullArray] with the same data type as the non-null array. + let null_array = new_null_array(&value_type, 1); + for (key, value) in key.iter().zip(value.iter()) { + keys.push(key.as_ref()); + if value.data_type().is_null() { + values.push(null_array.as_ref()); + } else { + values.push(value.as_ref()); + } + } + let key = match concat(&keys) { + Ok(key) => key, + Err(e) => return internal_err!("Error concatenating keys: {}", e), + }; + let value = match concat(&values) { + Ok(value) => value, + Err(e) => return internal_err!("Error concatenating values: {}", e), + }; + make_map_batch_internal(key, value) +} + +fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> { + if args.len() != 2 { + return exec_err!( + "make_map requires exactly 2 arguments, got {} instead", + args.len() + ); + } + let key = get_first_array_ref(&args[0])?; + let value = get_first_array_ref(&args[1])?; + make_map_batch_internal(key, value) +} + +fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> { + match columnar_value { + ColumnarValue::Scalar(value) => match value { + ScalarValue::List(array) => Ok(array.value(0).clone()), + ScalarValue::LargeList(array) => Ok(array.value(0).clone()), + ScalarValue::FixedSizeList(array) => Ok(array.value(0).clone()), + _ => exec_err!("Expected array, got {:?}", value), + }, + ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", array), + } +} + +fn make_map_batch_internal(keys: ArrayRef, values: ArrayRef) -> Result<ColumnarValue> { + if keys.null_count() > 0 { + return 
exec_err!("map key cannot be null"); + } + + if keys.len() != values.len() { + return exec_err!("map requires key and value lists to have the same length"); + } + + let key_field = Arc::new(Field::new("key", keys.data_type().clone(), false)); + let value_field = Arc::new(Field::new("value", values.data_type().clone(), true)); + let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = VecDeque::new(); + let mut entry_offsets_buffer = VecDeque::new(); + entry_offsets_buffer.push_back(0); + + entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(&keys))); + entry_struct_buffer.push_back((Arc::clone(&value_field), Arc::clone(&values))); + entry_offsets_buffer.push_back(keys.len() as u32); + + let entry_struct: Vec<(Arc<Field>, ArrayRef)> = entry_struct_buffer.into(); + let entry_struct = StructArray::from(entry_struct); + + let map_data_type = DataType::Map( + Arc::new(Field::new( + "entries", + entry_struct.data_type().clone(), + false, + )), + false, + ); + + let entry_offsets: Vec<u32> = entry_offsets_buffer.into(); + let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice()); + + let map_data = ArrayData::builder(map_data_type) + .len(entry_offsets.len() - 1) + .add_buffer(entry_offsets_buffer) + .add_child_data(entry_struct.to_data()) + .build()?; + + Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data)))) +} + +#[derive(Debug)] +pub struct MakeMap { + signature: Signature, +} + +impl Default for MakeMap { + fn default() -> Self { + Self::new() + } +} + +impl MakeMap { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for MakeMap { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "make_map" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + if arg_types.is_empty() { + return exec_err!( + "make_map requires at least one pair of 
arguments, got 0 instead" + ); + } + if arg_types.len() % 2 != 0 { + return exec_err!( + "make_map requires an even number of arguments, got {} instead", + arg_types.len() + ); + } + + let key_type = &arg_types[0]; + let mut value_type = &arg_types[1]; + + for (i, chunk) in arg_types.chunks_exact(2).enumerate() { Review Comment: Same comment here about type coercion ########## datafusion/functions/src/core/map.rs: ########## @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, StructArray}; +use arrow::compute::concat; +use arrow::datatypes::{DataType, Field, SchemaBuilder}; +use arrow_buffer::{Buffer, ToByteSlice}; +use datafusion_common::Result; +use datafusion_common::{exec_err, internal_err, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> { + if args.is_empty() { + return exec_err!("map requires at least one pair of arguments, got 0 instead"); + } + + if args.len() % 2 != 0 { + return exec_err!( + "map requires an even number of arguments, got {} instead", + args.len() + ); + } + + let mut value_type = args[1].data_type(); + + let (key, value): (Vec<_>, Vec<_>) = args.chunks_exact(2) + .enumerate() + .map(|(i, chunk)| { + if chunk[0].data_type().is_null() { + return exec_err!("map key cannot be null"); + } + if !chunk[1].data_type().is_null() { + if value_type.is_null() { + value_type = chunk[1].data_type(); + } + else if chunk[1].data_type() != value_type { + return exec_err!( + "map requires all values to have the same type {}, got {} instead at position {}", Review Comment: ```suggestion "make_map requires all values to have the same type {}, got {} instead at position {}", ``` ########## datafusion/functions/src/core/map.rs: ########## @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, StructArray}; +use arrow::compute::concat; +use arrow::datatypes::{DataType, Field, SchemaBuilder}; +use arrow_buffer::{Buffer, ToByteSlice}; +use datafusion_common::Result; +use datafusion_common::{exec_err, internal_err, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> { + if args.is_empty() { + return exec_err!("map requires at least one pair of arguments, got 0 instead"); + } + + if args.len() % 2 != 0 { + return exec_err!( + "map requires an even number of arguments, got {} instead", + args.len() + ); + } + + let mut value_type = args[1].data_type(); + + let (key, value): (Vec<_>, Vec<_>) = args.chunks_exact(2) + .enumerate() + .map(|(i, chunk)| { + if chunk[0].data_type().is_null() { + return exec_err!("map key cannot be null"); + } + if !chunk[1].data_type().is_null() { Review Comment: I found this code to do null checking / coercion somewhat confusing. I would have expected that the planner had done the coercion once at plan time rather than doing it on all inputs. Perhaps you could implement `coerce_types` for the `map` function once https://github.com/apache/datafusion/blob/585504a31fd7d9a44c97f3f19af42bace08b8cc3/datafusion/expr/src/udf.rs#L540-L542 ########## datafusion/sqllogictest/test_files/map.slt: ########## @@ -100,3 +100,93 @@ physical_plan statement ok drop table
table_with_map; + +query ? +SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 'PUT', 25, 'DELETE', 24) AS method_count; +---- +{POST: 41, HEAD: 33, PATCH: 30, OPTION: 29, GET: 27, PUT: 25, DELETE: 24} + +query I +SELECT MAKE_MAP('POST', 41, 'HEAD', 33)['POST']; +---- +41 + +query ? +SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', null); +---- +{POST: 41, HEAD: 33, PATCH: } + +query ? +SELECT MAKE_MAP('POST', null, 'HEAD', 33, 'PATCH', null); +---- +{POST: , HEAD: 33, PATCH: } + +query ? +SELECT MAKE_MAP(1, null, 2, 33, 3, null); +---- +{1: , 2: 33, 3: } + +query ? +SELECT MAKE_MAP([1,2], ['a', 'b'], [3,4], ['b']); +---- +{[1, 2]: [a, b], [3, 4]: [b]} + +query error DataFusion error: Execution error: map requires all values to have the same type Int64, got Utf8 instead at position 1 +SELECT MAKE_MAP('POST', 41, 'HEAD', 'ab', 'PATCH', 30); + +query error DataFusion error: Execution error: make_map key cannot be null at position 2 +SELECT MAKE_MAP('POST', 41, 'HEAD', 33, null, 30); + +query error DataFusion error: Execution error: make_map requires all keys to have the same type Utf8, got Int64 instead at position 1 +SELECT MAKE_MAP('POST', 41, 123, 33,'PATCH', 30); + +query error +SELECT MAKE_MAP() + +query error DataFusion error: Execution error: make_map requires an even number of arguments, got 3 instead +SELECT MAKE_MAP('POST', 41, 'HEAD'); + +query ? +SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, 30]); +---- +{POST: 41, HEAD: 33, PATCH: 30} + +query ? +SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]); +---- +{POST: 41, HEAD: 33, PATCH: } + +query ? 
+SELECT MAP([[1,2], [3,4]], ['a', 'b']); +---- +{[1, 2]: a, [3, 4]: b} + +query error +SELECT MAP() + +query error DataFusion error: Execution error: map requires an even number of arguments, got 1 instead +SELECT MAP(['POST', 'HEAD']) + +query error DataFusion error: Execution error: Expected list, large_list or fixed_size_list, got Null +SELECT MAP(null, [41, 33, 30]); + +query error DataFusion error: Execution error: map requires key and value lists to have the same length +SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33]); + +query error DataFusion error: Execution error: map key cannot be null +SELECT MAP(['POST', 'HEAD', null], [41, 33, 30]); + +query ? +SELECT MAP(make_array('POST', 'HEAD', 'PATCH'), make_array(41, 33, 30)); +---- +{POST: 41, HEAD: 33, PATCH: 30} + +query ? +SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'FixedSizeList(3, Utf8)'), arrow_cast(make_array(41, 33, 30), 'FixedSizeList(3, Int64)')); +---- +{POST: 41, HEAD: 33, PATCH: 30} + +query ? +SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), arrow_cast(make_array(41, 33, 30), 'LargeList(Int64)')); +---- +{POST: 41, HEAD: 33, PATCH: 30} Review Comment: I think, if possible, we should also add tests for array values (not just scalars). I wrote up some examples, like this: ``` # test that maps can be created from arrays statement ok create table t as values ('a', 1, ['k1', 'k2'], [10.0, 20.0]), ('b', 2, ['k3'], [30.0]), ('d', 4, ['k5', 'k6'], [50.0, 60.0]); query error select make_map(column1, column2) from t; ---- DataFusion error: Internal error: UDF returned a different number of rows than expected. Expected: 3, Got: 1.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker query error select map(column3, column4) from t; ---- DataFusion error: Execution error: Expected scalar, got ListArray [ StringArray [ "k1", "k2", ], StringArray [ "k3", ], StringArray [ "k5", "k6", ], ] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org