lyne7-sc commented on code in PR #20243:
URL: https://github.com/apache/datafusion/pull/20243#discussion_r2788523390
##########
datafusion/functions-nested/src/set_ops.rs:
##########
@@ -358,69 +364,84 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
"{set_op:?} is not implemented for '{l:?}' and '{r:?}'"
);
- let mut offsets = vec![OffsetSize::usize_as(0)];
- let mut new_arrays = vec![];
+ // Convert all values to rows in batch for performance.
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
- for (l_arr, r_arr) in l.iter().zip(r.iter()) {
- let last_offset = *offsets.last().unwrap();
-
- let (l_values, r_values) = match (l_arr, r_arr) {
- (Some(l_arr), Some(r_arr)) => (
- converter.convert_columns(&[l_arr])?,
- converter.convert_columns(&[r_arr])?,
- ),
- _ => {
- offsets.push(last_offset);
- continue;
- }
- };
-
- let l_iter = l_values.iter().sorted().dedup();
- let values_set: HashSet<_> = l_iter.clone().collect();
- let mut rows = if set_op == SetOp::Union {
- l_iter.collect()
- } else {
- vec![]
- };
+ let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
+ let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;
+ let l_offsets = l.value_offsets();
+ let r_offsets = r.value_offsets();
+
+ let mut result_offsets = Vec::with_capacity(l.len() + 1);
+ result_offsets.push(OffsetSize::usize_as(0));
+ let mut final_rows = Vec::with_capacity(rows_l.num_rows());
+
+ // Reuse hash sets across iterations
+ let mut seen = HashSet::new();
+ let mut r_set = HashSet::new();
+ for i in 0..l.len() {
+ let last_offset = *result_offsets.last().unwrap();
+
+ if l.is_null(i) || r.is_null(i) {
+ result_offsets.push(last_offset);
+ continue;
+ }
- for r_val in r_values.iter().sorted().dedup() {
- match set_op {
- SetOp::Union => {
- if !values_set.contains(&r_val) {
- rows.push(r_val);
+ let l_start = l_offsets[i].as_usize();
+ let l_end = l_offsets[i + 1].as_usize();
+ let r_start = r_offsets[i].as_usize();
+ let r_end = r_offsets[i + 1].as_usize();
+
+ let mut count = 0usize;
+ // Clear sets for reuse
+ seen.clear();
+ r_set.clear();
+
+ match set_op {
+ SetOp::Union => {
+ for idx in l_start..l_end {
+ let row = rows_l.row(idx);
+ if seen.insert(row) {
+ final_rows.push(row);
+ count += 1;
}
}
- SetOp::Intersect => {
- if values_set.contains(&r_val) {
- rows.push(r_val);
+ for idx in r_start..r_end {
+ let row = rows_r.row(idx);
+ if seen.insert(row) {
+ final_rows.push(row);
+ count += 1;
}
}
}
- }
-
- offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
- let arrays = converter.convert_rows(rows)?;
- let array = match arrays.first() {
- Some(array) => Arc::clone(array),
- None => {
- return internal_err!("{set_op}: failed to get array from
rows");
+ SetOp::Intersect => {
+                // Build a hash set from the right array as a lookup table,
+                // then iterate over the left array to find common elements.
Review Comment:
Now building the HashSet from the shorter array and iterating over the
longer one — good catch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]