Copilot commented on code in PR #1701:
URL: https://github.com/apache/auron/pull/1701#discussion_r2591948505
##########
native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs:
##########
@@ -131,66 +131,37 @@ impl<const L_OUTER: bool, const R_OUTER: bool> Joiner for
FullJoiner<L_OUTER, R_
cur_forward!(cur2);
// iterate both stream, find smaller one, use it for
probing
- let mut has_multi_equal = false;
let mut l_equal = true;
let mut r_equal = true;
- while l_equal && r_equal {
+
+ while l_equal {
+ l_equal = !cur1.finished() && cur1.cur_key() ==
cur1.key(l_key_idx);
if l_equal {
- l_equal = !cur1.finished() && cur1.cur_key() ==
cur1.key(l_key_idx);
- if l_equal {
- has_multi_equal = true;
- equal_lindices.push(cur1.cur_idx());
- cur_forward!(cur1);
- }
+ equal_lindices.push(cur1.cur_idx());
+ cur_forward!(cur1);
}
+ }
+
+ while r_equal {
+ r_equal = !cur2.finished() && cur2.cur_key() ==
cur2.key(r_key_idx);
if r_equal {
- r_equal = !cur2.finished() && cur2.cur_key() ==
cur2.key(r_key_idx);
- if r_equal {
- has_multi_equal = true;
- equal_rindices.push(cur2.cur_idx());
- cur_forward!(cur2);
- }
+ equal_rindices.push(cur2.cur_idx());
+ cur_forward!(cur2);
}
}
// fast path for one-to-one join
- if !has_multi_equal {
+ if equal_lindices.len() <= 1 && equal_rindices.len() <= 1 {
self.lindices.push(l_key_idx);
self.rindices.push(r_key_idx);
continue;
}
-
- for (&lidx, &ridx) in
equal_lindices.iter().cartesian_product(&equal_rindices) {
- self.lindices.push(lidx);
- self.rindices.push(ridx);
- }
-
- if r_equal {
- // stream right side
- while !cur2.finished() && cur2.cur_key() ==
cur1.key(l_key_idx) {
- for &lidx in &equal_lindices {
- self.lindices.push(lidx);
- self.rindices.push(cur2.cur_idx());
- }
- cur_forward!(cur2);
- if self.should_flush() ||
cur2.num_buffered_batches() > 1 {
- self.as_mut().flush(cur1, cur2).await?;
- cur2.clean_out_dated_batches();
- }
- }
- }
-
- if l_equal {
- // stream left side
- while !cur1.finished() && cur1.cur_key() ==
cur2.key(r_key_idx) {
- for &ridx in &equal_rindices {
- self.lindices.push(cur1.cur_idx());
- self.rindices.push(ridx);
- }
- cur_forward!(cur1);
- if self.should_flush() ||
cur1.num_buffered_batches() > 1 {
+ for &lidx in &equal_lindices {
+ for &ridx in &equal_rindices {
+ self.lindices.push(lidx);
+ self.rindices.push(ridx);
+ if self.should_flush() {
self.as_mut().flush(cur1, cur2).await?;
Review Comment:
The flush condition is incomplete, and it omits cleanup calls that are
critical for memory management during joins with many duplicate keys:
1. It is missing the `cur1.num_buffered_batches() > 1 ||
cur2.num_buffered_batches() > 1` check (present at lines 99-102).
2. It is missing the calls to `clean_out_dated_batches()` after flushing (used
consistently elsewhere at lines 104-105, 174-175, 186, and 196).
Without these, memory can grow without bound while the cartesian product is
generated for keys with many duplicates, which defeats the purpose of this PR.
The condition should be:
```rust
if self.should_flush() || cur1.num_buffered_batches() > 1 ||
cur2.num_buffered_batches() > 1 {
self.as_mut().flush(cur1, cur2).await?;
cur1.clean_out_dated_batches();
cur2.clean_out_dated_batches();
}
```
```suggestion
if self.should_flush()
|| cur1.num_buffered_batches() > 1
|| cur2.num_buffered_batches() > 1
{
self.as_mut().flush(cur1, cur2).await?;
cur1.clean_out_dated_batches();
cur2.clean_out_dated_batches();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]