mbutrovich commented on code in PR #21184:
URL: https://github.com/apache/datafusion/pull/21184#discussion_r2997174788


##########
datafusion/physical-plan/src/joins/sort_merge_join/filter.rs:
##########
@@ -282,314 +255,131 @@ pub fn get_corrected_filter_mask(
     let mut seen_true = false;
 
     match join_type {
-        JoinType::Left | JoinType::Right => {
-            // For outer joins: Keep first matching row per input row,
-            // convert rest to nulls, add null-joined rows for unmatched
-            for i in 0..row_indices_length {
-                let last_index =
-                    last_index_for_row(i, row_indices, batch_ids, 
row_indices_length);
-                if filter_mask.value(i) {
-                    seen_true = true;
-                    corrected_mask.append_value(true);
-                } else if seen_true || !filter_mask.value(i) && !last_index {
-                    corrected_mask.append_null(); // to be ignored and not set 
to output
-                } else {
-                    corrected_mask.append_value(false); // to be converted to 
null joined row
-                }
-
-                if last_index {
-                    seen_true = false;
-                }
-            }
-
-            // Generate null joined rows for records which have no matching 
join key
-            corrected_mask.append_n(expected_size - corrected_mask.len(), 
false);
-            Some(corrected_mask.finish())
-        }
-        JoinType::LeftMark | JoinType::RightMark => {
-            // For mark joins: Like outer but only keep first match, mark with 
boolean
+        JoinType::Left | JoinType::Right | JoinType::Full => {
+            // For each input row group: keep first filter-passing row,
+            // discard (null) remaining matches, null-join if none passed.
+            // Null metadata entries are already-null-joined rows that
+            // flow through unchanged to preserve output ordering.
             for i in 0..row_indices_length {
                 let last_index =
                     last_index_for_row(i, row_indices, batch_ids, 
row_indices_length);
-                if filter_mask.value(i) && !seen_true {
-                    seen_true = true;
-                    corrected_mask.append_value(true);
-                } else if seen_true || !filter_mask.value(i) && !last_index {
-                    corrected_mask.append_null(); // to be ignored and not set 
to output
-                } else {
-                    corrected_mask.append_value(false); // to be converted to 
null joined row
-                }
-
-                if last_index {
-                    seen_true = false;
-                }
-            }
-
-            // Generate null joined rows for records which have no matching 
join key
-            corrected_mask.append_n(expected_size - corrected_mask.len(), 
false);
-            Some(corrected_mask.finish())
-        }
-        JoinType::LeftSemi | JoinType::RightSemi => {
-            // For semi joins: Keep only first matching row per input row, 
discard rest
-            for i in 0..row_indices_length {
-                let last_index =
-                    last_index_for_row(i, row_indices, batch_ids, 
row_indices_length);
-                if filter_mask.value(i) && !seen_true {
-                    seen_true = true;
-                    corrected_mask.append_value(true);
-                } else {
-                    corrected_mask.append_null(); // to be ignored and not set 
to output
-                }
-
-                if last_index {
-                    seen_true = false;
-                }
-            }
-
-            Some(corrected_mask.finish())
-        }
-        JoinType::LeftAnti | JoinType::RightAnti => {
-            // For anti joins: Keep row only if NO matches passed the filter
-            for i in 0..row_indices_length {
-                let last_index =
-                    last_index_for_row(i, row_indices, batch_ids, 
row_indices_length);
-
-                if filter_mask.value(i) {
-                    seen_true = true;
-                }
-
-                if last_index {
-                    if !seen_true {
-                        corrected_mask.append_value(true);
-                    } else {
-                        corrected_mask.append_null();
-                    }
-
-                    seen_true = false;
-                } else {
-                    corrected_mask.append_null();
-                }
-            }
-            // Generate null joined rows for records which have no matching 
join key,
-            // for LeftAnti non-matched considered as true
-            corrected_mask.append_n(expected_size - corrected_mask.len(), 
true);
-            Some(corrected_mask.finish())
-        }
-        JoinType::Full => {
-            // For full joins: Similar to outer but handle both sides
-            for i in 0..row_indices_length {
-                let last_index =
-                    last_index_for_row(i, row_indices, batch_ids, 
row_indices_length);
-
                 if filter_mask.is_null(i) {
-                    // null joined
                     corrected_mask.append_value(true);
                 } else if filter_mask.value(i) {
                     seen_true = true;
                     corrected_mask.append_value(true);
                 } else if seen_true || !filter_mask.value(i) && !last_index {
-                    corrected_mask.append_null(); // to be ignored and not set 
to output
+                    corrected_mask.append_null();
                 } else {
-                    corrected_mask.append_value(false); // to be converted to 
null joined row
+                    corrected_mask.append_value(false);
                 }
 
                 if last_index {
                     seen_true = false;
                 }
             }
-            // Generate null joined rows for records which have no matching 
join key
+
             corrected_mask.append_n(expected_size - corrected_mask.len(), 
false);
             Some(corrected_mask.finish())
         }
-        JoinType::Inner => {
-            // Inner joins don't need deferred filtering
-            None
+        JoinType::LeftMark
+        | JoinType::RightMark
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::RightAnti => {
+            // Semi/anti/mark joins are handled by 
SemiAntiMarkSortMergeJoinStream
+            unreachable!(
+                "Semi/anti/mark joins should not reach 
get_corrected_filter_mask; \
+                 they are handled by SemiAntiMarkSortMergeJoinStream"
+            )
         }
+        JoinType::Inner => None,
     }
 }
 
 /// Applies corrected filter mask to record batch based on join type
 ///
-/// Different join types require different handling of filtered results:
-/// - Outer joins: Add null-joined rows for false mask values
-/// - Semi/Anti joins: May need projection to remove right columns
-/// - Full joins: Add null-joined rows for both sides
+/// The corrected mask has three possible values per row:
+/// - `true`: Keep the row as-is (matched and passed filter)
+/// - `false`: Convert to null-joined row (all filter matches failed for this 
input row)
+/// - `null`: Discard the row entirely (duplicate match for an already-output 
input row)
+///
+/// This function preserves input row ordering by processing each row in place
+/// rather than separating matched/unmatched rows.
 pub fn filter_record_batch_by_join_type(
     record_batch: &RecordBatch,
     corrected_mask: &BooleanArray,
     join_type: JoinType,
     schema: &SchemaRef,
-    streamed_schema: &SchemaRef,
     buffered_schema: &SchemaRef,
 ) -> Result<RecordBatch> {
-    let filtered_record_batch = filter_record_batch(record_batch, 
corrected_mask)?;
-
     match join_type {
-        JoinType::Left | JoinType::LeftMark => {
-            // For left joins, add null-joined rows where mask is false
-            let null_mask = compute::not(corrected_mask)?;
-            let null_joined_batch = filter_record_batch(record_batch, 
&null_mask)?;
+        JoinType::Left | JoinType::Right | JoinType::Full => {
+            // Discard null-masked rows (keep true + false only)
+            let keep_mask = compute::is_not_null(corrected_mask)?;
+            let kept_batch = filter_record_batch(record_batch, &keep_mask)?;
+            let kept_corrected = compute::filter(corrected_mask, &keep_mask)?;
+            let kept_corrected = kept_corrected
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap();
+
+            if kept_batch.num_rows() == 0 {
+                return Ok(kept_batch);
+            }
 
-            if null_joined_batch.num_rows() == 0 {
-                return Ok(filtered_record_batch);
+            // All rows passed the filter — no null-joining needed
+            if kept_corrected.true_count() == kept_corrected.len() {
+                return Ok(kept_batch);
             }
 
-            // Create null columns for right side
-            let null_joined_streamed_batch = create_null_joined_batch(
-                &null_joined_batch,
-                buffered_schema,
-                JoinSide::Left,
-                join_type,
-                schema,
-            )?;
-
-            Ok(concat_batches(
-                schema,
-                &[filtered_record_batch, null_joined_streamed_batch],
-            )?)
-        }
-        JoinType::LeftSemi
-        | JoinType::LeftAnti
-        | JoinType::RightSemi
-        | JoinType::RightAnti => {
-            // For semi/anti joins, project to only include the outer side 
columns
-            // Both Left and Right semi/anti use streamed_schema.len() because:
-            // - For Left: columns are [left, right], so we take first 
streamed_schema.len()
-            // - For Right: columns are [right, left], and streamed side is 
right, so we take first streamed_schema.len()
-            let output_column_indices: Vec<usize> =
-                (0..streamed_schema.fields().len()).collect();
-            Ok(filtered_record_batch.project(&output_column_indices)?)
-        }
-        JoinType::Right | JoinType::RightMark => {
-            // For right joins, add null-joined rows where mask is false
-            let null_mask = compute::not(corrected_mask)?;
-            let null_joined_batch = filter_record_batch(record_batch, 
&null_mask)?;
+            // For false entries: replace the non-preserved side with nulls.
+            // This preserves row ordering unlike filter+concat.
+            let (null_side_start, null_side_len) = match join_type {
+                JoinType::Left => {
+                    // Left join: null out right (buffered) columns
+                    let left_cols =
+                        schema.fields().len() - buffered_schema.fields().len();
+                    (left_cols, buffered_schema.fields().len())
+                }
+                JoinType::Right => {
+                    // Right join: null out left (buffered) columns
+                    (0, buffered_schema.fields().len())
+                }
+                JoinType::Full => {
+                    // Full join: null out right (buffered) columns for
+                    // streamed-side unmatched rows

Review Comment:
   I added a clarifying comment. The old comment, "add null-joined rows for both
   sides", described the full-join behavior holistically across the whole
   algorithm, whereas the new comment is scoped to what this specific code block
   does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to