This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 51f13d7435 perf: specialized SemiAntiSortMergeJoinStream (#20806)
51f13d7435 is described below
commit 51f13d7435ccfd5ca38aa2d2d23ef0a04382ab74
Author: Matt Butrovich <[email protected]>
AuthorDate: Wed Mar 25 16:11:27 2026 -0400
perf: specialized SemiAntiSortMergeJoinStream (#20806)
## Which issue does this PR close?
- N/A.
## Rationale for this change
DataFusion's `SortMergeJoinExec` handles semi/anti joins by
materializing `(outer, inner)` row pairs, applying a filter, then
deduplicating with a corrected filter mask. Semi/anti joins only need a
boolean per outer row — not pairs. The pair-based approach allocates
unnecessary intermediate batches and index arrays to materialize output.
Recent PRs have improved SMJ performance within the existing pair-based
framework — [#18875](https://github.com/apache/datafusion/pull/18875)
(BatchCoalescer to reduce concatenation overhead),
[#20463](https://github.com/apache/datafusion/pull/20463) (zero-copy
slice instead of take for contiguous indices),
[#20478](https://github.com/apache/datafusion/pull/20478) (cached row
counts to avoid O(n) recalculation) — but the fundamental mismatch
remains: semi/anti joins don't need pairs at all. I think we're hitting
diminishing returns on filtered semi/anti sort-merge joins (TPC-H Q21)
and need a specialized stream.
## What changes are included in this PR?
A new `SemiAntiSortMergeJoinStream` used internally by
`SortMergeJoinExec` for `LeftSemi`, `LeftAnti`, `RightSemi`, and
`RightAnti` joins. When `SortMergeJoinExec::execute()` encounters a
semi/anti join type, it instantiates this stream instead of the existing
`SortMergeJoinStream`. This is transparent to the rest of the system —
no planner changes, no config flags, no new operators.
Instead of materializing row pairs, the stream maintains a
per-outer-batch bitset (`BooleanBufferBuilder`) recording which outer
rows have a matching inner row, then emits output via
`filter_record_batch`.
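The bitset-then-filter idea can be sketched in plain Rust, with `Vec<bool>` standing in for the bit-packed `BooleanBufferBuilder` and index selection standing in for arrow's `filter_record_batch` (names here are illustrative, not DataFusion's):

```rust
// Minimal sketch of the semi/anti emit step: one boolean per outer row,
// then select either the matched rows (semi) or their complement (anti).
fn emit_indices(matched: &[bool], is_semi: bool) -> Vec<usize> {
    matched
        .iter()
        .enumerate()
        // semi keeps matched rows; anti keeps the unmatched ones
        .filter(|&(_, &m)| m == is_semi)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let matched = [false, true, true, false, true];
    assert_eq!(emit_indices(&matched, true), vec![1, 2, 4]); // semi
    assert_eq!(emit_indices(&matched, false), vec![0, 3]); // anti
}
```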
**Algorithm:** Merge-scan across two sorted inputs. On key match without
filter, set matched bits for the outer key group. With filter, buffer
the inner key group and evaluate the filter as outer_slice ×
inner_scalar, OR-ing results into the bitset with
`apply_bitwise_binary_op` (64 bits per iteration). Short-circuit when
all outer rows in the group are matched.
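For the no-filter case, the merge scan reduces to a small loop over two sorted key columns. The sketch below uses plain `i64` slices and a `Vec<bool>` bitset; the real stream compares multi-column arrow arrays and handles batch boundaries, nulls, and sort options, none of which are shown here:

```rust
// Hedged sketch of the merge scan (no filter): on key equality, mark the
// whole outer key group as matched; otherwise skip the smaller side's group.
fn semi_scan(outer: &[i64], inner: &[i64]) -> Vec<bool> {
    let mut matched = vec![false; outer.len()];
    let (mut o, mut i) = (0, 0);
    while o < outer.len() && i < inner.len() {
        let key = outer[o];
        if key < inner[i] {
            // no inner row can match this outer key group; skip it
            while o < outer.len() && outer[o] == key {
                o += 1;
            }
        } else if key > inner[i] {
            // skip the inner key group
            let ik = inner[i];
            while i < inner.len() && inner[i] == ik {
                i += 1;
            }
        } else {
            // equal keys: mark every outer row in the group, advance both sides
            while o < outer.len() && outer[o] == key {
                matched[o] = true;
                o += 1;
            }
            while i < inner.len() && inner[i] == key {
                i += 1;
            }
        }
    }
    matched
}

fn main() {
    let outer = [1, 2, 2, 3, 5, 5, 7];
    let inner = [2, 2, 4, 5, 6, 7, 7];
    assert_eq!(
        semi_scan(&outer, &inner),
        vec![false, true, true, false, true, true, true]
    );
}
```

A semi join emits the rows where the bit is set; an anti join emits the rest.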
**Memory management:** The inner key group buffer is tracked via
`MemoryReservation` and spilled to disk (via `SpillManager`) when the
memory pool limit is exceeded, matching existing `SortMergeJoinExec`
behavior. Metrics include `peak_mem_used`, `spill_count`,
`spilled_bytes`, and `spilled_rows`.
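The spill policy for the inner key group buffer follows a common pattern: grow the in-memory buffer until a budget is exceeded, then move everything buffered so far to disk and keep going. A self-contained sketch (this is not the actual `MemoryReservation`/`SpillManager` API — a `Vec` stands in for the spill file and a fixed byte budget stands in for the memory pool):

```rust
// Illustrative spill-on-budget-exceeded pattern for the inner key group
// buffer. All names and the byte-budget model are assumptions for this
// sketch, not DataFusion's real memory-pool interface.
struct KeyGroupBuffer {
    in_mem: Vec<Vec<i64>>,  // buffered inner batches
    spilled: Vec<Vec<i64>>, // stands in for batches written to a temp file
    in_mem_bytes: usize,
    budget_bytes: usize,
}

impl KeyGroupBuffer {
    fn push(&mut self, batch: Vec<i64>) {
        self.in_mem_bytes += batch.len() * std::mem::size_of::<i64>();
        self.in_mem.push(batch);
        if self.in_mem_bytes > self.budget_bytes {
            // budget exceeded: spill the whole in-memory buffer to disk
            self.spilled.append(&mut self.in_mem);
            self.in_mem_bytes = 0;
        }
    }
}

fn main() {
    let mut buf = KeyGroupBuffer {
        in_mem: vec![],
        spilled: vec![],
        in_mem_bytes: 0,
        budget_bytes: 64,
    };
    buf.push(vec![1; 4]); // 32 bytes, stays in memory
    buf.push(vec![2; 8]); // 96 bytes total: exceeds budget, spills both
    assert_eq!(buf.spilled.len(), 2);
    assert!(buf.in_mem.is_empty());
    buf.push(vec![3; 2]); // 16 bytes, buffered in memory again
    assert_eq!(buf.in_mem.len(), 1);
}
```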
**Benchmark results** (best of 3, `dfbench smj`):
| Query | Type | Old (ms) | New (ms) | Speedup |
|-------|------|----------|----------|---------|
| Q10 | LEFT SEMI, no filter | 4.79 | 4.27 | 1.12x |
| Q11 | LEFT SEMI, 1% filter | 3.00 | 2.30 | 1.31x |
| Q12 | LEFT SEMI, 50% filter | 38.1 | 5.52 | 6.9x |
| Q13 | LEFT SEMI, 90% filter | 66.9 | 10.2 | 6.6x |
| Q14 | LEFT ANTI, no filter | 4.96 | 4.13 | 1.20x |
| Q15 | LEFT ANTI, partial match | 4.80 | 4.22 | 1.14x |
| Q16 | LEFT ANTI, stress | 1.63 | 1.64 | 1.00x |
| Q18 | LEFT SEMI, 2% filter | 7.61 | 5.34 | 1.42x |
| Q19 | LEFT ANTI, partial match | 24.1 | 21.8 | 1.10x |
Non-semi/anti queries are unaffected (same stream as before).
## Are these changes tested?
- 5 stream-level unit tests covering async re-entry (`PendingStream`),
batch boundary handling, filtered joins, and spill-to-disk edge cases
- Fuzz tests (`join_fuzz.rs`) compare `SortMergeJoinExec` output against
`HashJoinExec` for all semi/anti join types, with and without filters,
across multiple batch sizes and sort key combinations. Ran 1000+
iterations locally with random seeds.
- Existing SMJ unit tests in `sort_merge_join/tests.rs` continue to
exercise semi/anti join types through `SortMergeJoinExec`, now hitting
the new stream
- Existing `sort_merge_join.slt` sqllogictests pass (the stream change
is transparent to the SQL layer)
- I ported the operator to Comet
(https://github.com/apache/datafusion-comet/pull/3648) and saw a good
speedup on TPC-H Q21 (SF10 on my laptop):
**Current operator:**
<img width="1414" height="1158" alt="Screenshot 2026-03-08 at 5 38 17 PM" src="https://github.com/user-attachments/assets/399c97e3-ef90-49c8-9c8d-220427636840" />
**PR #20806:**
<img width="1340" height="890" alt="Screenshot 2026-03-08 at 5 38 10 PM" src="https://github.com/user-attachments/assets/f57d331b-9894-4290-85b3-6526b04a8a61" />
(metrics and spilling were not hooked up in the version I ported to
Comet, but this query does not spill anyway)
## Are there any user-facing changes?
No. The new stream is used automatically for semi/anti sort-merge joins.
There are no new config flags, no API changes, and no changes to query
plans.
**Known limitations to address in follow-up PRs:**
- Remove semi/anti-specific logic from the existing
`SortMergeJoinStream` (dead code for those join types now).
- I am keeping an eye on
https://github.com/apache/datafusion/pull/20455.
---
benchmarks/src/smj.rs | 1 +
datafusion/physical-plan/src/joins/mod.rs | 1 +
.../src/joins/semi_anti_sort_merge_join/mod.rs | 25 +
.../src/joins/semi_anti_sort_merge_join/stream.rs | 1276 ++++++++++++++++++++
.../src/joins/semi_anti_sort_merge_join/tests.rs | 801 ++++++++++++
.../src/joins/sort_merge_join/exec.rs | 74 +-
.../src/joins/sort_merge_join/tests.rs | 30 +-
7 files changed, 2178 insertions(+), 30 deletions(-)
diff --git a/benchmarks/src/smj.rs b/benchmarks/src/smj.rs
index 5056fd5096..f0ee1dd0cd 100644
--- a/benchmarks/src/smj.rs
+++ b/benchmarks/src/smj.rs
@@ -277,6 +277,7 @@ const SMJ_QUERIES: &[&str] = &[
WHERE EXISTS (
SELECT 1 FROM t2_sorted
WHERE t2_sorted.key = t1_sorted.key
+ AND t2_sorted.data <> t1_sorted.data
AND t2_sorted.data % 10 <> 0
)
"#,
diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs
index 2cdfa1e6ac..0a198b81a9 100644
--- a/datafusion/physical-plan/src/joins/mod.rs
+++ b/datafusion/physical-plan/src/joins/mod.rs
@@ -34,6 +34,7 @@ mod cross_join;
mod hash_join;
mod nested_loop_join;
mod piecewise_merge_join;
+pub(crate) mod semi_anti_sort_merge_join;
mod sort_merge_join;
mod stream_join_utils;
mod symmetric_hash_join;
diff --git a/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/mod.rs b/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/mod.rs
new file mode 100644
index 0000000000..5c5a2727b6
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Specialized Sort Merge Join stream for Semi/Anti joins.
+//!
+//! Used internally by `SortMergeJoinExec` for semi/anti join types.
+
+pub(crate) mod stream;
+
+#[cfg(test)]
+mod tests;
diff --git a/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs
new file mode 100644
index 0000000000..40e2022d41
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs
@@ -0,0 +1,1276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sort-merge join stream specialized for semi/anti joins.
+//!
+//! Instantiated by [`SortMergeJoinExec`](crate::joins::sort_merge_join::SortMergeJoinExec)
+//! when the join type is `LeftSemi`, `LeftAnti`, `RightSemi`, or `RightAnti`.
+//!
+//! # Motivation
+//!
+//! The general-purpose `SortMergeJoinStream`
+//! handles semi/anti joins by materializing `(outer, inner)` row pairs,
+//! applying a filter, then using a "corrected filter mask" to deduplicate.
+//! Semi/anti joins only need a boolean per outer row (does a match exist?),
+//! not pairs. The pair-based approach incurs unnecessary memory allocation
+//! and intermediate batches.
+//!
+//! This stream instead tracks matches with a per-outer-batch bitset,
+//! avoiding all pair materialization.
+//!
+//! # "Outer Side" vs "Inner Side"
+//!
+//! For `Left*` join types, left is outer and right is inner.
+//! For `Right*` join types, right is outer and left is inner.
+//! The output schema always equals the outer side's schema.
+//!
+//! # Algorithm
+//!
+//! Both inputs must be sorted by the join keys. The stream performs a merge
+//! scan across the two sorted inputs:
+//!
+//! ```text
+//! outer cursor ──► [1, 2, 2, 3, 5, 5, 7]
+//! inner cursor ──► [2, 2, 4, 5, 6, 7, 7]
+//! ▲
+//! compare keys at cursors
+//! ```
+//!
+//! At each step, the keys at the outer and inner cursors are compared:
+//!
+//! - **outer < inner**: Skip the outer key group (no match exists).
+//! - **outer > inner**: Skip the inner key group.
+//! - **outer == inner**: Process the match (see below).
+//!
+//! Key groups are contiguous runs of equal keys within one side. The scan
+//! advances past entire groups at each step.
+//!
+//! ## Processing a key match
+//!
+//! **Without filter**: All outer rows in the key group are marked as matched.
+//!
+//! **With filter**: The inner key group is buffered (may span multiple inner
+//! batches). For each buffered inner row, the filter is evaluated against the
+//! outer key group as a batch. Results are OR'd into the matched bitset. A
+//! short-circuit exits early when all outer rows in the group are matched.
+//!
+//! ```text
+//! matched bitset: [0, 0, 1, 0, 0, ...]
+//! ▲── one bit per outer row ──▲
+//!
+//! On emit:
+//! Semi → filter_record_batch(outer_batch, &matched)
+//! Anti → filter_record_batch(outer_batch, &NOT(matched))
+//! ```
+//!
+//! ## Batch boundaries
+//!
+//! Key groups can span batch boundaries on either side. The stream handles
+//! this by detecting when a group extends to the end of a batch, loading the
+//! next batch, and continuing if the key matches. The [`PendingBoundary`] enum
+//! preserves loop context across async `Poll::Pending` re-entries.
+//!
+//! # Memory
+//!
+//! Memory usage is bounded and independent of total input size:
+//! - One outer batch at a time (not tracked by reservation — single batch,
+//! cannot be spilled since it's needed for filter evaluation)
+//! - One inner batch at a time (streaming)
+//! - `matched` bitset: one bit per outer row, re-allocated per batch
+//! - Inner key group buffer: only for filtered joins, one key group at a time.
+//! Tracked via `MemoryReservation`; spilled to disk when the memory pool
+//! limit is exceeded.
+//! - `BatchCoalescer`: output buffering to target batch size
+//!
+//! # Degenerate cases
+//!
+//! **Highly skewed key (filtered joins only):** When a filter is present,
+//! the inner key group is buffered so each inner row can be evaluated
+//! against the outer group. If one join key has N inner rows, all N rows
+//! are held in memory simultaneously (or spilled to disk if the memory
+//! pool limit is reached). With uniform key distribution this is small
+//! (inner_rows / num_distinct_keys), but a single hot key can buffer
+//! arbitrarily many rows. The no-filter path does not buffer inner
+//! rows — it only advances the cursor — so it is unaffected.
+//!
+//! **Scalar broadcast during filter evaluation:** Each inner row is
+//! broadcast to match the outer group length for filter evaluation,
+//! allocating one array per inner row × filter column. This is inherent
+//! to the `PhysicalExpr::evaluate(RecordBatch)` API, which does not
+//! support scalar inputs directly. The total work is
+//! O(inner_group × outer_group) per key, but with much lower constant
+//! factor than the pair-materialization approach.
+
+use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::RecordBatchStream;
+use crate::joins::utils::{JoinFilter, compare_join_arrays};
+use crate::metrics::{
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
+};
+use crate::spill::spill_manager::SpillManager;
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
+use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::StreamReader;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
+use arrow::util::bit_util::apply_bitwise_binary_op;
+use datafusion_common::{
+ JoinSide, JoinType, NullEquality, Result, ScalarValue, internal_err,
+};
+use datafusion_execution::SendableRecordBatchStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
+
+use futures::{Stream, StreamExt, ready};
+
+/// Evaluates join key expressions against a batch, returning one array per key.
+fn evaluate_join_keys(
+ batch: &RecordBatch,
+ on: &[PhysicalExprRef],
+) -> Result<Vec<ArrayRef>> {
+ on.iter()
+ .map(|expr| {
+ let num_rows = batch.num_rows();
+ let val = expr.evaluate(batch)?;
+ val.into_array(num_rows)
+ })
+ .collect()
+}
+
+/// Find the first index in `key_arrays` starting from `from` where the key
+/// differs from the key at `from`. Uses `compare_join_arrays` for zero-alloc
+/// ordinal comparison.
+///
+/// Optimized for join workloads: checks adjacent and boundary keys before
+/// falling back to binary search, since most key groups are small (often 1).
+fn find_key_group_end(
+ key_arrays: &[ArrayRef],
+ from: usize,
+ len: usize,
+ sort_options: &[SortOptions],
+ null_equality: NullEquality,
+) -> Result<usize> {
+ let next = from + 1;
+ if next >= len {
+ return Ok(len);
+ }
+
+ // Fast path: single-row group (common with unique keys).
+ if compare_join_arrays(
+ key_arrays,
+ from,
+ key_arrays,
+ next,
+ sort_options,
+ null_equality,
+ )? != Ordering::Equal
+ {
+ return Ok(next);
+ }
+
+ // Check if the entire remaining batch shares this key.
+ let last = len - 1;
+ if compare_join_arrays(
+ key_arrays,
+ from,
+ key_arrays,
+ last,
+ sort_options,
+ null_equality,
+ )? == Ordering::Equal
+ {
+ return Ok(len);
+ }
+
+    // Binary search the interior: key at `next` matches, key at `last` doesn't.
+ let mut lo = next + 1;
+ let mut hi = last;
+ while lo < hi {
+ let mid = lo + (hi - lo) / 2;
+ if compare_join_arrays(
+ key_arrays,
+ from,
+ key_arrays,
+ mid,
+ sort_options,
+ null_equality,
+ )? == Ordering::Equal
+ {
+ lo = mid + 1;
+ } else {
+ hi = mid;
+ }
+ }
+ Ok(lo)
+}
+
+/// When an outer key group spans a batch boundary, the boundary loop emits
+/// the current batch, then polls for the next. If that poll returns Pending,
+/// `ready!` exits `poll_join` and we re-enter from the top on the next call.
+/// Without this state, the new batch would be processed fresh by the
+/// merge-scan — but inner already advanced past this key, so the matching
+/// outer rows would be skipped via `Ordering::Less` and never marked.
+///
+/// This enum carries the last key (as single-row sliced arrays) from the
+/// previous batch so we can check whether the next batch continues the same
+/// key group. Stored as `Option<PendingBoundary>`: `None` means normal
+/// processing.
+#[derive(Debug)]
+enum PendingBoundary {
+ /// Resuming a no-filter boundary loop.
+ NoFilter { saved_keys: Vec<ArrayRef> },
+ /// Resuming a filtered boundary loop. Inner key data remains in the
+ /// buffer (or spill file) for the resumed loop.
+ Filtered { saved_keys: Vec<ArrayRef> },
+}
+
+pub(crate) struct SemiAntiSortMergeJoinStream {
+ // Decomposed from JoinType to avoid matching on 4 variants in hot paths.
+ // true for semi (emit matched), false for anti (emit unmatched).
+ is_semi: bool,
+
+ // Input streams — in the nested-loop model that sort-merge join
+ // implements, "outer" is the driving loop and "inner" is probed for
+ // matches. The existing SortMergeJoinStream calls these "streamed"
+ // and "buffered" respectively. For Left* joins, outer=left; for
+ // Right* joins, outer=right. Output schema equals the outer side.
+ outer: SendableRecordBatchStream,
+ inner: SendableRecordBatchStream,
+
+ // Current batches and cursor positions within them
+ outer_batch: Option<RecordBatch>,
+ /// Row index into `outer_batch` — the next unprocessed outer row.
+ outer_offset: usize,
+ outer_key_arrays: Vec<ArrayRef>,
+ inner_batch: Option<RecordBatch>,
+ /// Row index into `inner_batch` — the next unprocessed inner row.
+ inner_offset: usize,
+ inner_key_arrays: Vec<ArrayRef>,
+
+ // Per-outer-batch match tracking, reused across batches.
+ // Bit-packed (not Vec<bool>) so that:
+ // - emit: finish() yields a BooleanBuffer directly (no packing iteration)
+ // - OR: apply_bitwise_binary_op ORs filter results in u64 chunks
+ // - count: UnalignedBitChunk::count_ones uses popcnt
+ matched: BooleanBufferBuilder,
+
+ // Inner key group buffer: all inner rows sharing the current join key.
+ // Only populated when a filter is present. Unbounded — a single key
+ // with many inner rows will buffer them all. See "Degenerate cases"
+ // in exec.rs. Spilled to disk when memory reservation fails.
+ inner_key_buffer: Vec<RecordBatch>,
+ inner_key_spill: Option<RefCountedTempFile>,
+
+ // True when buffer_inner_key_group returned Pending after partially
+ // filling inner_key_buffer. On re-entry, buffer_inner_key_group
+ // must skip clear() and resume from poll_next_inner_batch (the
+ // current inner_batch was already sliced and pushed before Pending).
+ buffering_inner_pending: bool,
+
+ // Boundary re-entry state — see PendingBoundary doc comment.
+ pending_boundary: Option<PendingBoundary>,
+
+ // Join ON expressions, evaluated against each new batch to produce
+ // the key arrays used for sorted key comparisons.
+ on_outer: Vec<PhysicalExprRef>,
+ on_inner: Vec<PhysicalExprRef>,
+ filter: Option<JoinFilter>,
+ sort_options: Vec<SortOptions>,
+ null_equality: NullEquality,
+ // Decomposed from JoinType: when RightSemi/RightAnti, outer=right,
+ // inner=left, so we swap sides when building the filter batch.
+ outer_is_left: bool,
+
+ // Output
+ coalescer: BatchCoalescer,
+ schema: SchemaRef,
+
+ // Metrics
+ join_time: crate::metrics::Time,
+ input_batches: Count,
+ input_rows: Count,
+ baseline_metrics: BaselineMetrics,
+ peak_mem_used: Gauge,
+
+ // Memory / spill — only the inner key buffer is tracked via reservation,
+ // matching existing SMJ (which tracks only the buffered side). The outer
+ // batch is a single batch at a time and cannot be spilled.
+ reservation: MemoryReservation,
+ spill_manager: SpillManager,
+ runtime_env: Arc<datafusion_execution::runtime_env::RuntimeEnv>,
+ inner_buffer_size: usize,
+
+ // True once the current outer batch has been emitted. The Equal
+ // branch's inner loops call emit then `ready!(poll_next_outer_batch)`.
+ // If that poll returns Pending, poll_join re-enters from the top
+ // on the next poll — with outer_batch still Some and outer_offset
+ // past the end. The main loop's step 3 would re-emit without this
+ // guard. Cleared when poll_next_outer_batch loads a new batch.
+ batch_emitted: bool,
+}
+
+impl SemiAntiSortMergeJoinStream {
+ #[expect(clippy::too_many_arguments)]
+ pub fn try_new(
+ schema: SchemaRef,
+ sort_options: Vec<SortOptions>,
+ null_equality: NullEquality,
+ outer: SendableRecordBatchStream,
+ inner: SendableRecordBatchStream,
+ on_outer: Vec<PhysicalExprRef>,
+ on_inner: Vec<PhysicalExprRef>,
+ filter: Option<JoinFilter>,
+ join_type: JoinType,
+ batch_size: usize,
+ partition: usize,
+ metrics: &ExecutionPlanMetricsSet,
+ reservation: MemoryReservation,
+ peak_mem_used: Gauge,
+ spill_manager: SpillManager,
+ runtime_env: Arc<datafusion_execution::runtime_env::RuntimeEnv>,
+ ) -> Result<Self> {
+        let is_semi = matches!(join_type, JoinType::LeftSemi | JoinType::RightSemi);
+        let outer_is_left = matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti);
+
+        let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
+ let input_batches =
+ MetricBuilder::new(metrics).counter("input_batches", partition);
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+ let baseline_metrics = BaselineMetrics::new(metrics, partition);
+
+ Ok(Self {
+ is_semi,
+ outer,
+ inner,
+ outer_batch: None,
+ outer_offset: 0,
+ outer_key_arrays: vec![],
+ inner_batch: None,
+ inner_offset: 0,
+ inner_key_arrays: vec![],
+ matched: BooleanBufferBuilder::new(0),
+ inner_key_buffer: vec![],
+ inner_key_spill: None,
+ buffering_inner_pending: false,
+ pending_boundary: None,
+ on_outer,
+ on_inner,
+ filter,
+ sort_options,
+ null_equality,
+ outer_is_left,
+ coalescer: BatchCoalescer::new(Arc::clone(&schema), batch_size)
+ .with_biggest_coalesce_batch_size(Some(batch_size / 2)),
+ schema,
+ join_time,
+ input_batches,
+ input_rows,
+ baseline_metrics,
+ peak_mem_used,
+ reservation,
+ spill_manager,
+ runtime_env,
+ inner_buffer_size: 0,
+ batch_emitted: false,
+ })
+ }
+
+ /// Resize the memory reservation to match current tracked usage.
+ fn try_resize_reservation(&mut self) -> Result<()> {
+ let needed = self.inner_buffer_size;
+ self.reservation.try_resize(needed)?;
+ self.peak_mem_used.set_max(self.reservation.size());
+ Ok(())
+ }
+
+ /// Spill the in-memory inner key buffer to disk and clear it.
+ fn spill_inner_key_buffer(&mut self) -> Result<()> {
+ let spill_file = self
+ .spill_manager
+ .spill_record_batch_and_finish(
+ &self.inner_key_buffer,
+ "semi_anti_smj_inner_key_spill",
+ )?
+ .expect("inner_key_buffer is non-empty when spilling");
+ self.inner_key_buffer.clear();
+ self.inner_buffer_size = 0;
+ self.inner_key_spill = Some(spill_file);
+ // Should succeed now — inner buffer has been spilled.
+ self.try_resize_reservation()
+ }
+
+ /// Clear inner key group state after processing. Does not resize the
+ /// reservation — the next key group will resize when buffering, or
+ /// the stream's Drop will free it. This avoids unnecessary memory
+ /// pool interactions (see apache/datafusion#20729).
+ fn clear_inner_key_group(&mut self) {
+ self.inner_key_buffer.clear();
+ self.inner_key_spill = None;
+ self.inner_buffer_size = 0;
+ }
+
+ /// Poll for the next outer batch. Returns true if a batch was loaded.
+    fn poll_next_outer_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
+ loop {
+ match ready!(self.outer.poll_next_unpin(cx)) {
+ None => return Poll::Ready(Ok(false)),
+ Some(Err(e)) => return Poll::Ready(Err(e)),
+ Some(Ok(batch)) => {
+ let batch_num_rows = batch.num_rows();
+ self.input_batches.add(1);
+ self.input_rows.add(batch_num_rows);
+ if batch_num_rows == 0 {
+ continue;
+ }
+ let keys = evaluate_join_keys(&batch, &self.on_outer)?;
+ self.outer_batch = Some(batch);
+ self.outer_offset = 0;
+ self.outer_key_arrays = keys;
+ self.batch_emitted = false;
+ self.matched = BooleanBufferBuilder::new(batch_num_rows);
+ self.matched.append_n(batch_num_rows, false);
+ return Poll::Ready(Ok(true));
+ }
+ }
+ }
+ }
+
+ /// Poll for the next inner batch. Returns true if a batch was loaded.
+    fn poll_next_inner_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
+ loop {
+ match ready!(self.inner.poll_next_unpin(cx)) {
+ None => return Poll::Ready(Ok(false)),
+ Some(Err(e)) => return Poll::Ready(Err(e)),
+ Some(Ok(batch)) => {
+ let batch_num_rows = batch.num_rows();
+ self.input_batches.add(1);
+ self.input_rows.add(batch_num_rows);
+ if batch_num_rows == 0 {
+ continue;
+ }
+ let keys = evaluate_join_keys(&batch, &self.on_inner)?;
+ self.inner_batch = Some(batch);
+ self.inner_offset = 0;
+ self.inner_key_arrays = keys;
+ return Poll::Ready(Ok(true));
+ }
+ }
+ }
+ }
+
+ /// Emit the current outer batch through the coalescer, applying the
+ /// matched bitset as a selection mask. No-op if already emitted
+ /// (see `batch_emitted` field).
+ fn emit_outer_batch(&mut self) -> Result<()> {
+ if self.batch_emitted {
+ return Ok(());
+ }
+ self.batch_emitted = true;
+
+ let batch = self.outer_batch.as_ref().unwrap();
+
+ // finish() converts the bit-packed builder directly to a
+ // BooleanBuffer — no iteration or repacking needed.
+ let selection = BooleanArray::new(self.matched.finish(), None);
+
+ let selection = if self.is_semi {
+ selection
+ } else {
+ not(&selection)?
+ };
+
+ let filtered = filter_record_batch(batch, &selection)?;
+ if filtered.num_rows() > 0 {
+ self.coalescer.push_batch(filtered)?;
+ }
+ Ok(())
+ }
+
+ /// Process a key match between outer and inner sides (no filter).
+ /// Sets matched bits for all outer rows sharing the current key.
+ fn process_key_match_no_filter(&mut self) -> Result<()> {
+ let outer_batch = self.outer_batch.as_ref().unwrap();
+ let num_outer = outer_batch.num_rows();
+
+ let outer_group_end = find_key_group_end(
+ &self.outer_key_arrays,
+ self.outer_offset,
+ num_outer,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+
+ for i in self.outer_offset..outer_group_end {
+ self.matched.set_bit(i, true);
+ }
+
+ self.outer_offset = outer_group_end;
+ Ok(())
+ }
+
+ /// Advance inner past the current key group. Returns Ok(true) if inner
+ /// is exhausted.
+ fn advance_inner_past_key_group(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<bool>> {
+ loop {
+ let inner_batch = match &self.inner_batch {
+ Some(b) => b,
+ None => return Poll::Ready(Ok(true)),
+ };
+ let num_inner = inner_batch.num_rows();
+
+ let group_end = find_key_group_end(
+ &self.inner_key_arrays,
+ self.inner_offset,
+ num_inner,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+
+ if group_end < num_inner {
+ self.inner_offset = group_end;
+ return Poll::Ready(Ok(false));
+ }
+
+ // Key group extends to end of batch — need to check next batch
+            let saved_inner_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
+
+ match ready!(self.poll_next_inner_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ return Poll::Ready(Ok(true));
+ }
+ Ok(true) => {
+ if keys_match(
+ &saved_inner_keys,
+ &self.inner_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )? {
+ continue;
+ } else {
+ return Poll::Ready(Ok(false));
+ }
+ }
+ }
+ }
+ }
+
+ /// Buffer inner key group for filter evaluation. Collects all inner rows
+ /// with the current key across batch boundaries.
+ ///
+ /// If poll_next_inner_batch returns Pending, we save progress via
+ /// buffering_inner_pending. On re-entry (from the Equal branch in
+ /// poll_join), we skip clear() and the slice+push for the current
+ /// batch (which was already buffered before Pending), and go directly
+ /// to polling for the next inner batch.
+    fn buffer_inner_key_group(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
+ // On re-entry after Pending: don't clear the partially-filled
+ // buffer. The current inner_batch was already sliced and pushed
+ // before Pending, so jump to polling for the next batch.
+ let mut resume_from_poll = false;
+ if self.buffering_inner_pending {
+ self.buffering_inner_pending = false;
+ resume_from_poll = true;
+ } else {
+ self.clear_inner_key_group();
+ }
+
+ loop {
+ let inner_batch = match &self.inner_batch {
+ Some(b) => b,
+ None => return Poll::Ready(Ok(true)),
+ };
+ let num_inner = inner_batch.num_rows();
+ let group_end = find_key_group_end(
+ &self.inner_key_arrays,
+ self.inner_offset,
+ num_inner,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+
+ if !resume_from_poll {
+                let slice =
+                    inner_batch.slice(self.inner_offset, group_end - self.inner_offset);
+ self.inner_buffer_size += slice.get_array_memory_size();
+ self.inner_key_buffer.push(slice);
+
+ // Reserve memory for the newly buffered slice. If the pool
+ // is exhausted, spill the entire buffer to disk.
+ if self.try_resize_reservation().is_err() {
+ if self.runtime_env.disk_manager.tmp_files_enabled() {
+ self.spill_inner_key_buffer()?;
+ } else {
+ // Re-attempt to get the error message
+ self.try_resize_reservation().map_err(|e| {
+                            datafusion_common::DataFusionError::Execution(format!(
+ "{e}. Disk spilling disabled."
+ ))
+ })?;
+ }
+ }
+
+ if group_end < num_inner {
+ self.inner_offset = group_end;
+ return Poll::Ready(Ok(false));
+ }
+ }
+ resume_from_poll = false;
+
+ // Key group extends to end of batch — check next
+            let saved_inner_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
+
+ // If poll returns Pending, the current batch is already
+ // in inner_key_buffer.
+ self.buffering_inner_pending = true;
+ match ready!(self.poll_next_inner_batch(cx)) {
+ Err(e) => {
+ self.buffering_inner_pending = false;
+ return Poll::Ready(Err(e));
+ }
+ Ok(false) => {
+ self.buffering_inner_pending = false;
+ return Poll::Ready(Ok(true));
+ }
+ Ok(true) => {
+ self.buffering_inner_pending = false;
+ if keys_match(
+ &saved_inner_keys,
+ &self.inner_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )? {
+ continue;
+ } else {
+ return Poll::Ready(Ok(false));
+ }
+ }
+ }
+ }
+ }
+
+ /// Process a key match with a filter. For each inner row in the buffered
+ /// key group, evaluates the filter against the outer key group and ORs
+ /// the results into the matched bitset using u64-chunked bitwise ops.
+ fn process_key_match_with_filter(&mut self) -> Result<()> {
+ let filter = self.filter.as_ref().unwrap();
+ let outer_batch = self.outer_batch.as_ref().unwrap();
+ let num_outer = outer_batch.num_rows();
+
+ // buffer_inner_key_group must be called before this function
+ debug_assert!(
+            !self.inner_key_buffer.is_empty() || self.inner_key_spill.is_some(),
+ "process_key_match_with_filter called with no inner key data"
+ );
+ debug_assert!(
+ self.outer_offset < num_outer,
+ "outer_offset must be within the current batch"
+ );
+ debug_assert!(
+ self.matched.len() == num_outer,
+ "matched vector must be sized for the current outer batch"
+ );
+
+ let outer_group_end = find_key_group_end(
+ &self.outer_key_arrays,
+ self.outer_offset,
+ num_outer,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ let outer_group_len = outer_group_end - self.outer_offset;
+        let outer_slice = outer_batch.slice(self.outer_offset, outer_group_len);
+
+ // Count already-matched bits using popcnt on u64 chunks (zero-copy).
+ let mut matched_count = UnalignedBitChunk::new(
+ self.matched.as_slice(),
+ self.outer_offset,
+ outer_group_len,
+ )
+ .count_ones();
+
+ // Process spilled inner batches first (read back from disk).
+ if let Some(spill_file) = &self.inner_key_spill {
+ let file = BufReader::new(File::open(spill_file.path())?);
+ let reader = StreamReader::try_new(file, None)?;
+ for batch_result in reader {
+ let inner_slice = batch_result?;
+ matched_count = eval_filter_for_inner_slice(
+ self.outer_is_left,
+ filter,
+ &outer_slice,
+ &inner_slice,
+ &mut self.matched,
+ self.outer_offset,
+ outer_group_len,
+ matched_count,
+ )?;
+ if matched_count == outer_group_len {
+ break;
+ }
+ }
+ }
+
+ // Then process in-memory inner batches.
+ // eval_filter_for_inner_slice is a free function (not a &self method)
+ // so that Rust can split the struct borrow: &mut self.matched coexists
+ // with &self.inner_key_buffer and &self.filter inside this loop.
+ if matched_count < outer_group_len {
+ 'outer: for inner_slice in &self.inner_key_buffer {
+ matched_count = eval_filter_for_inner_slice(
+ self.outer_is_left,
+ filter,
+ &outer_slice,
+ inner_slice,
+ &mut self.matched,
+ self.outer_offset,
+ outer_group_len,
+ matched_count,
+ )?;
+ if matched_count == outer_group_len {
+ break 'outer;
+ }
+ }
+ }
+
+ self.outer_offset = outer_group_end;
+ Ok(())
+ }
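As a reviewer aside: the matched-bitset technique used above can be sketched with plain u64 words — OR each filter pass into the bitset, popcount to see whether every row in the key group has matched, and stop early once it has. All names below are illustrative stand-ins, not DataFusion or Arrow APIs:

```rust
// Simplified stand-in for the BooleanBufferBuilder + UnalignedBitChunk
// pattern: OR filter results into a u64-word bitset, popcount to detect
// when every outer row in the key group has matched (early exit).
fn or_and_count(matched: &mut [u64], filter_bits: &[u64], group_len: usize) -> usize {
    let mut count = 0usize;
    for (m, f) in matched.iter_mut().zip(filter_bits) {
        *m |= *f;                          // 64 rows per iteration
        count += m.count_ones() as usize;  // hardware popcnt
    }
    count.min(group_len)
}

fn main() {
    let mut matched = vec![0u64];
    assert_eq!(or_and_count(&mut matched, &[0b101], 3), 2); // rows 0 and 2 match
    assert_eq!(or_and_count(&mut matched, &[0b010], 3), 3); // group fully matched
    println!("all matched");
}
```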
+
+ /// Continue processing an outer key group that spans multiple outer
+ /// batches. Returns `true` if this outer batch was fully consumed
+ /// by the key group and the caller should load another.
+ fn resume_boundary(&mut self) -> Result<bool> {
+ debug_assert!(
+ self.outer_batch.is_some(),
+ "caller must load outer_batch first"
+ );
+ match self.pending_boundary.take() {
+ Some(PendingBoundary::NoFilter { saved_keys }) => {
+ let same_key = keys_match(
+ &saved_keys,
+ &self.outer_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ if same_key {
+ self.process_key_match_no_filter()?;
+ let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
+ if self.outer_offset >= num_outer {
+ self.pending_boundary = Some(PendingBoundary::NoFilter {
+ saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
+ });
+ self.emit_outer_batch()?;
+ self.outer_batch = None;
+ return Ok(true);
+ }
+ }
+ }
+ Some(PendingBoundary::Filtered { saved_keys }) => {
+ debug_assert!(
+ !self.inner_key_buffer.is_empty() || self.inner_key_spill.is_some(),
+ "Filtered pending boundary entered but no inner key data exists"
+ );
+ let same_key = keys_match(
+ &saved_keys,
+ &self.outer_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ if same_key {
+ self.process_key_match_with_filter()?;
+ let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
+ if self.outer_offset >= num_outer {
+ self.pending_boundary = Some(PendingBoundary::Filtered {
+ saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
+ });
+ self.emit_outer_batch()?;
+ self.outer_batch = None;
+ return Ok(true);
+ }
+ }
+ self.clear_inner_key_group();
+ }
+ None => {}
+ }
+ Ok(false)
+ }
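The boundary-resumption mechanics above can be illustrated with a toy version of the `PendingBoundary` idea: when a key group runs off the end of a batch, remember the last key; on the next batch, keep consuming rows while they still equal the saved key. This sketch uses plain slices and illustrative names, not the stream's actual types:

```rust
// Toy version of key-group boundary resumption: advance past rows equal to
// `key` starting at `start`, returning the first index past the group.
// If the return equals batch.len(), the group may continue in the next batch.
fn consume_group(batch: &[i32], start: usize, key: i32) -> usize {
    let mut i = start;
    while i < batch.len() && batch[i] == key {
        i += 1;
    }
    i
}

fn main() {
    let batch1 = [1, 1, 1];
    let batch2 = [1, 2, 3];
    let end1 = consume_group(&batch1, 0, 1);
    assert_eq!(end1, 3);              // group hit the batch boundary
    let saved_key = batch1[end1 - 1]; // remember the last key before moving on
    let end2 = consume_group(&batch2, 0, saved_key);
    assert_eq!(end2, 1);              // group continued one row into batch2
    println!("boundary resumed");
}
```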
+
+ /// Main loop: drive the merge-scan to produce output batches.
+ fn poll_join(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<RecordBatch>>> {
+ let join_time = self.join_time.clone();
+ let _timer = join_time.timer();
+
+ loop {
+ // 1. Ensure we have an outer batch
+ if self.outer_batch.is_none() {
+ match ready!(self.poll_next_outer_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ // Outer exhausted — flush coalescer
+ self.pending_boundary = None;
+ self.coalescer.finish_buffered_batch()?;
+ if let Some(batch) = self.coalescer.next_completed_batch() {
+ return Poll::Ready(Ok(Some(batch)));
+ }
+ return Poll::Ready(Ok(None));
+ }
+ Ok(true) => {
+ if self.resume_boundary()? {
+ continue;
+ }
+ }
+ }
+ }
+
+ // 2. Ensure we have an inner batch (unless inner is exhausted).
+ // Skip this when resuming a pending boundary — inner was already
+ // advanced past the key group before the boundary loop started.
+ if self.inner_batch.is_none() && self.pending_boundary.is_none() {
+ match ready!(self.poll_next_inner_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ // Inner exhausted — emit remaining outer batches.
+ // For semi: no more matches possible.
+ // For anti: all remaining outer rows are unmatched.
+ self.emit_outer_batch()?;
+ self.outer_batch = None;
+
+ loop {
+ match ready!(self.poll_next_outer_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => break,
+ Ok(true) => {
+ self.emit_outer_batch()?;
+ self.outer_batch = None;
+ }
+ }
+ }
+
+ self.coalescer.finish_buffered_batch()?;
+ if let Some(batch) = self.coalescer.next_completed_batch() {
+ return Poll::Ready(Ok(Some(batch)));
+ }
+ return Poll::Ready(Ok(None));
+ }
+ Ok(true) => {}
+ }
+ }
+
+ // 3. Main merge-scan loop
+ let outer_batch = self.outer_batch.as_ref().unwrap();
+ let num_outer = outer_batch.num_rows();
+
+ if self.outer_offset >= num_outer {
+ self.emit_outer_batch()?;
+ self.outer_batch = None;
+
+ if let Some(batch) = self.coalescer.next_completed_batch() {
+ return Poll::Ready(Ok(Some(batch)));
+ }
+ continue;
+ }
+
+ let inner_batch = match &self.inner_batch {
+ Some(b) => b,
+ None => {
+ self.emit_outer_batch()?;
+ self.outer_batch = None;
+ continue;
+ }
+ };
+ let num_inner = inner_batch.num_rows();
+
+ if self.inner_offset >= num_inner {
+ match ready!(self.poll_next_inner_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ self.inner_batch = None;
+ continue;
+ }
+ Ok(true) => continue,
+ }
+ }
+
+ // 4. Compare keys at current positions
+ let cmp = compare_join_arrays(
+ &self.outer_key_arrays,
+ self.outer_offset,
+ &self.inner_key_arrays,
+ self.inner_offset,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+
+ match cmp {
+ Ordering::Less => {
+ let group_end = find_key_group_end(
+ &self.outer_key_arrays,
+ self.outer_offset,
+ num_outer,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ self.outer_offset = group_end;
+ }
+ Ordering::Greater => {
+ let group_end = find_key_group_end(
+ &self.inner_key_arrays,
+ self.inner_offset,
+ num_inner,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ if group_end >= num_inner {
+ let saved_keys =
+ slice_keys(&self.inner_key_arrays, num_inner - 1);
+ match ready!(self.poll_next_inner_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ self.inner_batch = None;
+ continue;
+ }
+ Ok(true) => {
+ if keys_match(
+ &saved_keys,
+ &self.inner_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )? {
+ match ready!(self.advance_inner_past_key_group(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(_) => continue,
+ }
+ }
+ continue;
+ }
+ }
+ } else {
+ self.inner_offset = group_end;
+ }
+ }
+ Ordering::Equal => {
+ if self.filter.is_some() {
+ // Buffer inner key group (may span batches)
+ match ready!(self.buffer_inner_key_group(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(_inner_exhausted) => {}
+ }
+
+ // Process outer rows against buffered inner group
+ // (may need to handle outer batch boundary)
+ loop {
+ self.process_key_match_with_filter()?;
+
+ let outer_batch = self.outer_batch.as_ref().unwrap();
+ if self.outer_offset >= outer_batch.num_rows() {
+ let saved_keys = slice_keys(
+ &self.outer_key_arrays,
+ outer_batch.num_rows() - 1,
+ );
+
+ self.emit_outer_batch()?;
+ debug_assert!(
+ !self.inner_key_buffer.is_empty()
+ || self.inner_key_spill.is_some(),
+ "Filtered pending boundary requires inner
key data in buffer or spill"
+ );
+ self.pending_boundary =
+ Some(PendingBoundary::Filtered { saved_keys });
+
+ match ready!(self.poll_next_outer_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ self.pending_boundary = None;
+ self.outer_batch = None;
+ break;
+ }
+ Ok(true) => {
+ let Some(PendingBoundary::Filtered {
+ saved_keys,
+ }) = self.pending_boundary.take()
+ else {
+ unreachable!()
+ };
+ let same = keys_match(
+ &saved_keys,
+ &self.outer_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ if same {
+ continue;
+ }
+ break;
+ }
+ }
+ } else {
+ break;
+ }
+ }
+
+ self.clear_inner_key_group();
+ } else {
+ // No filter: advance inner past key group, then
+ // mark all outer rows with this key as matched.
+ match ready!(self.advance_inner_past_key_group(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(_inner_exhausted) => {}
+ }
+
+ loop {
+ self.process_key_match_no_filter()?;
+
+ let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
+ if self.outer_offset >= num_outer {
+ let saved_keys =
+ slice_keys(&self.outer_key_arrays, num_outer - 1);
+
+ self.emit_outer_batch()?;
+ self.pending_boundary =
+ Some(PendingBoundary::NoFilter { saved_keys });
+
+ match ready!(self.poll_next_outer_batch(cx)) {
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(false) => {
+ self.pending_boundary = None;
+ self.outer_batch = None;
+ break;
+ }
+ Ok(true) => {
+ let Some(PendingBoundary::NoFilter {
+ saved_keys,
+ }) = self.pending_boundary.take()
+ else {
+ unreachable!()
+ };
+ let same_key = keys_match(
+ &saved_keys,
+ &self.outer_key_arrays,
+ &self.sort_options,
+ self.null_equality,
+ )?;
+ if same_key {
+ continue;
+ }
+ break;
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ // Check for completed coalescer batch
+ if let Some(batch) = self.coalescer.next_completed_batch() {
+ return Poll::Ready(Ok(Some(batch)));
+ }
+ }
+ }
+}
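For reviewers wanting the shape of the whole algorithm in miniature: the merge-scan above boils down to walking two sorted key sequences and producing one boolean per outer row, with no `(outer, inner)` pairs materialized. A hedged in-memory sketch (illustrative names, no batching, no filter, no async):

```rust
use std::cmp::Ordering;

// Minimal semi-join merge scan over two sorted key slices: mark outer rows
// whose key appears on the inner side. A semi join emits rows where the
// mask is true; an anti join emits rows where it is false.
fn semi_mask(outer: &[i32], inner: &[i32]) -> Vec<bool> {
    let mut mask = vec![false; outer.len()];
    let (mut i, mut j) = (0, 0);
    while i < outer.len() && j < inner.len() {
        match outer[i].cmp(&inner[j]) {
            Ordering::Less => i += 1,              // outer key has no match
            Ordering::Greater => j += 1,           // skip unmatched inner key
            Ordering::Equal => {
                mask[i] = true;                    // one boolean, no pairs
                i += 1;
            }
        }
    }
    mask
}

fn main() {
    assert_eq!(semi_mask(&[1, 2, 3], &[2, 2, 4]), vec![false, true, false]);
    println!("mask computed");
}
```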
+
+/// Evaluate the filter for all rows in an inner slice against the outer group,
+/// OR-ing results into the matched bitset. Returns the updated matched count.
+/// Extracted as a free function so Rust can split borrows on the stream struct.
+#[expect(clippy::too_many_arguments)]
+fn eval_filter_for_inner_slice(
+ outer_is_left: bool,
+ filter: &JoinFilter,
+ outer_slice: &RecordBatch,
+ inner_slice: &RecordBatch,
+ matched: &mut BooleanBufferBuilder,
+ outer_offset: usize,
+ outer_group_len: usize,
+ // Passed in to avoid recounting bits we just counted at the call site.
+ mut matched_count: usize,
+) -> Result<usize> {
+ debug_assert_eq!(
+ matched_count,
+ UnalignedBitChunk::new(matched.as_slice(), outer_offset, outer_group_len)
+ .count_ones()
+ );
+ for inner_row in 0..inner_slice.num_rows() {
+ if matched_count == outer_group_len {
+ break;
+ }
+
+ let filter_result = evaluate_filter_for_inner_row(
+ outer_is_left,
+ filter,
+ outer_slice,
+ inner_slice,
+ inner_row,
+ )?;
+
+ // OR filter results into the matched bitset. Both sides are
+ // bit-packed [u8] buffers, so apply_bitwise_binary_op
+ // processes 64 bits per loop iteration (not 1 bit at a time).
+ //
+ // The offsets handle alignment: outer_offset is the bit
+ // position within matched where this key group starts,
+ // and filter_buf.offset() is the BooleanBuffer's internal
+ // bit offset (usually 0, but not guaranteed by Arrow).
+ let filter_buf = filter_result.values();
+ apply_bitwise_binary_op(
+ matched.as_slice_mut(),
+ outer_offset,
+ filter_buf.inner().as_slice(),
+ filter_buf.offset(),
+ outer_group_len,
+ |a, b| a | b,
+ );
+
+ // Recount matched bits after the OR. UnalignedBitChunk is
+ // zero-copy — it reads the bytes in place and uses popcnt.
+ matched_count =
+ UnalignedBitChunk::new(matched.as_slice(), outer_offset, outer_group_len)
+ .count_ones();
+ }
+ Ok(matched_count)
+}
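The offset handling in the `apply_bitwise_binary_op` call above is the subtle part. A clarity-first sketch of an unaligned bit-range OR (single-bit loop here for readability; the real helper processes 64 bits per iteration, and these function names are illustrative, not Arrow APIs):

```rust
// Bit-packed [u8] helpers: read/set bit i (LSB-first within each byte,
// matching Arrow's bit-packed boolean layout).
fn get_bit(buf: &[u8], i: usize) -> bool {
    (buf[i / 8] >> (i % 8)) & 1 == 1
}
fn set_bit(buf: &mut [u8], i: usize) {
    buf[i / 8] |= 1 << (i % 8);
}

// OR `len` bits of src (starting at src_off) into dst (starting at dst_off).
// OR only ever sets bits, so previously-matched rows are never cleared.
fn or_bits(dst: &mut [u8], dst_off: usize, src: &[u8], src_off: usize, len: usize) {
    for i in 0..len {
        if get_bit(src, src_off + i) {
            set_bit(dst, dst_off + i);
        }
    }
}

fn main() {
    let mut dst = vec![0u8; 2];
    let src = [0b0000_0110u8];          // bits 1 and 2 set
    or_bits(&mut dst, 7, &src, 1, 2);   // OR src bits 1..3 into dst bits 7..9
    assert!(get_bit(&dst, 7) && get_bit(&dst, 8));
    println!("bits set");
}
```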
+
+/// Slice each key array to a single row at `idx`.
+fn slice_keys(keys: &[ArrayRef], idx: usize) -> Vec<ArrayRef> {
+ keys.iter().map(|a| a.slice(idx, 1)).collect()
+}
+
+/// Compare the first row of two key arrays using sort options to determine
+/// equality. The left side is expected to be single-row slices (from
+/// `slice_keys`); the right side can be any length (row 0 is compared).
+fn keys_match(
+ left_arrays: &[ArrayRef],
+ right_arrays: &[ArrayRef],
+ sort_options: &[SortOptions],
+ null_equality: NullEquality,
+) -> Result<bool> {
+ debug_assert!(left_arrays.iter().all(|a| a.len() == 1));
+ let cmp = compare_join_arrays(
+ left_arrays,
+ 0,
+ right_arrays,
+ 0,
+ sort_options,
+ null_equality,
+ )?;
+ Ok(cmp == Ordering::Equal)
+}
+
+/// Evaluate the join filter for one inner row against a slice of outer rows.
+///
+/// Free function (not a method on SemiAntiSortMergeJoinStream) so that Rust
+/// can split the struct borrow in process_key_match_with_filter: the caller
+/// holds &mut self.matched and &self.inner_key_buffer simultaneously, which
+/// is impossible if this borrows all of &self.
+fn evaluate_filter_for_inner_row(
+ outer_is_left: bool,
+ filter: &JoinFilter,
+ outer_slice: &RecordBatch,
+ inner_batch: &RecordBatch,
+ inner_idx: usize,
+) -> Result<BooleanArray> {
+ let num_outer_rows = outer_slice.num_rows();
+
+ // Build filter input columns in the order the filter expects
+ let mut columns: Vec<ArrayRef> = Vec::with_capacity(filter.column_indices().len());
+ for col_idx in filter.column_indices() {
+ let (side_batch, side_idx) = if outer_is_left {
+ match col_idx.side {
+ JoinSide::Left => (outer_slice, None),
+ JoinSide::Right => (inner_batch, Some(inner_idx)),
+ JoinSide::None => {
+ return internal_err!("Unexpected JoinSide::None in filter");
+ }
+ }
+ } else {
+ match col_idx.side {
+ JoinSide::Left => (inner_batch, Some(inner_idx)),
+ JoinSide::Right => (outer_slice, None),
+ JoinSide::None => {
+ return internal_err!("Unexpected JoinSide::None in filter");
+ }
+ }
+ };
+
+ match side_idx {
+ None => {
+ columns.push(Arc::clone(side_batch.column(col_idx.index)));
+ }
+ Some(idx) => {
+ // Broadcasts inner scalar to N-element array. Arrow's
+ // BinaryExpr handles Scalar×Array natively via the Datum
+ // trait, but Column::evaluate always returns Array, so
+ // we'd need a custom expr to avoid this broadcast.
+ let scalar = ScalarValue::try_from_array(
+ side_batch.column(col_idx.index).as_ref(),
+ idx,
+ )?;
+ columns.push(scalar.to_array_of_size(num_outer_rows)?);
+ }
+ }
+ }
+
+ let filter_batch = RecordBatch::try_new(Arc::clone(filter.schema()), columns)?;
+ let result = filter
+ .expression()
+ .evaluate(&filter_batch)?
+ .into_array(num_outer_rows)?;
+ let bool_arr = result
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .ok_or_else(|| {
+ datafusion_common::DataFusionError::Internal(
+ "Filter expression did not return BooleanArray".to_string(),
+ )
+ })?;
+ // Treat nulls as false
+ if bool_arr.null_count() > 0 {
+ Ok(arrow::compute::prep_null_mask_filter(bool_arr))
+ } else {
+ Ok(bool_arr.clone())
+ }
+}
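The "treat nulls as false" step at the end mirrors SQL predicate semantics: a filter that evaluates to NULL must not mark the row as matched. A tiny sketch of that rule using `Option<bool>` in place of a nullable `BooleanArray` (the function name is illustrative):

```rust
// SQL filter semantics for join predicates: Some(true) keeps the row,
// Some(false) and None (NULL) both drop it — the behavior the
// prep_null_mask_filter call above enforces on the boolean result.
fn filter_keeps(value: Option<bool>) -> bool {
    value.unwrap_or(false)
}

fn main() {
    assert!(filter_keeps(Some(true)));
    assert!(!filter_keeps(Some(false)));
    assert!(!filter_keeps(None)); // NULL is not a match
    println!("null handled");
}
```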
+
+impl Stream for SemiAntiSortMergeJoinStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_join(cx).map(|result| result.transpose());
+ self.baseline_metrics.record_poll(poll)
+ }
+}
+
+impl RecordBatchStream for SemiAntiSortMergeJoinStream {
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
diff --git a/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/tests.rs
new file mode 100644
index 0000000000..92dc78503a
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/tests.rs
@@ -0,0 +1,801 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use super::stream::SemiAntiSortMergeJoinStream;
+use crate::ExecutionPlan;
+use crate::RecordBatchStream;
+use crate::common;
+use crate::expressions::Column;
+use crate::joins::SortMergeJoinExec;
+use crate::joins::utils::{ColumnIndex, JoinFilter};
+use crate::metrics::ExecutionPlanMetricsSet;
+use crate::metrics::{MetricBuilder, SpillMetrics};
+use crate::spill::spill_manager::SpillManager;
+use crate::test::TestMemoryExec;
+
+use arrow::array::{Int32Array, RecordBatch};
+use arrow::compute::SortOptions;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::JoinSide;
+use datafusion_common::JoinType::*;
+use datafusion_common::{NullEquality, Result};
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
+use futures::Stream;
+
+/// Create test memory/spill resources for stream-level tests.
+fn test_stream_resources(
+ inner_schema: SchemaRef,
+ metrics: &ExecutionPlanMetricsSet,
+) -> (
+ datafusion_execution::memory_pool::MemoryReservation,
+ crate::metrics::Gauge,
+ SpillManager,
+ Arc<datafusion_execution::runtime_env::RuntimeEnv>,
+) {
+ let ctx = TaskContext::default();
+ let runtime_env = ctx.runtime_env();
+ let reservation = MemoryConsumer::new("test").register(ctx.memory_pool());
+ let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", 0);
+ let spill_manager = SpillManager::new(
+ Arc::clone(&runtime_env),
+ SpillMetrics::new(metrics, 0),
+ inner_schema,
+ );
+ (reservation, peak_mem_used, spill_manager, runtime_env)
+}
+
+fn build_table(
+ a: (&str, &Vec<i32>),
+ b: (&str, &Vec<i32>),
+ c: (&str, &Vec<i32>),
+) -> Arc<dyn ExecutionPlan> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(a.0, DataType::Int32, false),
+ Field::new(b.0, DataType::Int32, false),
+ Field::new(c.0, DataType::Int32, false),
+ ]));
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(a.1.clone())),
+ Arc::new(Int32Array::from(b.1.clone())),
+ Arc::new(Int32Array::from(c.1.clone())),
+ ],
+ )
+ .unwrap();
+ TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
+}
+
+// ── Async re-entry tests using PendingStream ──────────────────────────────
+// These reproduce bugs that only manifest when async input streams return
+// Poll::Pending at specific points.
+
+/// A RecordBatch stream that yields Poll::Pending once before delivering
+/// each batch at a specified index. This simulates the behavior of
+/// repartitioned tokio::sync::mpsc channels where data isn't immediately
+/// available.
+struct PendingStream {
+ batches: Vec<RecordBatch>,
+ index: usize,
+ /// If pending_before[i] is true, yield Pending once before delivering
+ /// the batch at index i.
+ pending_before: Vec<bool>,
+ /// True if we've already yielded Pending for the current index.
+ yielded_pending: bool,
+ schema: SchemaRef,
+}
+
+impl PendingStream {
+ fn new(batches: Vec<RecordBatch>, pending_before: Vec<bool>) -> Self {
+ assert_eq!(batches.len(), pending_before.len());
+ let schema = batches[0].schema();
+ Self {
+ batches,
+ index: 0,
+ pending_before,
+ yielded_pending: false,
+ schema,
+ }
+ }
+}
+
+impl Stream for PendingStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if self.index >= self.batches.len() {
+ return Poll::Ready(None);
+ }
+ if self.pending_before[self.index] && !self.yielded_pending {
+ self.yielded_pending = true;
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
+ }
+ self.yielded_pending = false;
+ let batch = self.batches[self.index].clone();
+ self.index += 1;
+ Poll::Ready(Some(Ok(batch)))
+ }
+}
+
+impl RecordBatchStream for PendingStream {
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
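The "yield Pending once, wake, then deliver" shape that `PendingStream` uses per batch can be reduced to a single future and polled by hand, which shows why it forces the join stream back through every `ready!()` exit point. Names below are illustrative; only std is used:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal "pending once, then ready" future — the same re-entry trick
// PendingStream applies per batch.
struct PendingOnce {
    yielded: bool,
}

impl Future for PendingOnce {
    type Output = i32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if !self.yielded {
            self.yielded = true;
            cx.waker().wake_by_ref(); // request an immediate re-poll
            return Poll::Pending;
        }
        Poll::Ready(7)
    }
}

// No-op waker so we can poll by hand without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = PendingOnce { yielded: false };
    assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(7));
    println!("pending then ready");
}
```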
+
+/// Helper: collect all output from a SemiAntiSortMergeJoinStream.
+async fn collect_stream(stream: SemiAntiSortMergeJoinStream) -> Result<Vec<RecordBatch>> {
+ common::collect(Box::pin(stream)).await
+}
+
+/// Reproduces the buffer_inner_key_group re-entry bug:
+///
+/// When buffer_inner_key_group buffers inner rows across batch boundaries
+/// and poll_next_inner_batch returns Pending mid-way, the ready! macro
+/// exits poll_join. On re-entry, the merge-scan reaches Equal again and
+/// calls buffer_inner_key_group a second time — which starts with
+/// clear(), destroying the partially collected inner rows. Previously
+/// consumed batches are gone, so re-buffering misses them.
+///
+/// Setup:
+/// - Inner: 3 single-row batches, all with key=1, filter values c2=[10, 20, 30]
+/// - Outer: 1 row, key=1, filter value c1=10
+/// - Filter: c1 == c2 (only first inner row c2=10 matches)
+/// - Pending injected before 3rd inner batch
+///
+/// Without the bug: outer row emitted (match via c2=10)
+/// With the bug: outer row missing (c2=10 batch lost on re-entry)
+#[tokio::test]
+async fn filter_buffer_pending_loses_inner_rows() -> Result<()> {
+ let left_schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c1", DataType::Int32, false),
+ ]));
+ let right_schema = Arc::new(Schema::new(vec![
+ Field::new("a2", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]));
+
+ // Outer: 1 row, key=1, c1=10
+ let outer_batch = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![1])), // join key
+ Arc::new(Int32Array::from(vec![10])), // filter value
+ ],
+ )?;
+
+ // Inner: 3 single-row batches, key=1, c2=[10, 20, 30]
+ let inner_batch1 = RecordBatch::try_new(
+ Arc::clone(&right_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![100])),
+ Arc::new(Int32Array::from(vec![1])), // join key
+ Arc::new(Int32Array::from(vec![10])), // matches filter
+ ],
+ )?;
+ let inner_batch2 = RecordBatch::try_new(
+ Arc::clone(&right_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![200])),
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![20])), // doesn't match
+ ],
+ )?;
+ let inner_batch3 = RecordBatch::try_new(
+ Arc::clone(&right_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![300])),
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![30])), // doesn't match
+ ],
+ )?;
+
+ let outer: SendableRecordBatchStream = Box::pin(PendingStream::new(
+ vec![outer_batch],
+ vec![false], // outer delivers immediately
+ ));
+ let inner: SendableRecordBatchStream = Box::pin(PendingStream::new(
+ vec![inner_batch1, inner_batch2, inner_batch3],
+ vec![false, false, true], // Pending before 3rd batch
+ ));
+
+ // Filter: c1 == c2
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("c1", 0)),
+ Operator::Eq,
+ Arc::new(Column::new("c2", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ])),
+ );
+
+ let on_outer: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+ let on_inner: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+
+ let metrics = ExecutionPlanMetricsSet::new();
+ let inner_schema = inner.schema();
+ let (reservation, peak_mem_used, spill_manager, runtime_env) =
+ test_stream_resources(inner_schema, &metrics);
+ let stream = SemiAntiSortMergeJoinStream::try_new(
+ left_schema, // output schema = outer schema for semi
+ vec![SortOptions::default()],
+ NullEquality::NullEqualsNothing,
+ outer,
+ inner,
+ on_outer,
+ on_inner,
+ Some(filter),
+ LeftSemi,
+ 8192,
+ 0,
+ &metrics,
+ reservation,
+ peak_mem_used,
+ spill_manager,
+ runtime_env,
+ )?;
+
+ let batches = collect_stream(stream).await?;
+ let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(
+ total, 1,
+ "LeftSemi with filter: outer row should be emitted because \
+ inner row c2=10 matches filter c1==c2. Got {total} rows."
+ );
+ Ok(())
+}
+
+/// Reproduces the no-filter boundary Pending re-entry bug:
+///
+/// When an outer key group spans a batch boundary, the no-filter path
+/// emits the current batch, then polls for the next outer batch. If
+/// poll returns Pending, poll_join exits. On re-entry, without the
+/// PendingBoundary fix, the new batch is processed fresh by the
+/// merge-scan. Since inner already advanced past this key, the outer
+/// rows with the matching key are skipped via Ordering::Less.
+///
+/// Setup:
+/// - Outer: 2 single-row batches, both with key=1 (key group spans boundary)
+/// - Inner: 1 row with key=1
+/// - Pending injected on outer before 2nd batch
+///
+/// Without fix: only first outer row emitted (second lost on re-entry)
+/// With fix: both outer rows emitted
+#[tokio::test]
+async fn no_filter_boundary_pending_loses_outer_rows() -> Result<()> {
+ let left_schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c1", DataType::Int32, false),
+ ]));
+ let right_schema = Arc::new(Schema::new(vec![
+ Field::new("a2", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]));
+
+ // Outer: 2 single-row batches, both key=1
+ let outer_batch1 = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![10])),
+ ],
+ )?;
+ let outer_batch2 = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![2])),
+ Arc::new(Int32Array::from(vec![1])), // same key
+ Arc::new(Int32Array::from(vec![20])),
+ ],
+ )?;
+
+ // Inner: 1 row, key=1
+ let inner_batch = RecordBatch::try_new(
+ Arc::clone(&right_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![100])),
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![50])),
+ ],
+ )?;
+
+ let outer: SendableRecordBatchStream = Box::pin(PendingStream::new(
+ vec![outer_batch1, outer_batch2],
+ vec![false, true], // Pending before 2nd outer batch
+ ));
+ let inner: SendableRecordBatchStream =
+ Box::pin(PendingStream::new(vec![inner_batch], vec![false]));
+
+ let on_outer: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+ let on_inner: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+
+ let metrics = ExecutionPlanMetricsSet::new();
+ let inner_schema = inner.schema();
+ let (reservation, peak_mem_used, spill_manager, runtime_env) =
+ test_stream_resources(inner_schema, &metrics);
+ let stream = SemiAntiSortMergeJoinStream::try_new(
+ left_schema,
+ vec![SortOptions::default()],
+ NullEquality::NullEqualsNothing,
+ outer,
+ inner,
+ on_outer,
+ on_inner,
+ None, // no filter
+ LeftSemi,
+ 8192,
+ 0,
+ &metrics,
+ reservation,
+ peak_mem_used,
+ spill_manager,
+ runtime_env,
+ )?;
+
+ let batches = collect_stream(stream).await?;
+ let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(
+ total, 2,
+ "LeftSemi no filter: both outer rows (key=1) should be emitted \
+ because inner has key=1. Got {total} rows."
+ );
+ Ok(())
+}
+
+/// Tests the filtered boundary Pending re-entry: outer key group spans
+/// batches with a filter, and poll_next_outer_batch returns Pending.
+///
+/// Setup:
+/// - Outer: 2 single-row batches, both key=1, c1=[10, 20]
+/// - Inner: 1 row, key=1, c2=10
+/// - Filter: c1 == c2 (first outer row matches, second doesn't)
+/// - Pending before 2nd outer batch
+///
+/// Expected: 1 row (only the first outer row c1=10 passes the filter)
+#[tokio::test]
+async fn filtered_boundary_pending_outer_rows() -> Result<()> {
+ let left_schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c1", DataType::Int32, false),
+ ]));
+ let right_schema = Arc::new(Schema::new(vec![
+ Field::new("a2", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]));
+
+ let outer_batch1 = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![10])), // matches filter
+ ],
+ )?;
+ let outer_batch2 = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![2])),
+ Arc::new(Int32Array::from(vec![1])), // same key
+ Arc::new(Int32Array::from(vec![20])), // doesn't match
+ ],
+ )?;
+
+ let inner_batch = RecordBatch::try_new(
+ Arc::clone(&right_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![100])),
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![10])),
+ ],
+ )?;
+
+ let outer: SendableRecordBatchStream = Box::pin(PendingStream::new(
+ vec![outer_batch1, outer_batch2],
+ vec![false, true], // Pending before 2nd outer batch
+ ));
+ let inner: SendableRecordBatchStream =
+ Box::pin(PendingStream::new(vec![inner_batch], vec![false]));
+
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("c1", 0)),
+ Operator::Eq,
+ Arc::new(Column::new("c2", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ])),
+ );
+
+ let on_outer: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+ let on_inner: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+
+ let metrics = ExecutionPlanMetricsSet::new();
+ let inner_schema = inner.schema();
+ let (reservation, peak_mem_used, spill_manager, runtime_env) =
+ test_stream_resources(inner_schema, &metrics);
+ let stream = SemiAntiSortMergeJoinStream::try_new(
+ left_schema,
+ vec![SortOptions::default()],
+ NullEquality::NullEqualsNothing,
+ outer,
+ inner,
+ on_outer,
+ on_inner,
+ Some(filter),
+ LeftSemi,
+ 8192,
+ 0,
+ &metrics,
+ reservation,
+ peak_mem_used,
+ spill_manager,
+ runtime_env,
+ )?;
+
+ let batches = collect_stream(stream).await?;
+ let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(
+ total, 1,
+ "LeftSemi filtered boundary: only first outer row (c1=10) matches \
+ filter c1==c2. Got {total} rows."
+ );
+ Ok(())
+}
+
+// ==================== SPILL TESTS ====================
+
+/// Exercises inner key group spilling under memory pressure.
+///
+/// Uses a tiny memory limit (100 bytes) with disk spilling enabled. Since our
+/// operator only buffers inner rows when a filter is present, this test includes
+/// a filter (c1 < c2, always true). Verifies:
+/// 1. Spill metrics are recorded (spill_count, spilled_bytes, spilled_rows > 0)
+/// 2. Results match a non-spilled run
+#[tokio::test]
+async fn spill_with_filter() -> Result<()> {
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+
+ let left = build_table(
+ ("a1", &vec![1, 2, 3, 4, 5, 6]),
+ ("b1", &vec![1, 2, 3, 4, 5, 6]),
+ ("c1", &vec![4, 5, 6, 7, 8, 9]),
+ );
+ let right = build_table(
+ ("a2", &vec![10, 20, 30, 40, 50]),
+ ("b1", &vec![1, 3, 4, 6, 8]),
+ ("c2", &vec![50, 60, 70, 80, 90]),
+ );
+ let on = vec![(
+ Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+ )];
+ let sort_options = vec![SortOptions::default(); on.len()];
+
+ // c1 < c2 is always true for matching keys
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("c1", 0)),
+ Operator::Lt,
+ Arc::new(Column::new("c2", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ])),
+ );
+
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .with_disk_manager_builder(
+            DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory),
+ )
+ .build_arc()?;
+
+ for batch_size in [1, 50] {
+        let session_config = SessionConfig::default().with_batch_size(batch_size);
+
+ for join_type in [LeftSemi, LeftAnti, RightSemi, RightAnti] {
+ let task_ctx = Arc::new(
+ TaskContext::default()
+ .with_session_config(session_config.clone())
+ .with_runtime(Arc::clone(&runtime)),
+ );
+
+ let join = SortMergeJoinExec::try_new(
+ Arc::clone(&left),
+ Arc::clone(&right),
+ on.clone(),
+ Some(filter.clone()),
+ join_type,
+ sort_options.clone(),
+ NullEquality::NullEqualsNothing,
+ )?;
+ let stream = join.execute(0, task_ctx)?;
+ let spilled_result = common::collect(stream).await.unwrap();
+
+ assert!(
+ join.metrics().is_some(),
+ "metrics missing for {join_type:?}"
+ );
+ let metrics = join.metrics().unwrap();
+ assert!(
+ metrics.spill_count().unwrap() > 0,
+            "expected spill_count > 0 for {join_type:?}, batch_size={batch_size}"
+ );
+ assert!(
+ metrics.spilled_bytes().unwrap() > 0,
+            "expected spilled_bytes > 0 for {join_type:?}, batch_size={batch_size}"
+ );
+ assert!(
+ metrics.spilled_rows().unwrap() > 0,
+            "expected spilled_rows > 0 for {join_type:?}, batch_size={batch_size}"
+ );
+
+ // Run without spilling and compare results
+ let task_ctx_no_spill = Arc::new(
+                TaskContext::default().with_session_config(session_config.clone()),
+ );
+ let join_no_spill = SortMergeJoinExec::try_new(
+ Arc::clone(&left),
+ Arc::clone(&right),
+ on.clone(),
+ Some(filter.clone()),
+ join_type,
+ sort_options.clone(),
+ NullEquality::NullEqualsNothing,
+ )?;
+ let stream = join_no_spill.execute(0, task_ctx_no_spill)?;
+ let no_spill_result = common::collect(stream).await.unwrap();
+
+ let no_spill_metrics = join_no_spill.metrics().unwrap();
+ assert_eq!(
+ no_spill_metrics.spill_count(),
+ Some(0),
+ "unexpected spill for {join_type:?} without memory limit"
+ );
+
+ assert_eq!(
+ spilled_result, no_spill_result,
+                "spilled vs non-spilled results differ for {join_type:?}, batch_size={batch_size}"
+ );
+ }
+ }
+
+ Ok(())
+}
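+// The spill assertions above hinge on a buffer-or-spill decision: the stream
+// tries to reserve memory for the next inner batch, and when the 100-byte limit
+// rejects the reservation it spills the buffered group to disk and retries.
+// The following standalone sketch illustrates that pattern with made-up names
+// (`Reservation`, `buffer_or_spill`); it is not the actual stream code.

```rust
// Illustrative sketch of the buffer-or-spill decision exercised by the test.
// All names here are hypothetical, not DataFusion APIs.
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    /// Try to reserve `bytes`; fail if it would exceed the limit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.used + bytes > self.limit {
            Err(())
        } else {
            self.used += bytes;
            Ok(())
        }
    }
    /// Release everything (models clearing the in-memory buffer after a spill).
    fn free(&mut self) {
        self.used = 0;
    }
}

/// Buffer the next batch if memory allows; otherwise spill, release, and retry.
fn buffer_or_spill(res: &mut Reservation, batch_bytes: usize, spill_count: &mut usize) {
    if res.try_grow(batch_bytes).is_err() {
        *spill_count += 1; // pretend we wrote the buffered rows to disk
        res.free();
        res.try_grow(batch_bytes)
            .expect("a single batch must fit after spilling");
    }
}

fn main() {
    let mut res = Reservation { used: 0, limit: 100 };
    let mut spills = 0;
    for _ in 0..5 {
        buffer_or_spill(&mut res, 60, &mut spills); // 60-byte batches, 100-byte limit
    }
    assert!(spills > 0); // a tiny limit forces spilling, as the test asserts
}
```

+// Under this model the test's `spill_count > 0` / `spilled_bytes > 0` checks
+// follow directly from the memory limit being smaller than two buffered batches.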
+
+/// Reproduces a bug where `resume_boundary` for the Filtered pending case
+/// only checks `inner_key_buffer.is_empty()` but ignores `inner_key_spill`.
+/// After spilling, the in-memory buffer is cleared while the spill file
+/// holds the data. If the outer key group spans a batch boundary, the
+/// second outer batch's rows are never evaluated against the inner group.
+///
+/// Setup:
+/// - Outer: 2 single-row batches, both key=1, c1=[10, 10]
+/// - Inner: 1 batch with many rows all key=1 (enough to trigger spill)
+/// - Filter: c1 == c2 (matches when c2=10)
+/// - Memory limit: tiny (100 bytes) to force spilling
+/// - Pending before 2nd outer batch to trigger boundary re-entry
+///
+/// Expected: both outer rows match (semi=2 rows, anti=0 rows)
+/// Bug: second outer row is skipped because resume_boundary sees empty
+/// inner_key_buffer and skips re-evaluation.
+#[tokio::test]
+async fn spill_filtered_boundary_loses_outer_rows() -> Result<()> {
+    use datafusion_execution::disk_manager::{DiskManagerBuilder, DiskManagerMode};
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+
+ let left_schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c1", DataType::Int32, false),
+ ]));
+ let right_schema = Arc::new(Schema::new(vec![
+ Field::new("a2", DataType::Int32, false),
+ Field::new("b1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]));
+
+    // Two single-row outer batches with the same key — key group spans boundary
+ let outer_batch1 = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(Int32Array::from(vec![1])), // key=1
+ Arc::new(Int32Array::from(vec![10])), // matches filter
+ ],
+ )?;
+ let outer_batch2 = RecordBatch::try_new(
+ Arc::clone(&left_schema),
+ vec![
+ Arc::new(Int32Array::from(vec![2])),
+ Arc::new(Int32Array::from(vec![1])), // same key=1
+ Arc::new(Int32Array::from(vec![10])), // also matches filter
+ ],
+ )?;
+
+ // Inner: many rows with key=1 to force spilling, followed by key=2.
+ // c2=10 so the filter c1==c2 passes for both outer rows.
+ // The key=2 row ensures the inner cursor advances past the key group
+ // (buffer_inner_key_group returns Ok(false) instead of Ok(true)).
+ let n_inner = 200;
+ let mut inner_a = vec![100; n_inner];
+ inner_a.push(101);
+ let mut inner_b = vec![1; n_inner];
+ inner_b.push(2); // different key — forces inner cursor past key=1
+ let mut inner_c = vec![10; n_inner];
+ inner_c.push(10);
+ let inner_batch = RecordBatch::try_new(
+ Arc::clone(&right_schema),
+ vec![
+ Arc::new(Int32Array::from(inner_a)),
+ Arc::new(Int32Array::from(inner_b)),
+ Arc::new(Int32Array::from(inner_c)),
+ ],
+ )?;
+
+ // Filter: c1 == c2
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("c1", 0)),
+ Operator::Eq,
+ Arc::new(Column::new("c2", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ])),
+ );
+
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .with_disk_manager_builder(
+            DiskManagerBuilder::default().with_mode(DiskManagerMode::OsTmpDirectory),
+ )
+ .build_arc()?;
+
+ let on_outer: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+ let on_inner: Vec<PhysicalExprRef> = vec![Arc::new(Column::new("b1", 1))];
+
+ for join_type in [LeftSemi, LeftAnti] {
+ let outer: SendableRecordBatchStream = Box::pin(PendingStream::new(
+ vec![outer_batch1.clone(), outer_batch2.clone()],
+ vec![false, true], // Pending before 2nd outer batch
+ ));
+ let inner: SendableRecordBatchStream =
+            Box::pin(PendingStream::new(vec![inner_batch.clone()], vec![false]));
+
+ let metrics = ExecutionPlanMetricsSet::new();
+        let reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
+        let peak_mem_used = MetricBuilder::new(&metrics).gauge("peak_mem_used", 0);
+ let spill_manager = SpillManager::new(
+ Arc::clone(&runtime),
+ SpillMetrics::new(&metrics, 0),
+ Arc::clone(&right_schema),
+ );
+
+ let stream = SemiAntiSortMergeJoinStream::try_new(
+ Arc::clone(&left_schema),
+ vec![SortOptions::default()],
+ NullEquality::NullEqualsNothing,
+ outer,
+ inner,
+ on_outer.clone(),
+ on_inner.clone(),
+ Some(filter.clone()),
+ join_type,
+ 8192,
+ 0,
+ &metrics,
+ reservation,
+ peak_mem_used,
+ spill_manager,
+ Arc::clone(&runtime),
+ )?;
+
+ let batches = collect_stream(stream).await?;
+ let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+ match join_type {
+ LeftSemi => {
+ assert_eq!(
+ total, 2,
+ "LeftSemi spill+boundary: both outer rows match filter, \
+ expected 2 rows, got {total}"
+ );
+ }
+ LeftAnti => {
+ assert_eq!(
+ total, 0,
+ "LeftAnti spill+boundary: both outer rows match filter, \
+ expected 0 rows, got {total}"
+ );
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ Ok(())
+}
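+// The doc comment above describes the bug as a predicate that consults only the
+// in-memory buffer. The sketch below restates that predicate in isolation with
+// hypothetical names (`InnerKeyGroup`, `in_memory_rows`, `spill_file`) mirroring
+// the test's description rather than the stream's actual fields.

```rust
// Hypothetical model of the state checked by resume_boundary.
struct InnerKeyGroup {
    in_memory_rows: Vec<u64>,   // inner rows still buffered in memory
    spill_file: Option<String>, // spill file path, if rows were spilled to disk
}

impl InnerKeyGroup {
    /// Buggy check: right after a spill the in-memory buffer is empty even
    /// though the group's rows live on disk, so the group looks empty and the
    /// second outer batch is never evaluated against it.
    fn has_inner_rows_buggy(&self) -> bool {
        !self.in_memory_rows.is_empty()
    }

    /// Fixed check: the group is non-empty if rows are in memory OR spilled.
    fn has_inner_rows_fixed(&self) -> bool {
        !self.in_memory_rows.is_empty() || self.spill_file.is_some()
    }
}

fn main() {
    // State right after spilling: memory cleared, spill file present.
    let group = InnerKeyGroup {
        in_memory_rows: Vec::new(),
        spill_file: Some(String::from("/tmp/smj_spill_0")),
    };
    assert!(!group.has_inner_rows_buggy()); // bug: group appears empty
    assert!(group.has_inner_rows_fixed()); // fix: the spill file counts
}
```

+// The test forces exactly this state (spill, then a Pending before the second
+// outer batch) so the buggy predicate drops the second outer row.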
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index ac077792f5..d5d1df4baf 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
use crate::execution_plan::{EmissionType, boundedness_from_children};
use crate::expressions::PhysicalSortExpr;
+use crate::joins::semi_anti_sort_merge_join::stream::SemiAntiSortMergeJoinStream;
use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
use crate::joins::sort_merge_join::stream::SortMergeJoinStream;
use crate::joins::utils::{
@@ -32,11 +33,12 @@ use crate::joins::utils::{
    estimate_join_statistics, reorder_output_after_swap, symmetric_join_output_partitioning,
};
-use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, SpillMetrics};
use crate::projection::{
    ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
physical_to_column_exprs, update_join_on,
};
+use crate::spill::spill_manager::SpillManager;
use crate::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
    PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties,
@@ -522,30 +524,62 @@ impl ExecutionPlan for SortMergeJoinExec {
let streamed = streamed.execute(partition, Arc::clone(&context))?;
let buffered = buffered.execute(partition, Arc::clone(&context))?;
- // create output buffer
let batch_size = context.session_config().batch_size();
-
- // create memory reservation
        let reservation = MemoryConsumer::new(format!("SMJStream[{partition}]"))
.register(context.memory_pool());
- // create join stream
- Ok(Box::pin(SortMergeJoinStream::try_new(
- context.session_config().spill_compression(),
- Arc::clone(&self.schema),
- self.sort_options.clone(),
- self.null_equality,
- streamed,
- buffered,
- on_streamed,
- on_buffered,
- self.filter.clone(),
+ if matches!(
self.join_type,
- batch_size,
- SortMergeJoinMetrics::new(partition, &self.metrics),
- reservation,
- context.runtime_env(),
- )?))
+ JoinType::LeftSemi
+ | JoinType::LeftAnti
+ | JoinType::RightSemi
+ | JoinType::RightAnti
+ ) {
+ let peak_mem_used =
+                MetricBuilder::new(&self.metrics).gauge("peak_mem_used", partition);
+ let spill_manager = SpillManager::new(
+ context.runtime_env(),
+ SpillMetrics::new(&self.metrics, partition),
+ buffered.schema(),
+ )
+            .with_compression_type(context.session_config().spill_compression());
+
+ Ok(Box::pin(SemiAntiSortMergeJoinStream::try_new(
+ Arc::clone(&self.schema),
+ self.sort_options.clone(),
+ self.null_equality,
+ streamed,
+ buffered,
+ on_streamed,
+ on_buffered,
+ self.filter.clone(),
+ self.join_type,
+ batch_size,
+ partition,
+ &self.metrics,
+ reservation,
+ peak_mem_used,
+ spill_manager,
+ context.runtime_env(),
+ )?))
+ } else {
+ Ok(Box::pin(SortMergeJoinStream::try_new(
+ context.session_config().spill_compression(),
+ Arc::clone(&self.schema),
+ self.sort_options.clone(),
+ self.null_equality,
+ streamed,
+ buffered,
+ on_streamed,
+ on_buffered,
+ self.filter.clone(),
+ self.join_type,
+ batch_size,
+ SortMergeJoinMetrics::new(partition, &self.metrics),
+ reservation,
+ context.runtime_env(),
+ )?))
+ }
}
fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
index b16ad59abc..a1043a9d8b 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -1278,9 +1278,10 @@ async fn join_right_anti_output_two_batches() -> Result<()> {
    let (_, batches) = join_collect_batch_size_equals_two(left, right, on, LeftAnti).await?;
- assert_eq!(batches.len(), 2);
- assert_eq!(batches[0].num_rows(), 2);
- assert_eq!(batches[1].num_rows(), 1);
+ // SemiAntiSortMergeJoinStream uses a coalescer, so batch boundaries differ
+ // from the old stream. Only assert data correctness.
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 3);
assert_snapshot!(batches_to_string(&batches), @r"
+----+----+----+
| a1 | b1 | c1 |
@@ -1566,9 +1567,10 @@ async fn join_right_semi_output_two_batches() -> Result<()> {
"| 2 | 5 | 8 |",
"+----+----+----+",
];
- assert_eq!(batches.len(), 2);
- assert_eq!(batches[0].num_rows(), 2);
- assert_eq!(batches[1].num_rows(), 1);
+ // SemiAntiSortMergeJoinStream uses a coalescer, so batch boundaries differ
+ // from the old stream. Only assert data correctness.
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 3);
assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -2059,7 +2061,9 @@ async fn overallocation_single_batch_no_spill() -> Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = vec![
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
+ // Semi/anti joins use SemiAntiSortMergeJoinStream which only tracks
+        // inner key buffer memory; tested in semi_anti_sort_merge_join/tests.rs.
+ Inner, Left, Right, Full, LeftMark, RightMark,
];
// Disable DiskManager to prevent spilling
@@ -2140,7 +2144,9 @@ async fn overallocation_multi_batch_no_spill() -> Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = vec![
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
+ // Semi/anti joins use SemiAntiSortMergeJoinStream which only tracks
+        // inner key buffer memory; tested in semi_anti_sort_merge_join/tests.rs.
+ Inner, Left, Right, Full, LeftMark, RightMark,
];
// Disable DiskManager to prevent spilling
@@ -2200,7 +2206,9 @@ async fn overallocation_single_batch_spill() -> Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = [
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
+ // Semi/anti joins use SemiAntiSortMergeJoinStream which only tracks
+        // inner key buffer memory; tested in semi_anti_sort_merge_join/tests.rs.
+ Inner, Left, Right, Full, LeftMark, RightMark,
];
// Enable DiskManager to allow spilling
@@ -2304,7 +2312,9 @@ async fn overallocation_multi_batch_spill() -> Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = [
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
+ // Semi/anti joins use SemiAntiSortMergeJoinStream which only tracks
+        // inner key buffer memory; tested in semi_anti_sort_merge_join/tests.rs.
+ Inner, Left, Right, Full, LeftMark, RightMark,
];
// Enable DiskManager to allow spilling
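
The diff above dispatches semi/anti join types to a stream that, per the commit
message, tracks only a boolean per outer row instead of materializing
(outer, inner) pairs. The sketch below illustrates that core idea in isolation:
emitting a semi or anti selection from a per-outer-row match bitmap. It is an
illustrative standalone function, not the stream's actual implementation.

```rust
// Illustrative sketch: semi/anti output selection from a match bitmap.
// `matched[i]` is true if outer row i found at least one inner row with an
// equal key that also passed the join filter.

/// Returns the outer-row indices to emit: matched rows for a semi join,
/// unmatched rows for an anti join. No (outer, inner) pairs are built,
/// so no intermediate pair batches or index arrays are allocated.
fn semi_anti_selection(matched: &[bool], anti: bool) -> Vec<usize> {
    matched
        .iter()
        .enumerate()
        .filter(|&(_, &m)| m != anti) // semi keeps matched, anti keeps unmatched
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    let matched = [true, false, true, false];
    assert_eq!(semi_anti_selection(&matched, false), vec![0, 2]); // semi join
    assert_eq!(semi_anti_selection(&matched, true), vec![1, 3]); // anti join
}
```

Under this model, semi and anti joins of either side differ only in which bit
value is kept and which input plays the outer role, which is why one
specialized stream can cover LeftSemi, LeftAnti, RightSemi, and RightAnti.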