XorSum commented on code in PR #1701:
URL: https://github.com/apache/auron/pull/1701#discussion_r2591988457
##########
native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs:
##########
@@ -131,66 +131,37 @@ impl<const L_OUTER: bool, const R_OUTER: bool> Joiner for
FullJoiner<L_OUTER, R_
cur_forward!(cur2);
// iterate both stream, find smaller one, use it for
probing
- let mut has_multi_equal = false;
let mut l_equal = true;
let mut r_equal = true;
- while l_equal && r_equal {
+
+ while l_equal {
+ l_equal = !cur1.finished() && cur1.cur_key() ==
cur1.key(l_key_idx);
if l_equal {
- l_equal = !cur1.finished() && cur1.cur_key() ==
cur1.key(l_key_idx);
- if l_equal {
- has_multi_equal = true;
- equal_lindices.push(cur1.cur_idx());
- cur_forward!(cur1);
- }
+ equal_lindices.push(cur1.cur_idx());
+ cur_forward!(cur1);
}
+ }
+
+ while r_equal {
+ r_equal = !cur2.finished() && cur2.cur_key() ==
cur2.key(r_key_idx);
if r_equal {
- r_equal = !cur2.finished() && cur2.cur_key() ==
cur2.key(r_key_idx);
- if r_equal {
- has_multi_equal = true;
- equal_rindices.push(cur2.cur_idx());
- cur_forward!(cur2);
- }
+ equal_rindices.push(cur2.cur_idx());
+ cur_forward!(cur2);
}
}
// fast path for one-to-one join
- if !has_multi_equal {
+ if equal_lindices.len() <= 1 && equal_rindices.len() <= 1 {
self.lindices.push(l_key_idx);
self.rindices.push(r_key_idx);
continue;
}
-
- for (&lidx, &ridx) in
equal_lindices.iter().cartesian_product(&equal_rindices) {
- self.lindices.push(lidx);
- self.rindices.push(ridx);
- }
-
- if r_equal {
- // stream right side
- while !cur2.finished() && cur2.cur_key() ==
cur1.key(l_key_idx) {
- for &lidx in &equal_lindices {
- self.lindices.push(lidx);
- self.rindices.push(cur2.cur_idx());
- }
- cur_forward!(cur2);
- if self.should_flush() ||
cur2.num_buffered_batches() > 1 {
- self.as_mut().flush(cur1, cur2).await?;
- cur2.clean_out_dated_batches();
- }
- }
- }
-
- if l_equal {
- // stream left side
- while !cur1.finished() && cur1.cur_key() ==
cur2.key(r_key_idx) {
- for &ridx in &equal_rindices {
- self.lindices.push(cur1.cur_idx());
- self.rindices.push(ridx);
- }
- cur_forward!(cur1);
- if self.should_flush() ||
cur1.num_buffered_batches() > 1 {
+ for &lidx in &equal_lindices {
+ for &ridx in &equal_rindices {
+ self.lindices.push(lidx);
+ self.rindices.push(ridx);
+ if self.should_flush() {
self.as_mut().flush(cur1, cur2).await?;
Review Comment:
We should not call `cur1.clean_out_dated_batches()`, because doing so will
break the index correlation between `cur1` and `equal_lindices`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]