comphead commented on code in PR #20806:
URL: https://github.com/apache/datafusion/pull/20806#discussion_r2989491999
##########
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs:
##########
@@ -0,0 +1,1265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort-merge join stream specialized for semi/anti joins.
+//!
+//! Instantiated by [`SortMergeJoinExec`](crate::joins::sort_merge_join::SortMergeJoinExec)
+//! when the join type is `LeftSemi`, `LeftAnti`, `RightSemi`, or `RightAnti`.
+//!
+//! # Motivation
+//!
+//! The general-purpose `SortMergeJoinStream`
+//! handles semi/anti joins by materializing `(outer, inner)` row pairs,
+//! applying a filter, then using a "corrected filter mask" to deduplicate.
+//! Semi/anti joins only need a boolean per outer row (does a match exist?),
+//! not pairs. The pair-based approach incurs unnecessary memory allocation
+//! and intermediate batches.
+//!
+//! This stream instead tracks matches with a per-outer-batch bitset,
+//! avoiding all pair materialization.
+//!
+//! # "Outer Side" vs "Inner Side"
+//!
+//! For `Left*` join types, left is outer and right is inner.
+//! For `Right*` join types, right is outer and left is inner.
+//! The output schema always equals the outer side's schema.
+//!
+//! # Algorithm
+//!
+//! Both inputs must be sorted by the join keys. The stream performs a merge
+//! scan across the two sorted inputs:
+//!
+//! ```text
+//! outer cursor ──► [1, 2, 2, 3, 5, 5, 7]
+//! inner cursor ──► [2, 2, 4, 5, 6, 7, 7]
+//!                   ▲
+//!          compare keys at cursors
+//! ```
+//!
+//! At each step, the keys at the outer and inner cursors are compared:
+//!
+//! - **outer < inner**: Skip the outer key group (no match exists).
+//! - **outer > inner**: Skip the inner key group.
+//! - **outer == inner**: Process the match (see below).
+//!
+//! Key groups are contiguous runs of equal keys within one side. The scan
+//! advances past entire groups at each step.
+//!
+//! ## Processing a key match
+//!
+//! **Without filter**: All outer rows in the key group are marked as matched.
+//!
+//! **With filter**: The inner key group is buffered (may span multiple inner
+//! batches). For each buffered inner row, the filter is evaluated against the
+//! outer key group as a batch. Results are OR'd into the matched bitset. A
+//! short-circuit exits early when all outer rows in the group are matched.
+//!
+//! ```text
+//! matched bitset: [0, 0, 1, 0, 0, ...]
+//!                  ▲── one bit per outer row ──▲
+//!
+//! On emit:
+//!   Semi → filter_record_batch(outer_batch, &matched)
+//!   Anti → filter_record_batch(outer_batch, &NOT(matched))
+//! ```
+//!
+//! ## Batch boundaries
+//!
+//! Key groups can span batch boundaries on either side. The stream handles
+//! this by detecting when a group extends to the end of a batch, loading the
+//! next batch, and continuing if the key matches. The [`PendingBoundary`] enum
+//! preserves loop context across async `Poll::Pending` re-entries.
+//!
+//! # Memory
+//!
+//! Memory usage is bounded and independent of total input size:
+//! - One outer batch at a time (not tracked by reservation — single batch,
+//!   cannot be spilled since it's needed for filter evaluation)
+//! - One inner batch at a time (streaming)
+//! - `matched` bitset: one bit per outer row, re-allocated per batch
+//! - Inner key group buffer: only for filtered joins, one key group at a time.
+//!   Tracked via `MemoryReservation`; spilled to disk when the memory pool
+//!   limit is exceeded.
+//! - `BatchCoalescer`: output buffering to target batch size
+//!
+//! # Degenerate cases
+//!
+//! **Highly skewed key (filtered joins only):** When a filter is present,
+//! the inner key group is buffered so each inner row can be evaluated
+//! against the outer group. If one join key has N inner rows, all N rows
+//! are held in memory simultaneously (or spilled to disk if the memory
+//! pool limit is reached). With uniform key distribution this is small
+//! (inner_rows / num_distinct_keys), but a single hot key can buffer
+//! arbitrarily many rows. The no-filter path does not buffer inner
+//! rows — it only advances the cursor — so it is unaffected.
+//!
+//! **Scalar broadcast during filter evaluation:** Each inner row is
+//! broadcast to match the outer group length for filter evaluation,
+//! allocating one array per inner row × filter column. This is inherent
+//! to the `PhysicalExpr::evaluate(RecordBatch)` API, which does not
+//! support scalar inputs directly. The total work is
+//! O(inner_group × outer_group) per key, but with much lower constant
+//! factor than the pair-materialization approach.
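For readers of this thread, the merge scan in the module docs above can be sketched as a standalone toy, using plain `i64` slices in place of Arrow key arrays and a `Vec<bool>` in place of the packed `matched` bitset (the real stream uses `BooleanBufferBuilder`). Filters, batching, spilling, and all the async machinery are omitted, and every name below is illustrative rather than taken from the PR:

```rust
/// Index one past the run of keys equal to `keys[from]` (a "key group").
fn key_group_end(keys: &[i64], from: usize) -> usize {
    let k = keys[from];
    from + keys[from..].iter().take_while(|&&x| x == k).count()
}

/// Indices of outer rows a semi join (`is_semi == true`) or anti join
/// (`is_semi == false`) over two sorted key columns would emit.
fn semi_anti_merge(outer: &[i64], inner: &[i64], is_semi: bool) -> Vec<usize> {
    let mut matched = vec![false; outer.len()];
    let (mut o, mut i) = (0, 0);
    while o < outer.len() && i < inner.len() {
        match outer[o].cmp(&inner[i]) {
            // outer < inner: no match can exist, skip the outer key group.
            std::cmp::Ordering::Less => o = key_group_end(outer, o),
            // outer > inner: skip the inner key group.
            std::cmp::Ordering::Greater => i = key_group_end(inner, i),
            // outer == inner: the no-filter path marks the whole outer group.
            std::cmp::Ordering::Equal => {
                let o_end = key_group_end(outer, o);
                matched[o..o_end].iter_mut().for_each(|m| *m = true);
                o = o_end;
                i = key_group_end(inner, i);
            }
        }
    }
    // Semi emits matched rows; anti emits unmatched rows.
    let mut out = Vec::new();
    for (idx, &m) in matched.iter().enumerate() {
        if m == is_semi {
            out.push(idx);
        }
    }
    out
}

fn main() {
    // Same inputs as the cursor diagram in the module docs.
    let outer = [1, 2, 2, 3, 5, 5, 7];
    let inner = [2, 2, 4, 5, 6, 7, 7];
    assert_eq!(semi_anti_merge(&outer, &inner, true), vec![1, 2, 4, 5, 6]);
    assert_eq!(semi_anti_merge(&outer, &inner, false), vec![0, 3]);
}
```

The filtered path would replace the unconditional `*m = true` with per-inner-row filter evaluation OR'd into `matched`, which is exactly where the inner key group buffering discussed above comes in.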
+
+use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::RecordBatchStream;
+use crate::joins::utils::{JoinFilter, compare_join_arrays};
+use crate::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
+};
+use crate::spill::spill_manager::SpillManager;
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
+use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::StreamReader;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
+use arrow::util::bit_util::apply_bitwise_binary_op;
+use datafusion_common::{
+    JoinSide, JoinType, NullEquality, Result, ScalarValue, internal_err,
+};
+use datafusion_execution::SendableRecordBatchStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
+
+use futures::{Stream, StreamExt, ready};
+
+/// Evaluates join key expressions against a batch, returning one array per key.
+fn evaluate_join_keys(
+    batch: &RecordBatch,
+    on: &[PhysicalExprRef],
+) -> Result<Vec<ArrayRef>> {
+    on.iter()
+        .map(|expr| {
+            let num_rows = batch.num_rows();
+            let val = expr.evaluate(batch)?;
+            val.into_array(num_rows)
+        })
+        .collect()
+}
+
+/// Find the first index in `key_arrays` starting from `from` where the key
+/// differs from the key at `from`. Uses `compare_join_arrays` for zero-alloc
+/// ordinal comparison.
+///
+/// Optimized for join workloads: checks adjacent and boundary keys before
+/// falling back to binary search, since most key groups are small (often 1).
+fn find_key_group_end(
+    key_arrays: &[ArrayRef],
+    from: usize,
+    len: usize,
+    sort_options: &[SortOptions],
+    null_equality: NullEquality,
+) -> Result<usize> {
+    let next = from + 1;
+    if next >= len {
+        return Ok(len);
+    }
+
+    // Fast path: single-row group (common with unique keys).
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        next,
+        sort_options,
+        null_equality,
+    )? != Ordering::Equal
+    {
+        return Ok(next);
+    }
+
+    // Check if the entire remaining batch shares this key.
+    let last = len - 1;
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        last,
+        sort_options,
+        null_equality,
+    )? == Ordering::Equal
+    {
+        return Ok(len);
+    }
+
+    // Binary search the interior: key at `next` matches, key at `last` doesn't.
+    let mut lo = next + 1;
+    let mut hi = last;
+    while lo < hi {
+        let mid = lo + (hi - lo) / 2;
+        if compare_join_arrays(
+            key_arrays,
+            from,
+            key_arrays,
+            mid,
+            sort_options,
+            null_equality,
+        )? == Ordering::Equal
+        {
+            lo = mid + 1;
+        } else {
+            hi = mid;
+        }
+    }
+    Ok(lo)
+}
+
+/// When an outer key group spans a batch boundary, the boundary loop emits
+/// the current batch, then polls for the next. If that poll returns Pending,
+/// `ready!` exits `poll_join` and we re-enter from the top on the next call.
+/// Without this state, the new batch would be processed fresh by the
+/// merge-scan — but inner already advanced past this key, so the matching
+/// outer rows would be skipped via `Ordering::Less` and never marked.
+///
+/// This enum carries the last key (as single-row sliced arrays) from the
+/// previous batch so we can check whether the next batch continues the same
+/// key group. Stored as `Option<PendingBoundary>`: `None` means normal
+/// processing.
+#[derive(Debug)]
+enum PendingBoundary {
+    /// Resuming a no-filter boundary loop.
+    NoFilter { saved_keys: Vec<ArrayRef> },
+    /// Resuming a filtered boundary loop. Inner key data remains in the
+    /// buffer (or spill file) for the resumed loop.
+    Filtered { saved_keys: Vec<ArrayRef> },
+}
+
+pub(crate) struct SemiAntiSortMergeJoinStream {
+    /// true for semi (emit matched), false for anti (emit unmatched)
+    is_semi: bool,
+
+    // Input streams — in the nested-loop model that sort-merge join
+    // implements, "outer" is the driving loop and "inner" is probed for
+    // matches. The existing SortMergeJoinStream calls these "streamed"
+    // and "buffered" respectively. For Left* joins, outer=left; for
+    // Right* joins, outer=right. Output schema equals the outer side.
+    outer: SendableRecordBatchStream,
+    inner: SendableRecordBatchStream,
+
+    // Current batches and cursor positions
+    outer_batch: Option<RecordBatch>,
+    outer_offset: usize,
+    outer_key_arrays: Vec<ArrayRef>,
+    inner_batch: Option<RecordBatch>,
+    inner_offset: usize,
+    inner_key_arrays: Vec<ArrayRef>,
+
+    // Per-outer-batch match tracking, reused across batches.
+    // Bit-packed (not Vec<bool>) so that:
+    // - emit: finish() yields a BooleanBuffer directly (no packing iteration)
+    // - OR: apply_bitwise_binary_op ORs filter results in u64 chunks
+    // - count: UnalignedBitChunk::count_ones uses popcnt
+    matched: BooleanBufferBuilder,
+
+    // Inner key group buffer: all inner rows sharing the current join key.
+    // Only populated when a filter is present. Unbounded — a single key
+    // with many inner rows will buffer them all. See "Degenerate cases"
+    // in exec.rs. Spilled to disk when memory reservation fails.
+    inner_key_buffer: Vec<RecordBatch>,
+    inner_key_spill: Option<RefCountedTempFile>,
+
+    // True when buffer_inner_key_group returned Pending after partially
+    // filling inner_key_buffer. On re-entry, buffer_inner_key_group
+    // must skip clear() and resume from poll_next_inner_batch (the
+    // current inner_batch was already sliced and pushed before Pending).
+    buffering_inner_pending: bool,
+
+    // Boundary re-entry state — see PendingBoundary doc comment.
+    pending_boundary: Option<PendingBoundary>,
+
+    // Join condition
+    on_outer: Vec<PhysicalExprRef>,

Review Comment:
   we still need ON physical expressions to recalculate inner/outer key arrays?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
