This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9660c98743 perf: Use zero-copy slice instead of take kernel in sort 
merge join (#20463)
9660c98743 is described below

commit 9660c9874315354ff22245699785f5f77841be80
Author: Andy Grove <[email protected]>
AuthorDate: Sun Feb 22 07:43:55 2026 -0700

    perf: Use zero-copy slice instead of take kernel in sort merge join (#20463)
    
    ## Summary
    
    Follows on from https://github.com/apache/datafusion/pull/20464 which
    adds new criterion benchmarks.
    
    - When the join indices form a contiguous ascending range (e.g.
    `[3,4,5,6]`), replace the O(n) Arrow `take` kernel with O(1)
    `RecordBatch::slice` (zero-copy pointer arithmetic)
    - Applies to both the streamed (left) and buffered (right) sides of the
    sort merge join
    
    ## Rationale
    
    In SMJ, the streamed side cursor advances sequentially, so its indices
    are almost always contiguous. The buffered side is scanned sequentially
    within each key group, so its indices are also contiguous for 1:1 and
    1:few joins. The `take` kernel allocates new arrays and copies data even
    when a simple slice would suffice.
    
    ## Benchmark Results
    
    Criterion micro-benchmark (100K rows, pre-sorted, no sort/scan
    overhead):
    
    | Benchmark | Baseline | Optimized | Improvement |
    |-----------|----------|-----------|-------------|
    | inner_1to1 (unique keys) | 5.11 ms | 3.88 ms | **-24%** |
    | inner_1to10 (10K keys) | 17.64 ms | 16.29 ms | **-8%** |
    | left_1to1_unmatched (5% unmatched) | 4.80 ms | 3.87 ms | **-19%** |
    | left_semi_1to10 (10K keys) | 3.65 ms | 3.11 ms | **-15%** |
    | left_anti_partial (partial match) | 3.58 ms | 3.43 ms | **-4%** |
    
    All improvements are statistically significant (p < 0.05).
    
    TPC-H SF1 with SMJ forced (`prefer_hash_join=false`) shows no
    regressions across all 22 queries, with modest end-to-end improvements
    on join-heavy queries (Q3 -7%, Q19 -5%, Q21 -2%).
    
    ## Implementation
    
    - `is_contiguous_range()`: checks if a `UInt64Array` is a contiguous
    ascending range. Uses quick endpoint rejection then verifies every
    element sequentially.
    - `freeze_streamed()`: uses `slice` instead of `take` for streamed
    (left) columns when indices are contiguous.
    - `fetch_right_columns_from_batch_by_idxs()`: uses `slice` instead of
    `take` for buffered (right) columns when indices are contiguous.
    
    When indices are not contiguous (e.g. repeated indices in many-to-many
    joins), falls back to the existing `take` path.
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../src/joins/sort_merge_join/stream.rs            | 66 ++++++++++++++++------
 1 file changed, 49 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index edbf542005..e0498821eb 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -46,15 +46,13 @@ use crate::{PhysicalExpr, RecordBatchStream, 
SendableRecordBatchStream};
 use arrow::array::{types::UInt64Type, *};
 use arrow::compute::{
     self, BatchCoalescer, SortOptions, concat_batches, filter_record_batch, 
is_not_null,
-    take,
+    take, take_arrays,
 };
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
-use arrow::error::ArrowError;
 use arrow::ipc::reader::StreamReader;
 use datafusion_common::config::SpillCompression;
 use datafusion_common::{
-    DataFusionError, HashSet, JoinType, NullEquality, Result, exec_err, 
internal_err,
-    not_impl_err,
+    HashSet, JoinType, NullEquality, Result, exec_err, internal_err, 
not_impl_err,
 };
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::MemoryReservation;
@@ -1248,13 +1246,19 @@ impl SortMergeJoinStream {
                 continue;
             }
 
-            let mut left_columns = self
-                .streamed_batch
-                .batch
-                .columns()
-                .iter()
-                .map(|column| take(column, &left_indices, None))
-                .collect::<Result<Vec<_>, ArrowError>>()?;
+            let mut left_columns = if let Some(range) = 
is_contiguous_range(&left_indices)
+            {
+                // When indices form a contiguous range (common for the 
streamed
+                // side which advances sequentially), use zero-copy slice 
instead
+                // of the O(n) take kernel.
+                self.streamed_batch
+                    .batch
+                    .slice(range.start, range.len())
+                    .columns()
+                    .to_vec()
+            } else {
+                take_arrays(self.streamed_batch.batch.columns(), 
&left_indices, None)?
+            };
 
             // The row indices of joined buffered batch
             let right_indices: UInt64Array = chunk.buffered_indices.finish();
@@ -1577,6 +1581,30 @@ fn produce_buffered_null_batch(
     )?))
 }
 
+/// Checks if a `UInt64Array` contains a contiguous ascending range (e.g. 
\[3,4,5,6\]).
+/// Returns `Some(start..start+len)` if so, `None` otherwise.
+/// This allows replacing an O(n) `take` with an O(1) `slice`.
+#[inline]
+fn is_contiguous_range(indices: &UInt64Array) -> Option<Range<usize>> {
+    if indices.is_empty() || indices.null_count() > 0 {
+        return None;
+    }
+    let values = indices.values();
+    let start = values[0];
+    let len = values.len() as u64;
+    // Quick rejection: if last element doesn't match expected, not contiguous
+    if values[values.len() - 1] != start + len - 1 {
+        return None;
+    }
+    // Verify every element is sequential (handles duplicates and gaps)
+    for i in 1..values.len() {
+        if values[i] != start + i as u64 {
+            return None;
+        }
+    }
+    Some(start as usize..(start + len) as usize)
+}
+
 /// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]` by 
specific column indices
 #[inline(always)]
 fn fetch_right_columns_by_idxs(
@@ -1597,12 +1625,16 @@ fn fetch_right_columns_from_batch_by_idxs(
 ) -> Result<Vec<ArrayRef>> {
     match &buffered_batch.batch {
         // In memory batch
-        BufferedBatchState::InMemory(batch) => Ok(batch
-            .columns()
-            .iter()
-            .map(|column| take(column, &buffered_indices, None))
-            .collect::<Result<Vec<_>, ArrowError>>()
-            .map_err(Into::<DataFusionError>::into)?),
+        // In memory batch
+        BufferedBatchState::InMemory(batch) => {
+            // When indices form a contiguous range (common in SMJ since the
+            // buffered side is scanned sequentially), use zero-copy slice.
+            if let Some(range) = is_contiguous_range(buffered_indices) {
+                Ok(batch.slice(range.start, range.len()).columns().to_vec())
+            } else {
+                Ok(take_arrays(batch.columns(), buffered_indices, None)?)
+            }
+        }
         // If the batch was spilled to disk, less likely
         BufferedBatchState::Spilled(spill_file) => {
             let mut buffered_cols: Vec<ArrayRef> =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to