jayzhan211 commented on code in PR #8516:
URL: https://github.com/apache/arrow-datafusion/pull/8516#discussion_r1432736674
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1535,97 +1536,173 @@ macro_rules! to_string {
}};
}
-fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+#[derive(Debug, PartialEq)]
+enum SetOp {
+ Union,
+ Intersect,
+}
+
+impl Display for SetOp {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ SetOp::Union => write!(f, "array_union"),
+ SetOp::Intersect => write!(f, "array_intersect"),
+ }
+ }
+}
+
+fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
- field: &FieldRef,
-) -> Result<GenericListArray<OffsetSize>> {
- let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
+ set_op: SetOp,
+) -> Result<ArrayRef> {
+ if matches!(l.value_type(), DataType::Null) {
+ let field = Arc::new(Field::new("item", r.value_type(), true));
+ return general_array_distinct::<OffsetSize>(r, &field);
+ } else if matches!(r.value_type(), DataType::Null) {
+ let field = Arc::new(Field::new("item", l.value_type(), true));
+ return general_array_distinct::<OffsetSize>(l, &field);
+ }
- let nulls = NullBuffer::union(l.nulls(), r.nulls());
- let l_values = l.values().clone();
- let r_values = r.values().clone();
- let l_values = converter.convert_columns(&[l_values])?;
- let r_values = converter.convert_columns(&[r_values])?;
+ if l.value_type() != r.value_type() {
+ return internal_err!("{set_op:?} is not implemented for '{l:?}' and
'{r:?}'");
+ }
- // Might be worth adding an upstream OffsetBufferBuilder
- let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
- offsets.push(OffsetSize::usize_as(0));
- let mut rows = Vec::with_capacity(l_values.num_rows() +
r_values.num_rows());
- let mut dedup = HashSet::new();
- for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
- let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
- let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
- for i in l_slice {
- let left_row = l_values.row(i);
- if dedup.insert(left_row) {
- rows.push(left_row);
- }
- }
- for i in r_slice {
- let right_row = r_values.row(i);
- if dedup.insert(right_row) {
- rows.push(right_row);
+ let dt = l.value_type();
+
+ let mut offsets = vec![OffsetSize::usize_as(0)];
+ let mut new_arrays = vec![];
+
+ let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
+ for (first_arr, second_arr) in l.iter().zip(r.iter()) {
+ if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
+ let l_values = converter.convert_columns(&[first_arr])?;
+ let r_values = converter.convert_columns(&[second_arr])?;
+
+ let l_iter = l_values.iter().sorted().dedup();
+ let values_set: HashSet<_> = l_iter.clone().collect();
+ let mut rows = if set_op == SetOp::Union {
+ l_iter.collect::<Vec<_>>()
+ } else {
+ vec![]
+ };
+ for r_val in r_values.iter().sorted().dedup() {
+ match set_op {
+ SetOp::Union => {
+ if !values_set.contains(&r_val) {
+ rows.push(r_val);
+ }
+ }
+ SetOp::Intersect => {
+ if values_set.contains(&r_val) {
+ rows.push(r_val);
+ }
+ }
+ }
}
+
+ let last_offset = match offsets.last().copied() {
+ Some(offset) => offset,
+ None => return internal_err!("offsets should not be empty"),
+ };
+ offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
+ let arrays = converter.convert_rows(rows)?;
+ let array = match arrays.first() {
+ Some(array) => array.clone(),
+ None => {
+ return internal_err!("{set_op}: failed to get array from
rows");
+ }
+ };
+ new_arrays.push(array);
}
- offsets.push(OffsetSize::usize_as(rows.len()));
- dedup.clear();
}
- let values = converter.convert_rows(rows)?;
+ let field = Arc::new(Field::new("item", dt, true));
Review Comment:
why should we create a field here instead of pass the cloned one? if you got
the error related to field, likely there is error in return_type or elsewhere
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]