comphead commented on code in PR #20806:
URL: https://github.com/apache/datafusion/pull/20806#discussion_r2989458647


##########
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs:
##########
@@ -0,0 +1,1265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort-merge join stream specialized for semi/anti joins.
+//!
+//! Instantiated by [`SortMergeJoinExec`](crate::joins::sort_merge_join::SortMergeJoinExec)
+//! when the join type is `LeftSemi`, `LeftAnti`, `RightSemi`, or `RightAnti`.
+//!
+//! # Motivation
+//!
+//! The general-purpose `SortMergeJoinStream`
+//! handles semi/anti joins by materializing `(outer, inner)` row pairs,
+//! applying a filter, then using a "corrected filter mask" to deduplicate.
+//! Semi/anti joins only need a boolean per outer row (does a match exist?),
+//! not pairs. The pair-based approach incurs unnecessary memory allocation
+//! and intermediate batches.
+//!
+//! This stream instead tracks matches with a per-outer-batch bitset,
+//! avoiding all pair materialization.
+//!
+//! # "Outer Side" vs "Inner Side"
+//!
+//! For `Left*` join types, left is outer and right is inner.
+//! For `Right*` join types, right is outer and left is inner.
+//! The output schema always equals the outer side's schema.
+//!
+//! # Algorithm
+//!
+//! Both inputs must be sorted by the join keys. The stream performs a merge
+//! scan across the two sorted inputs:
+//!
+//! ```text
+//!   outer cursor ──►  [1, 2, 2, 3, 5, 5, 7]
+//!   inner cursor ──►  [2, 2, 4, 5, 6, 7, 7]
+//!                       ▲
+//!                   compare keys at cursors
+//! ```
+//!
+//! At each step, the keys at the outer and inner cursors are compared:
+//!
+//! - **outer < inner**: Skip the outer key group (no match exists).
+//! - **outer > inner**: Skip the inner key group.
+//! - **outer == inner**: Process the match (see below).
+//!
+//! Key groups are contiguous runs of equal keys within one side. The scan
+//! advances past entire groups at each step.
+//!
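The merge scan above can be sketched on plain sorted slices. This is a simplified model under illustrative assumptions (single-column `i32` keys, no nulls, everything in one batch); the real stream compares multi-column Arrow key arrays across async batches:

```rust
// Simplified merge scan for a semi/anti join over single-column i32 keys.
// `matched[i]` is set when outer row i has at least one inner key match.
fn merge_scan(outer: &[i32], inner: &[i32]) -> Vec<bool> {
    let mut matched = vec![false; outer.len()];
    let (mut o, mut i) = (0usize, 0usize);
    while o < outer.len() && i < inner.len() {
        // End of the contiguous equal-key group on each side.
        let o_end = o + outer[o..].iter().take_while(|&&k| k == outer[o]).count();
        let i_end = i + inner[i..].iter().take_while(|&&k| k == inner[i]).count();
        match outer[o].cmp(&inner[i]) {
            // No inner match can exist for this outer group: skip it.
            std::cmp::Ordering::Less => o = o_end,
            // Inner group has no outer counterpart: skip it.
            std::cmp::Ordering::Greater => i = i_end,
            std::cmp::Ordering::Equal => {
                // Mark the whole outer group, then advance past both groups.
                matched[o..o_end].fill(true);
                o = o_end;
                i = i_end;
            }
        }
    }
    matched
}

fn main() {
    // The keys from the diagram: outer [1,2,2,3,5,5,7], inner [2,2,4,5,6,7,7].
    let m = merge_scan(&[1, 2, 2, 3, 5, 5, 7], &[2, 2, 4, 5, 6, 7, 7]);
    assert_eq!(m, vec![false, true, true, false, true, true, true]);
    // Semi join emits matched outer rows; anti join emits the complement.
}
```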
+//! ## Processing a key match
+//!
+//! **Without filter**: All outer rows in the key group are marked as matched.
+//!
+//! **With filter**: The inner key group is buffered (may span multiple inner
+//! batches). For each buffered inner row, the filter is evaluated against the
+//! outer key group as a batch. Results are OR'd into the matched bitset. A
+//! short-circuit exits early when all outer rows in the group are matched.
+//!
+//! ```text
+//!   matched bitset:  [0, 0, 1, 0, 0, ...]
+//!                     ▲── one bit per outer row ──▲
+//!
+//!   On emit:
+//!     Semi  → filter_record_batch(outer_batch, &matched)
+//!     Anti  → filter_record_batch(outer_batch, &NOT(matched))
+//! ```
+//!
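The filtered path for one equal-key group can be modeled on plain vectors. The names and the `i64` payloads here are illustrative; the real code evaluates a `PhysicalExpr` over a `RecordBatch` per buffered inner row and ORs `BooleanArray` results into the bitset:

```rust
// Sketch of the filtered match path for one equal-key group: each buffered
// inner row's filter result is OR'd into the outer group's matched bits.
fn process_filtered_group<F>(
    matched: &mut [bool], // bits for the outer rows of this key group
    outer_rows: &[i64],   // outer group payload values
    inner_rows: &[i64],   // buffered inner key group payload values
    filter: F,            // non-equi predicate, e.g. outer.v < inner.v
) where
    F: Fn(i64, i64) -> bool,
{
    for &inner in inner_rows {
        for (bit, &outer) in matched.iter_mut().zip(outer_rows) {
            *bit |= filter(outer, inner); // OR the filter result into the bitset
        }
        // Short-circuit: once every outer row is matched, further inner
        // rows cannot change the result.
        if matched.iter().all(|&b| b) {
            break;
        }
    }
}

fn main() {
    let outer = [10, 20, 30];
    let inner = [15, 25];
    let mut matched = vec![false; outer.len()];
    process_filtered_group(&mut matched, &outer, &inner, |o, i| o < i);
    // 10 < 15; 10 and 20 < 25; 30 matches neither inner row.
    assert_eq!(matched, vec![true, true, false]);
}
```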
+//! ## Batch boundaries
+//!
+//! Key groups can span batch boundaries on either side. The stream handles
+//! this by detecting when a group extends to the end of a batch, loading the
+//! next batch, and continuing if the key matches. The [`PendingBoundary`] enum
+//! preserves loop context across async `Poll::Pending` re-entries.
+//!
+//! # Memory
+//!
+//! Memory usage is bounded and independent of total input size:
+//! - One outer batch at a time (not tracked by reservation — single batch,
+//!   cannot be spilled since it's needed for filter evaluation)
+//! - One inner batch at a time (streaming)
+//! - `matched` bitset: one bit per outer row, re-allocated per batch
+//! - Inner key group buffer: only for filtered joins, one key group at a time.
+//!   Tracked via `MemoryReservation`; spilled to disk when the memory pool
+//!   limit is exceeded.
+//! - `BatchCoalescer`: output buffering to target batch size
+//!
+//! # Degenerate cases
+//!
+//! **Highly skewed key (filtered joins only):** When a filter is present,
+//! the inner key group is buffered so each inner row can be evaluated
+//! against the outer group. If one join key has N inner rows, all N rows
+//! are held in memory simultaneously (or spilled to disk if the memory
+//! pool limit is reached). With uniform key distribution this is small
+//! (inner_rows / num_distinct_keys), but a single hot key can buffer
+//! arbitrarily many rows. The no-filter path does not buffer inner
+//! rows — it only advances the cursor — so it is unaffected.
+//!
+//! **Scalar broadcast during filter evaluation:** Each inner row is
+//! broadcast to match the outer group length for filter evaluation,
+//! allocating one array per inner row × filter column. This is inherent
+//! to the `PhysicalExpr::evaluate(RecordBatch)` API, which does not
+//! support scalar inputs directly. The total work is
+//! O(inner_group × outer_group) per key, but with much lower constant
+//! factor than the pair-materialization approach.
+
+use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::RecordBatchStream;
+use crate::joins::utils::{JoinFilter, compare_join_arrays};
+use crate::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
+};
+use crate::spill::spill_manager::SpillManager;
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
+use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::StreamReader;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
+use arrow::util::bit_util::apply_bitwise_binary_op;
+use datafusion_common::{
+    JoinSide, JoinType, NullEquality, Result, ScalarValue, internal_err,
+};
+use datafusion_execution::SendableRecordBatchStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
+
+use futures::{Stream, StreamExt, ready};
+
+/// Evaluates join key expressions against a batch, returning one array per key.
+fn evaluate_join_keys(
+    batch: &RecordBatch,
+    on: &[PhysicalExprRef],
+) -> Result<Vec<ArrayRef>> {
+    on.iter()
+        .map(|expr| {
+            let num_rows = batch.num_rows();
+            let val = expr.evaluate(batch)?;
+            val.into_array(num_rows)
+        })
+        .collect()
+}
+
+/// Find the first index in `key_arrays` starting from `from` where the key
+/// differs from the key at `from`. Uses `compare_join_arrays` for zero-alloc
+/// ordinal comparison.
+///
+/// Optimized for join workloads: checks adjacent and boundary keys before
+/// falling back to binary search, since most key groups are small (often 1).
+fn find_key_group_end(
+    key_arrays: &[ArrayRef],
+    from: usize,
+    len: usize,
+    sort_options: &[SortOptions],
+    null_equality: NullEquality,
+) -> Result<usize> {
+    let next = from + 1;
+    if next >= len {
+        return Ok(len);
+    }
+
+    // Fast path: single-row group (common with unique keys).
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        next,
+        sort_options,
+        null_equality,
+    )? != Ordering::Equal
+    {
+        return Ok(next);
+    }
+
+    // Check if the entire remaining batch shares this key.
+    let last = len - 1;
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        last,
+        sort_options,
+        null_equality,
+    )? == Ordering::Equal
+    {
+        return Ok(len);
+    }
+
+    // Binary search the interior: key at `next` matches, key at `last` doesn't.
+    let mut lo = next + 1;
+    let mut hi = last;
+    while lo < hi {
+        let mid = lo + (hi - lo) / 2;
+        if compare_join_arrays(
+            key_arrays,
+            from,
+            key_arrays,
+            mid,
+            sort_options,
+            null_equality,
+        )? == Ordering::Equal
+        {
+            lo = mid + 1;
+        } else {
+            hi = mid;
+        }
+    }
+    Ok(lo)
+}
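The three-tier strategy above (adjacent check, whole-remainder check, then binary search) can be restated on a sorted `i32` slice. This is a simplified sketch: the real function compares multi-column Arrow key arrays honoring sort options and null equality:

```rust
// Simplified version of the key-group-end search over a sorted i32 slice.
fn key_group_end(keys: &[i32], from: usize) -> usize {
    let len = keys.len();
    let next = from + 1;
    if next >= len {
        return len;
    }
    // Fast path: single-row group (common with unique keys).
    if keys[next] != keys[from] {
        return next;
    }
    // Check if the entire remaining slice shares this key.
    let last = len - 1;
    if keys[last] == keys[from] {
        return len;
    }
    // Binary search the interior: `next` matches, `last` does not.
    let (mut lo, mut hi) = (next + 1, last);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        if keys[mid] == keys[from] {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }
    lo
}

fn main() {
    let keys = [2, 2, 4, 5, 6, 7, 7];
    assert_eq!(key_group_end(&keys, 0), 2); // group of 2s ends at index 2
    assert_eq!(key_group_end(&keys, 2), 3); // singleton group
    assert_eq!(key_group_end(&keys, 5), 7); // trailing group of 7s
}
```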
+
+/// When an outer key group spans a batch boundary, the boundary loop emits
+/// the current batch, then polls for the next. If that poll returns Pending,
+/// `ready!` exits `poll_join` and we re-enter from the top on the next call.
+/// Without this state, the new batch would be processed fresh by the
+/// merge-scan — but inner already advanced past this key, so the matching
+/// outer rows would be skipped via `Ordering::Less` and never marked.
+///
+/// This enum carries the last key (as single-row sliced arrays) from the
+/// previous batch so we can check whether the next batch continues the same
+/// key group. Stored as `Option<PendingBoundary>`: `None` means normal
+/// processing.
+#[derive(Debug)]
+enum PendingBoundary {
+    /// Resuming a no-filter boundary loop.
+    NoFilter { saved_keys: Vec<ArrayRef> },
+    /// Resuming a filtered boundary loop. Inner key data remains in the
+    /// buffer (or spill file) for the resumed loop.
+    Filtered { saved_keys: Vec<ArrayRef> },
+}
+
+pub(crate) struct SemiAntiSortMergeJoinStream {
+    /// true for semi (emit matched), false for anti (emit unmatched)
+    is_semi: bool,
+
+    // Input streams — in the nested-loop model that sort-merge join
+    // implements, "outer" is the driving loop and "inner" is probed for
+    // matches. The existing SortMergeJoinStream calls these "streamed"
+    // and "buffered" respectively. For Left* joins, outer=left; for
+    // Right* joins, outer=right. Output schema equals the outer side.
+    outer: SendableRecordBatchStream,
+    inner: SendableRecordBatchStream,
+
+    // Current batches and cursor positions
+    outer_batch: Option<RecordBatch>,
+    outer_offset: usize,

Review Comment:
   is it a position inside current `outer_batch`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

