Veeupup commented on code in PR #8081:
URL: https://github.com/apache/arrow-datafusion/pull/8081#discussion_r1386766064
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1821,88 +1822,95 @@ pub fn array_has_all(args: &[ArrayRef]) ->
Result<ArrayRef> {
Ok(Arc::new(boolean_builder.finish()))
}
-macro_rules! array_intersect_normal {
- ($FIRST_ARRAY:expr, $SECOND_ARRAY:expr, $DATA_TYPE:expr,
$ARRAY_TYPE:ident, $BUILDER:ident) => {{
- let mut offsets: Vec<i32> = vec![0];
- let mut values =
- downcast_arg!(new_empty_array(&$DATA_TYPE), $ARRAY_TYPE).clone();
-
- for (first_arr, second_arr) in
$FIRST_ARRAY.iter().zip($SECOND_ARRAY.iter()) {
- let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
- DataFusionError::Internal(format!("offsets should not be
empty"))
- })?;
- match (first_arr, second_arr) {
- (Some(first_arr), Some(second_arr)) => {
- let first_arr = downcast_arg!(first_arr, $ARRAY_TYPE);
- // TODO(veeupup): maybe use stack-implemented map to avoid
heap memory allocation
- let first_set =
first_arr.iter().dedup().flatten().collect::<HashSet<_>>();
- let second_arr = downcast_arg!(second_arr, $ARRAY_TYPE);
-
- let mut builder = $BUILDER::new();
- for elem in second_arr.iter().dedup().flatten() {
- if first_set.contains(&elem) {
- builder.append_value(elem);
- }
- }
-
- let arr = builder.finish();
- values = downcast_arg!(
- compute::concat(&[
- &values,
- &arr
- ])?
- .clone(),
- $ARRAY_TYPE
- )
- .clone();
- offsets.push(last_offset + arr.len() as i32);
- },
- _ => {
- offsets.push(last_offset);
- }
- }
- }
- let field = Arc::new(Field::new("item", $DATA_TYPE, true));
-
- Ok(Arc::new(ListArray::try_new(
- field,
- OffsetBuffer::new(offsets.into()),
- Arc::new(values),
- None,
- )?))
-
- }};
-}
-
/// array_intersect SQL function
pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 2);
let first_array = as_list_array(&args[0])?;
let second_array = as_list_array(&args[1])?;
- match (first_array.value_type(), second_array.value_type()) {
+ let dt = match (first_array.value_type(), second_array.value_type()) {
// (DataType::List(_), DataType::List(_)) => concat_internal(args)?,
- (DataType::Utf8, DataType::Utf8) =>
array_intersect_normal!(first_array, second_array, DataType::Utf8, StringArray,
StringBuilder),
- (DataType::LargeUtf8, DataType::LargeUtf8) =>
array_intersect_normal!(first_array, second_array, DataType::LargeUtf8,
LargeStringArray, LargeStringBuilder),
- (DataType::Boolean, DataType::Boolean) =>
array_intersect_normal!(first_array, second_array, DataType::Boolean,
BooleanArray, BooleanBuilder),
- // (DataType::Float32, DataType::Float32) =>
array_intersect_normal!(arr, element, Float32Array),
- // (DataType::Float64, DataType::Float64) =>
array_intersect_normal!(arr, element, Float64Array),
- (DataType::Int8, DataType::Int8) =>
array_intersect_normal!(first_array, second_array, DataType::Int8, Int8Array,
Int8Builder),
- (DataType::Int16, DataType::Int16) =>
array_intersect_normal!(first_array, second_array, DataType::Int16, Int16Array,
Int16Builder),
- (DataType::Int32, DataType::Int32) =>
array_intersect_normal!(first_array, second_array, DataType::Int32, Int32Array,
Int32Builder),
- (DataType::Int64, DataType::Int64) =>
array_intersect_normal!(first_array, second_array, DataType::Int64, Int64Array,
Int64Builder),
- (DataType::UInt8, DataType::UInt8) =>
array_intersect_normal!(first_array, second_array, DataType::UInt8, UInt8Array,
UInt8Builder),
- (DataType::UInt16, DataType::UInt16) =>
array_intersect_normal!(first_array, second_array, DataType::UInt16,
UInt16Array, UInt16Builder),
- (DataType::UInt32, DataType::UInt32) =>
array_intersect_normal!(first_array, second_array, DataType::UInt32,
UInt32Array, UInt32Builder),
- (DataType::UInt64, DataType::UInt64) =>
array_intersect_normal!(first_array, second_array, DataType::UInt64,
UInt64Array, UInt64Builder),
+ (DataType::Utf8, DataType::Utf8) => DataType::Utf8,
+ (DataType::LargeUtf8, DataType::LargeUtf8) => DataType::LargeUtf8,
+ (DataType::Boolean, DataType::Boolean) => DataType::Boolean,
+ (DataType::Float32, DataType::Float32) => DataType::Float32,
+ (DataType::Float64, DataType::Float64) => DataType::Float64,
+ (DataType::Int8, DataType::Int8) => DataType::Int8,
+ (DataType::Int16, DataType::Int16) => DataType::Int16,
+ (DataType::Int32, DataType::Int32) => DataType::Int32,
+ (DataType::Int64, DataType::Int64) => DataType::Int64,
+ (DataType::UInt8, DataType::UInt8) => DataType::UInt8,
+ (DataType::UInt16, DataType::UInt16) => DataType::UInt16,
+ (DataType::UInt32, DataType::UInt32) => DataType::UInt32,
+ (DataType::UInt64, DataType::UInt64) => DataType::UInt64,
// (DataType::Null, _) => return
Ok(array(&[ColumnarValue::Array(args[1].clone())])?.into_array(1)),
- (first_value_dt, second_value_dt) => {
- Err(DataFusionError::NotImplemented(format!(
+ (first_value_dt, second_value_dt) =>
+ return Err(DataFusionError::NotImplemented(format!(
"array_intersect is not implemented for '{first_value_dt:?}'
and '{second_value_dt:?}'",
)))
+ };
+
+ let mut offsets = vec![0];
+
+ let mut tmp_values = vec![];
+
+ let mut converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
Review Comment:
I haven't finished this part yet ^.^ — it will be completed within a few hours.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]