Weijun-H commented on code in PR #8516:
URL: https://github.com/apache/arrow-datafusion/pull/8516#discussion_r1430025820
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1535,52 +1536,136 @@ macro_rules! to_string {
}};
}
-fn union_generic_lists<OffsetSize: OffsetSizeTrait>(
+#[derive(Debug, PartialEq)]
+enum SetOp {
+ Union,
+ Intersect,
+}
+
+impl Display for SetOp {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ SetOp::Union => write!(f, "array_union"),
+ SetOp::Intersect => write!(f, "array_intersect"),
+ }
+ }
+}
+
+fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
- field: &FieldRef,
-) -> Result<GenericListArray<OffsetSize>> {
- let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
+ set_op: SetOp,
+) -> Result<ArrayRef> {
+ if matches!(l.value_type(), DataType::Null) {
+ let field = Arc::new(Field::new("item", r.value_type(), true));
+ return general_array_distinct::<OffsetSize>(r, &field);
+ } else if matches!(r.value_type(), DataType::Null) {
+ let field = Arc::new(Field::new("item", l.value_type(), true));
+ return general_array_distinct::<OffsetSize>(l, &field);
+ }
- let nulls = NullBuffer::union(l.nulls(), r.nulls());
- let l_values = l.values().clone();
- let r_values = r.values().clone();
- let l_values = converter.convert_columns(&[l_values])?;
- let r_values = converter.convert_columns(&[r_values])?;
+ if l.value_type() != r.value_type() {
+ return internal_err!("{set_op} is not implemented for '{l:?}' and '{r:?}'");
+ }
- // Might be worth adding an upstream OffsetBufferBuilder
- let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
- offsets.push(OffsetSize::usize_as(0));
- let mut rows = Vec::with_capacity(l_values.num_rows() + r_values.num_rows());
- let mut dedup = HashSet::new();
- for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
- let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
- let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
- for i in l_slice {
- let left_row = l_values.row(i);
- if dedup.insert(left_row) {
- rows.push(left_row);
- }
- }
- for i in r_slice {
- let right_row = r_values.row(i);
- if dedup.insert(right_row) {
- rows.push(right_row);
+ let dt = l.value_type();
+
+ let mut offsets = vec![OffsetSize::usize_as(0)];
+ let mut new_arrays = vec![];
+
+ let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
+ for (first_arr, second_arr) in l.iter().zip(r.iter()) {
+ if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
+ let l_values = converter.convert_columns(&[first_arr])?;
Review Comment:
But the previous implementation calls `convert_columns` once for all values, whereas the current one calls `convert_columns` separately for each element. The difference in cost is negligible. @jayzhan211
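
For context, here is a minimal standalone sketch (not the PR code; the toy arrays and the `main` function are invented for illustration) of the two call patterns being compared: converting the whole child array with one `convert_columns` call versus calling it once per list element.

```rust
use arrow::array::ListArray;
use arrow::datatypes::{DataType, Int32Type};
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), ArrowError> {
    // Two toy list elements: [1, 2] and [3].
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![Some(3)]),
    ]);
    let converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;

    // Previous pattern: a single convert_columns call over the whole child
    // array; per-element rows are then picked out via the list offsets.
    let all_rows = converter.convert_columns(&[list.values().clone()])?;
    println!("one call converted {} rows", all_rows.num_rows());

    // Current pattern: one convert_columns call per list element.
    for elem in list.iter().flatten() {
        let rows = converter.convert_columns(&[elem])?;
        println!("per-element call converted {} rows", rows.num_rows());
    }
    Ok(())
}
```

The per-element form issues one converter call per list row instead of one per batch, which is the overhead the comment above refers to.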
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]