alamb commented on code in PR #11361:
URL: https://github.com/apache/datafusion/pull/11361#discussion_r1672721834


##########
datafusion/functions/src/core/map.rs:
##########
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, 
StructArray};
+use arrow::compute::concat;
+use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow_buffer::{Buffer, ToByteSlice};
+use datafusion_common::Result;
+use datafusion_common::{exec_err, internal_err, ScalarValue};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
+fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.is_empty() {

Review Comment:
   These conditions should have been checked by the planner, so it would 
probably be ok to panic here or return an internal error. A real error is ok 
too, but I suspect it would be impossible to actually hit.



##########
datafusion/functions/src/core/map.rs:
##########
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, 
StructArray};
+use arrow::compute::concat;
+use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow_buffer::{Buffer, ToByteSlice};
+use datafusion_common::Result;
+use datafusion_common::{exec_err, internal_err, ScalarValue};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
+fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.is_empty() {
+        return exec_err!("map requires at least one pair of arguments, got 0 
instead");
+    }
+
+    if args.len() % 2 != 0 {
+        return exec_err!(
+            "map requires an even number of arguments, got {} instead",
+            args.len()
+        );
+    }
+
+    let mut value_type = args[1].data_type();
+
+    let (key, value): (Vec<_>, Vec<_>) = args.chunks_exact(2)
+        .enumerate()
+        .map(|(i, chunk)| {
+            if chunk[0].data_type().is_null() {
+                return exec_err!("map key cannot be null");
+            }
+            if !chunk[1].data_type().is_null() {
+                if value_type.is_null() {
+                    value_type = chunk[1].data_type();
+                }
+                else if chunk[1].data_type() != value_type {
+                    return exec_err!(
+                        "map requires all values to have the same type {}, got 
{} instead at position {}",
+                        value_type,
+                        chunk[1].data_type(),
+                        i
+                    );
+                }
+            }
+            Ok((chunk[0].clone(), chunk[1].clone()))
+        })
+        .collect::<Result<Vec<_>>>()?.into_iter().unzip();
+
+    let key = ColumnarValue::values_to_arrays(&key)?;
+    let value = ColumnarValue::values_to_arrays(&value)?;
+
+    let mut keys = Vec::new();
+    let mut values = Vec::new();
+
+    // Since a null scalar value be transformed into [NullArray] by 
ColumnarValue::values_to_arrays,
+    // `arrow_select::concat` will panic if we pass a [NullArray] and a 
non-null array to it.
+    // So we need to create a [NullArray] with the same data type as the 
non-null array.
+    let null_array = new_null_array(&value_type, 1);
+    for (key, value) in key.iter().zip(value.iter()) {
+        keys.push(key.as_ref());
+        if value.data_type().is_null() {
+            values.push(null_array.as_ref());
+        } else {
+            values.push(value.as_ref());
+        }
+    }
+    let key = match concat(&keys) {
+        Ok(key) => key,
+        Err(e) => return internal_err!("Error concatenating keys: {}", e),
+    };
+    let value = match concat(&values) {
+        Ok(value) => value,
+        Err(e) => return internal_err!("Error concatenating values: {}", e),
+    };
+    make_map_batch_internal(key, value)
+}
+
+fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.len() != 2 {
+        return exec_err!(
+            "make_map requires exactly 2 arguments, got {} instead",
+            args.len()
+        );
+    }
+    let key = get_first_array_ref(&args[0])?;
+    let value = get_first_array_ref(&args[1])?;
+    make_map_batch_internal(key, value)
+}
+
+fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
+    match columnar_value {
+        ColumnarValue::Scalar(value) => match value {
+            ScalarValue::List(array) => Ok(array.value(0).clone()),
+            ScalarValue::LargeList(array) => Ok(array.value(0).clone()),
+            ScalarValue::FixedSizeList(array) => Ok(array.value(0).clone()),
+            _ => exec_err!("Expected array, got {:?}", value),
+        },
+        ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}", 
array),
+    }
+}
+
+fn make_map_batch_internal(keys: ArrayRef, values: ArrayRef) -> 
Result<ColumnarValue> {
+    if keys.null_count() > 0 {
+        return exec_err!("map key cannot be null");
+    }
+
+    if keys.len() != values.len() {
+        return exec_err!("map requires key and value lists to have the same 
length");
+    }
+
+    let key_field = Arc::new(Field::new("key", keys.data_type().clone(), 
false));
+    let value_field = Arc::new(Field::new("value", values.data_type().clone(), 
true));
+    let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> = 
VecDeque::new();
+    let mut entry_offsets_buffer = VecDeque::new();
+    entry_offsets_buffer.push_back(0);
+
+    entry_struct_buffer.push_back((Arc::clone(&key_field), Arc::clone(&keys)));
+    entry_struct_buffer.push_back((Arc::clone(&value_field), 
Arc::clone(&values)));
+    entry_offsets_buffer.push_back(keys.len() as u32);
+
+    let entry_struct: Vec<(Arc<Field>, ArrayRef)> = entry_struct_buffer.into();
+    let entry_struct = StructArray::from(entry_struct);
+
+    let map_data_type = DataType::Map(
+        Arc::new(Field::new(
+            "entries",
+            entry_struct.data_type().clone(),
+            false,
+        )),
+        false,
+    );
+
+    let entry_offsets: Vec<u32> = entry_offsets_buffer.into();
+    let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice());
+
+    let map_data = ArrayData::builder(map_data_type)
+        .len(entry_offsets.len() - 1)
+        .add_buffer(entry_offsets_buffer)
+        .add_child_data(entry_struct.to_data())
+        .build()?;
+
+    Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
+}
+
+#[derive(Debug)]
+pub struct MakeMap {
+    signature: Signature,
+}
+
+impl Default for MakeMap {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MakeMap {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::variadic_any(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for MakeMap {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "make_map"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return exec_err!(
+                "make_map requires at least one pair of arguments, got 0 
instead"
+            );
+        }
+        if arg_types.len() % 2 != 0 {
+            return exec_err!(
+                "make_map requires an even number of arguments, got {} 
instead",
+                arg_types.len()
+            );
+        }
+
+        let key_type = &arg_types[0];
+        let mut value_type = &arg_types[1];
+
+        for (i, chunk) in arg_types.chunks_exact(2).enumerate() {

Review Comment:
   Same comment here about type coercion



##########
datafusion/functions/src/core/map.rs:
##########
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, 
StructArray};
+use arrow::compute::concat;
+use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow_buffer::{Buffer, ToByteSlice};
+use datafusion_common::Result;
+use datafusion_common::{exec_err, internal_err, ScalarValue};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
+fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.is_empty() {
+        return exec_err!("map requires at least one pair of arguments, got 0 
instead");
+    }
+
+    if args.len() % 2 != 0 {
+        return exec_err!(
+            "map requires an even number of arguments, got {} instead",
+            args.len()
+        );
+    }
+
+    let mut value_type = args[1].data_type();
+
+    let (key, value): (Vec<_>, Vec<_>) = args.chunks_exact(2)
+        .enumerate()
+        .map(|(i, chunk)| {
+            if chunk[0].data_type().is_null() {
+                return exec_err!("map key cannot be null");
+            }
+            if !chunk[1].data_type().is_null() {
+                if value_type.is_null() {
+                    value_type = chunk[1].data_type();
+                }
+                else if chunk[1].data_type() != value_type {
+                    return exec_err!(
+                        "map requires all values to have the same type {}, got 
{} instead at position {}",

Review Comment:
   ```suggestion
                           "make_map requires all values to have the same type 
{}, got {} instead at position {}",
   ```



##########
datafusion/functions/src/core/map.rs:
##########
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, Array, ArrayData, ArrayRef, MapArray, 
StructArray};
+use arrow::compute::concat;
+use arrow::datatypes::{DataType, Field, SchemaBuilder};
+use arrow_buffer::{Buffer, ToByteSlice};
+use datafusion_common::Result;
+use datafusion_common::{exec_err, internal_err, ScalarValue};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+
+fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    if args.is_empty() {
+        return exec_err!("map requires at least one pair of arguments, got 0 
instead");
+    }
+
+    if args.len() % 2 != 0 {
+        return exec_err!(
+            "map requires an even number of arguments, got {} instead",
+            args.len()
+        );
+    }
+
+    let mut value_type = args[1].data_type();
+
+    let (key, value): (Vec<_>, Vec<_>) = args.chunks_exact(2)
+        .enumerate()
+        .map(|(i, chunk)| {
+            if chunk[0].data_type().is_null() {
+                return exec_err!("map key cannot be null");
+            }
+            if !chunk[1].data_type().is_null() {

Review Comment:
   I found the null checking / coercion in this code somewhat confusing.
   
   I would have expected that the planner had done the coercion once at plan 
time rather than doing it on all inputs.
   
   Perhaps you could implement `coerce_types` for the `map` function, so the 
coercion is done once at plan time:
   
   
https://github.com/apache/datafusion/blob/585504a31fd7d9a44c97f3f19af42bace08b8cc3/datafusion/expr/src/udf.rs#L540-L542



##########
datafusion/sqllogictest/test_files/map.slt:
##########
@@ -100,3 +100,93 @@ physical_plan
 
 statement ok
 drop table table_with_map;
+
+query ?
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27, 
'PUT', 25, 'DELETE', 24) AS method_count;
+----
+{POST: 41, HEAD: 33, PATCH: 30, OPTION: 29, GET: 27, PUT: 25, DELETE: 24}
+
+query I
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33)['POST'];
+----
+41
+
+query ?
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', null);
+----
+{POST: 41, HEAD: 33, PATCH: }
+
+query ?
+SELECT MAKE_MAP('POST', null, 'HEAD', 33, 'PATCH', null);
+----
+{POST: , HEAD: 33, PATCH: }
+
+query ?
+SELECT MAKE_MAP(1, null, 2, 33, 3, null);
+----
+{1: , 2: 33, 3: }
+
+query ?
+SELECT MAKE_MAP([1,2], ['a', 'b'], [3,4], ['b']);
+----
+{[1, 2]: [a, b], [3, 4]: [b]}
+
+query error DataFusion error: Execution error: map requires all values to have 
the same type Int64, got Utf8 instead at position 1
+SELECT MAKE_MAP('POST', 41, 'HEAD', 'ab', 'PATCH', 30);
+
+query error DataFusion error: Execution error: make_map key cannot be null at 
position 2
+SELECT MAKE_MAP('POST', 41, 'HEAD', 33, null, 30);
+
+query error DataFusion error: Execution error: make_map requires all keys to 
have the same type Utf8, got Int64 instead at position 1
+SELECT MAKE_MAP('POST', 41, 123, 33,'PATCH', 30);
+
+query error
+SELECT MAKE_MAP()
+
+query error DataFusion error: Execution error: make_map requires an even 
number of arguments, got 3 instead
+SELECT MAKE_MAP('POST', 41, 'HEAD');
+
+query ?
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, 30]);
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]);
+----
+{POST: 41, HEAD: 33, PATCH: }
+
+query ?
+SELECT MAP([[1,2], [3,4]], ['a', 'b']);
+----
+{[1, 2]: a, [3, 4]: b}
+
+query error
+SELECT MAP()
+
+query error DataFusion error: Execution error: map requires an even number of 
arguments, got 1 instead
+SELECT MAP(['POST', 'HEAD'])
+
+query error DataFusion error: Execution error: Expected list, large_list or 
fixed_size_list, got Null
+SELECT MAP(null, [41, 33, 30]);
+
+query error DataFusion error: Execution error: map requires key and value 
lists to have the same length
+SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33]);
+
+query error DataFusion error: Execution error: map key cannot be null
+SELECT MAP(['POST', 'HEAD', null], [41, 33, 30]);
+
+query ?
+SELECT MAP(make_array('POST', 'HEAD', 'PATCH'), make_array(41, 33, 30));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'FixedSizeList(3, 
Utf8)'), arrow_cast(make_array(41, 33, 30), 'FixedSizeList(3, Int64)'));
+----
+{POST: 41, HEAD: 33, PATCH: 30}
+
+query ?
+SELECT MAP(arrow_cast(make_array('POST', 'HEAD', 'PATCH'), 'LargeList(Utf8)'), 
arrow_cast(make_array(41, 33, 30), 'LargeList(Int64)'));
+----
+{POST: 41, HEAD: 33, PATCH: 30}

Review Comment:
   I think, if possible, we should also add tests for array values (not just 
scalars).
   
   I wrote up some examples, like this:
   
   ```
   # test that maps can be created from arrays
   statement ok
   create table t as values
   ('a', 1, ['k1', 'k2'], [10.0, 20.0]),
   ('b', 2, ['k3'], [30.0]),
   ('d', 4, ['k5', 'k6'], [50.0, 60.0]);
   
   query error
   select make_map(column1, column2) from t;
   ----
   DataFusion error: Internal error: UDF returned a different number of rows 
than expected. Expected: 3, Got: 1.
   This was likely caused by a bug in DataFusion's code and we would welcome 
that you file an bug report in our issue tracker
   
   
   query error
   select map(column3, column4) from t;
   ----
   DataFusion error: Execution error: Expected scalar, got ListArray
   [
     StringArray
   [
     "k1",
     "k2",
   ],
     StringArray
   [
     "k3",
   ],
     StringArray
   [
     "k5",
     "k6",
   ],
   ]
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to