This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 44b9066ea4 Support Arrays for the Map scalar functions (#11712)
44b9066ea4 is described below
commit 44b9066ea452f760621e3d8076338abcdfd67de0
Author: Dharan Aditya <[email protected]>
AuthorDate: Tue Aug 13 23:55:16 2024 +0530
Support Arrays for the Map scalar functions (#11712)
* crude impl to support array
* ++improvement
* uncomment logic test
* working impl
* leverage return_type_from_exprs
* add documentation
* remove unwrap method
* add more slt tests
* typos
* typos
* remove extract based on dt
* few more tests
* move back to return_type
* improve error & tests
* Update datafusion/functions-nested/src/map.rs
Co-authored-by: Alex Huang <[email protected]>
---------
Co-authored-by: Alex Huang <[email protected]>
---
datafusion/common/src/utils/mod.rs | 9 +-
datafusion/functions-nested/src/map.rs | 141 +++++++++++++++++++++++++++--
datafusion/sqllogictest/test_files/map.slt | 55 ++++++++---
3 files changed, 180 insertions(+), 25 deletions(-)
diff --git a/datafusion/common/src/utils/mod.rs
b/datafusion/common/src/utils/mod.rs
index 12e306ffaf..bf506c0551 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -29,8 +29,10 @@ use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
+use arrow_array::cast::AsArray;
use arrow_array::{
- Array, FixedSizeListArray, LargeListArray, ListArray, RecordBatchOptions,
+ Array, FixedSizeListArray, LargeListArray, ListArray, OffsetSizeTrait,
+ RecordBatchOptions,
};
use arrow_schema::DataType;
use sqlparser::ast::Ident;
@@ -440,6 +442,11 @@ pub fn arrays_into_list_array(
))
}
+/// Helper function to convert a ListArray into a vector of ArrayRefs (null list entries are skipped).
+pub fn list_to_arrays<O: OffsetSizeTrait>(a: ArrayRef) -> Vec<ArrayRef> {
+ a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
+}
+
/// Get the base type of a data type.
///
/// Example
diff --git a/datafusion/functions-nested/src/map.rs
b/datafusion/functions-nested/src/map.rs
index e218b501dc..b6068fdff0 100644
--- a/datafusion/functions-nested/src/map.rs
+++ b/datafusion/functions-nested/src/map.rs
@@ -15,17 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use crate::make_array::make_array;
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
use arrow::array::ArrayData;
-use arrow_array::{Array, ArrayRef, MapArray, StructArray};
+use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray};
use arrow_buffer::{Buffer, ToByteSlice};
use arrow_schema::{DataType, Field, SchemaBuilder};
+
use datafusion_common::{exec_err, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature,
Volatility};
-use std::any::Any;
-use std::collections::VecDeque;
-use std::sync::Arc;
+
+use crate::make_array::make_array;
/// Returns a map created from a key list and a value list
pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
@@ -56,11 +59,11 @@ fn make_map_batch(args: &[ColumnarValue]) ->
datafusion_common::Result<ColumnarV
);
}
+ let data_type = args[0].data_type();
let can_evaluate_to_const = can_evaluate_to_const(args);
-
let key = get_first_array_ref(&args[0])?;
let value = get_first_array_ref(&args[1])?;
- make_map_batch_internal(key, value, can_evaluate_to_const)
+ make_map_batch_internal(key, value, can_evaluate_to_const, data_type)
}
fn get_first_array_ref(
@@ -73,7 +76,7 @@ fn get_first_array_ref(
ScalarValue::FixedSizeList(array) => Ok(array.value(0)),
_ => exec_err!("Expected array, got {:?}", value),
},
- ColumnarValue::Array(array) => exec_err!("Expected scalar, got {:?}",
array),
+ ColumnarValue::Array(array) => Ok(array.to_owned()),
}
}
@@ -81,6 +84,7 @@ fn make_map_batch_internal(
keys: ArrayRef,
values: ArrayRef,
can_evaluate_to_const: bool,
+ data_type: DataType,
) -> datafusion_common::Result<ColumnarValue> {
if keys.null_count() > 0 {
return exec_err!("map key cannot be null");
@@ -90,6 +94,14 @@ fn make_map_batch_internal(
return exec_err!("map requires key and value lists to have the same
length");
}
+ if !can_evaluate_to_const {
+ return if let DataType::LargeList(..) = data_type {
+ make_map_array_internal::<i64>(keys, values)
+ } else {
+ make_map_array_internal::<i32>(keys, values)
+ };
+ }
+
let key_field = Arc::new(Field::new("key", keys.data_type().clone(),
false));
let value_field = Arc::new(Field::new("value", values.data_type().clone(),
true));
let mut entry_struct_buffer: VecDeque<(Arc<Field>, ArrayRef)> =
VecDeque::new();
@@ -190,7 +202,6 @@ impl ScalarUDFImpl for MapFunc {
make_map_batch(args)
}
}
-
fn get_element_type(data_type: &DataType) ->
datafusion_common::Result<&DataType> {
match data_type {
DataType::List(element) => Ok(element.data_type()),
@@ -202,3 +213,115 @@ fn get_element_type(data_type: &DataType) ->
datafusion_common::Result<&DataType
),
}
}
+
+/// Helper function to create a `MapArray` from list arrays of keys and values, to support array
inputs for the Map scalar function
+///
+/// ``` text
+/// Format of input KEYS and VALUES column
+/// keys values
+/// +---------------------+ +---------------------+
+/// | +-----------------+ | | +-----------------+ |
+/// | | [k11, k12, k13] | | | | [v11, v12, v13] | |
+/// | +-----------------+ | | +-----------------+ |
+/// | | | |
+/// | +-----------------+ | | +-----------------+ |
+/// | | [k21, k22, k23] | | | | [v21, v22, v23] | |
+/// | +-----------------+ | | +-----------------+ |
+/// | | | |
+/// | +-----------------+ | | +-----------------+ |
+/// | |[k31, k32, k33] | | | |[v31, v32, v33] | |
+/// | +-----------------+ | | +-----------------+ |
+/// +---------------------+ +---------------------+
+/// ```
+/// The keys and values arrays are flattened and used to create a `StructArray`,
+/// which serves as the inner child of the `MapArray`.
+///
+/// ``` text
+/// Flattened Flattened
+/// Keys Values
+/// +-----------+ +-----------+
+/// | +-------+ | | +-------+ |
+/// | | k11 | | | | v11 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k12 | | | | v12 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k13 | | | | v13 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k21 | | | | v21 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k22 | | | | v22 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k23 | | | | v23 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k31 | | | | v31 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k32 | | | | v32 | |
+/// | +-------+ | | +-------+ |
+/// | +-------+ | | +-------+ |
+/// | | k33 | | | | v33 | |
+/// | +-------+ | | +-------+ |
+/// +-----------+ +-----------+
+/// ```
+
+fn make_map_array_internal<O: OffsetSizeTrait>(
+ keys: ArrayRef,
+ values: ArrayRef,
+) -> datafusion_common::Result<ColumnarValue> {
+ let mut offset_buffer = vec![O::zero()];
+ let mut running_offset = O::zero();
+
+ let keys = datafusion_common::utils::list_to_arrays::<O>(keys);
+ let values = datafusion_common::utils::list_to_arrays::<O>(values);
+
+ let mut key_array_vec = vec![];
+ let mut value_array_vec = vec![];
+ for (k, v) in keys.iter().zip(values.iter()) {
+ running_offset = running_offset.add(O::usize_as(k.len()));
+ offset_buffer.push(running_offset);
+ key_array_vec.push(k.as_ref());
+ value_array_vec.push(v.as_ref());
+ }
+
+ // concatenate all the arrays
+ let flattened_keys = arrow::compute::concat(key_array_vec.as_ref())?;
+ if flattened_keys.null_count() > 0 {
+ return exec_err!("keys cannot be null");
+ }
+ let flattened_values = arrow::compute::concat(value_array_vec.as_ref())?;
+
+ let fields = vec![
+ Arc::new(Field::new("key", flattened_keys.data_type().clone(), false)),
+ Arc::new(Field::new(
+ "value",
+ flattened_values.data_type().clone(),
+ true,
+ )),
+ ];
+
+ let struct_data = ArrayData::builder(DataType::Struct(fields.into()))
+ .len(flattened_keys.len())
+ .add_child_data(flattened_keys.to_data())
+ .add_child_data(flattened_values.to_data())
+ .build()?;
+
+ let map_data = ArrayData::builder(DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ struct_data.data_type().clone(),
+ false,
+ )),
+ false,
+ ))
+ .len(keys.len())
+ .add_child_data(struct_data)
+ .add_buffer(Buffer::from_slice_ref(offset_buffer.as_slice()))
+ .build()?;
+ Ok(ColumnarValue::Array(Arc::new(MapArray::from(map_data))))
+}
diff --git a/datafusion/sqllogictest/test_files/map.slt
b/datafusion/sqllogictest/test_files/map.slt
index eb350c22bb..0dc37c68bc 100644
--- a/datafusion/sqllogictest/test_files/map.slt
+++ b/datafusion/sqllogictest/test_files/map.slt
@@ -199,25 +199,50 @@ SELECT MAP(arrow_cast(make_array('POST', 'HEAD',
'PATCH'), 'LargeList(Utf8)'), a
statement ok
create table t as values
-('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2]),
-('b', 2, 'k3', 30, ['k3'], [3]),
-('d', 4, 'k5', 50, ['k5'], [5]);
+('a', 1, 'k1', 10, ['k1', 'k2'], [1, 2], 'POST', [[1,2,3]], ['a']),
+('b', 2, 'k3', 30, ['k3'], [3], 'PUT', [[4]], ['b']),
+('d', 4, 'k5', 50, ['k5'], [5], null, [[1,2]], ['c']);
-query error
+query ?
SELECT make_map(column1, column2, column3, column4) FROM t;
-# TODO: support array value
-# ----
-# {a: 1, k1: 10}
-# {b: 2, k3: 30}
-# {d: 4, k5: 50}
+----
+{a: 1, k1: 10}
+{b: 2, k3: 30}
+{d: 4, k5: 50}
-query error
+query ?
SELECT map(column5, column6) FROM t;
-# TODO: support array value
-# ----
-# {k1:1, k2:2}
-# {k3: 3}
-# {k5: 5}
+----
+{k1: 1, k2: 2}
+{k3: 3}
+{k5: 5}
+
+query ?
+SELECT map(column8, column9) FROM t;
+----
+{[1, 2, 3]: a}
+{[4]: b}
+{[1, 2]: c}
+
+query error
+SELECT map(column6, column7) FROM t;
+
+query ?
+select Map {column6: column7} from t;
+----
+{[1, 2]: POST}
+{[3]: PUT}
+{[5]: }
+
+query ?
+select Map {column8: column7} from t;
+----
+{[[1, 2, 3]]: POST}
+{[[4]]: PUT}
+{[[1, 2]]: }
+
+query error
+select Map {column7: column8} from t;
query ?
SELECT MAKE_MAP('POST', 41, 'HEAD', 33, 'PATCH', 30, 'OPTION', 29, 'GET', 27,
'PUT', 25, 'DELETE', 24) AS method_count from t;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]