Copilot commented on code in PR #1701:
URL: https://github.com/apache/auron/pull/1701#discussion_r2591948505


##########
native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs:
##########
@@ -131,66 +131,37 @@ impl<const L_OUTER: bool, const R_OUTER: bool> Joiner for 
FullJoiner<L_OUTER, R_
                     cur_forward!(cur2);
 
                     // iterate both stream, find smaller one, use it for 
probing
-                    let mut has_multi_equal = false;
                     let mut l_equal = true;
                     let mut r_equal = true;
-                    while l_equal && r_equal {
+
+                    while l_equal {
+                        l_equal = !cur1.finished() && cur1.cur_key() == 
cur1.key(l_key_idx);
                         if l_equal {
-                            l_equal = !cur1.finished() && cur1.cur_key() == 
cur1.key(l_key_idx);
-                            if l_equal {
-                                has_multi_equal = true;
-                                equal_lindices.push(cur1.cur_idx());
-                                cur_forward!(cur1);
-                            }
+                            equal_lindices.push(cur1.cur_idx());
+                            cur_forward!(cur1);
                         }
+                    }
+
+                    while r_equal {
+                        r_equal = !cur2.finished() && cur2.cur_key() == 
cur2.key(r_key_idx);
                         if r_equal {
-                            r_equal = !cur2.finished() && cur2.cur_key() == 
cur2.key(r_key_idx);
-                            if r_equal {
-                                has_multi_equal = true;
-                                equal_rindices.push(cur2.cur_idx());
-                                cur_forward!(cur2);
-                            }
+                            equal_rindices.push(cur2.cur_idx());
+                            cur_forward!(cur2);
                         }
                     }
 
                     // fast path for one-to-one join
-                    if !has_multi_equal {
+                    if equal_lindices.len() <= 1 && equal_rindices.len() <= 1 {
                         self.lindices.push(l_key_idx);
                         self.rindices.push(r_key_idx);
                         continue;
                     }
-
-                    for (&lidx, &ridx) in 
equal_lindices.iter().cartesian_product(&equal_rindices) {
-                        self.lindices.push(lidx);
-                        self.rindices.push(ridx);
-                    }
-
-                    if r_equal {
-                        // stream right side
-                        while !cur2.finished() && cur2.cur_key() == 
cur1.key(l_key_idx) {
-                            for &lidx in &equal_lindices {
-                                self.lindices.push(lidx);
-                                self.rindices.push(cur2.cur_idx());
-                            }
-                            cur_forward!(cur2);
-                            if self.should_flush() || 
cur2.num_buffered_batches() > 1 {
-                                self.as_mut().flush(cur1, cur2).await?;
-                                cur2.clean_out_dated_batches();
-                            }
-                        }
-                    }
-
-                    if l_equal {
-                        // stream left side
-                        while !cur1.finished() && cur1.cur_key() == 
cur2.key(r_key_idx) {
-                            for &ridx in &equal_rindices {
-                                self.lindices.push(cur1.cur_idx());
-                                self.rindices.push(ridx);
-                            }
-                            cur_forward!(cur1);
-                            if self.should_flush() || 
cur1.num_buffered_batches() > 1 {
+                    for &lidx in &equal_lindices {
+                        for &ridx in &equal_rindices {
+                            self.lindices.push(lidx);
+                            self.rindices.push(ridx);
+                            if self.should_flush() {
                                 self.as_mut().flush(cur1, cur2).await?;

Review Comment:
   The flush condition is incomplete, and the flush block is missing cleanup 
calls that are critical for memory management during joins with many duplicate keys:
   
   1. Missing check for `cur1.num_buffered_batches() > 1 || 
cur2.num_buffered_batches() > 1` (present at lines 99-102)
   2. Missing calls to `clean_out_dated_batches()` after flushing (used 
consistently elsewhere at lines 104-105, 174-175, 186, 196)
   
   Without these, memory usage will grow without bound during Cartesian product 
generation when there are many duplicate keys, defeating the purpose of this PR.
   
   The flush block should be:
   ```rust
   if self.should_flush() || cur1.num_buffered_batches() > 1 || 
cur2.num_buffered_batches() > 1 {
       self.as_mut().flush(cur1, cur2).await?;
       cur1.clean_out_dated_batches();
       cur2.clean_out_dated_batches();
   }
   ```
   ```suggestion
                               if self.should_flush()
                                   || cur1.num_buffered_batches() > 1
                                   || cur2.num_buffered_batches() > 1
                               {
                                   self.as_mut().flush(cur1, cur2).await?;
                                   cur1.clean_out_dated_batches();
                                   cur2.clean_out_dated_batches();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to