andygrove opened a new pull request, #3871: URL: https://github.com/apache/datafusion-comet/pull/3871
## Which issue does this PR close?

N/A - New experimental feature

## Rationale for this change

This PR introduces a Comet-owned sort merge join operator (`CometSortMergeJoinExec`) that replaces DataFusion's `SortMergeJoinExec`. The motivations are:

1. **Memory efficiency**: spills entire buffered batches (including join key arrays) to Arrow IPC, closing a gap in DataFusion where join key arrays stay in memory during spills.
2. **Performance**: implements Spark's key-reuse optimization, so when consecutive streamed rows share the same join key, the buffered match group is reused without re-scanning.
3. **Decoupling**: Comet owns the join operator, so it is not coupled to DataFusion's evolving SMJ API and can evolve independently.

A configuration toggle (`spark.comet.exec.sortMergeJoin.useNative`, default `true`) allows switching between the new and DataFusion implementations for A/B benchmarking.

**This is experimental.** The implementation passes all existing tests but has the known limitations listed below.

## What changes are included in this PR?
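As context for the changes below, the key-reuse optimization mentioned in the rationale can be sketched roughly as follows. This is a hypothetical, simplified illustration using plain `i32` keys and a linear scan; the real implementation compares keys as Arrow `OwnedRow` values via `RowConverter` and works over record batches, not tuples.

```rust
// Hypothetical sketch of key-reuse (not the actual CometSortMergeJoinExec
// code): when consecutive streamed rows carry the same join key, the
// buffered match group found for the previous row is reused instead of
// re-scanning the buffered side.
fn join_streamed<'a>(
    streamed_keys: &[i32],
    buffered: &'a [(i32, &'a str)],
) -> (Vec<(i32, &'a str)>, usize) {
    let mut out = Vec::new();
    let mut scans = 0; // counts how often the buffered side is re-scanned
    let mut cached_key: Option<i32> = None;
    let mut cached_group: Vec<(i32, &'a str)> = Vec::new();
    for &k in streamed_keys {
        if cached_key != Some(k) {
            // Key changed: collect the match group for the new key.
            scans += 1;
            cached_group = buffered
                .iter()
                .copied()
                .filter(|(bk, _)| *bk == k)
                .collect();
            cached_key = Some(k);
        }
        // Key unchanged: reuse cached_group without touching the buffered side.
        out.extend(cached_group.iter().copied());
    }
    (out, scans)
}

fn main() {
    let buffered = [(1, "a"), (1, "b"), (2, "c")];
    // Three consecutive streamed rows share key 1, so only two scans happen.
    let (rows, scans) = join_streamed(&[1, 1, 1, 2], &buffered);
    assert_eq!(scans, 2);
    assert_eq!(rows.len(), 7); // 3 streamed rows x 2 matches + 1 x 1 match
    println!("{} rows, {} scans", rows.len(), scans);
}
```

Because the inputs to a sort merge join arrive sorted on the join keys, equal streamed keys are always adjacent, which is what makes this single-entry cache effective.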
### New files (`native/core/src/execution/joins/`)

- **`sort_merge_join.rs`**: `CometSortMergeJoinExec`, implementing DataFusion's `ExecutionPlan` trait
- **`sort_merge_join_stream.rs`**: streaming state machine (Init, PollStreamed, PollBuffered, Comparing, CollectingBuffered, Joining, OutputReady, DrainUnmatched, DrainBuffered, Exhausted)
- **`buffered_batch.rs`**: `BufferedMatchGroup` with batch-level Arrow IPC spilling via `SpillManager`
- **`output_builder.rs`**: batch materialization using Arrow `take`/`concat` kernels
- **`filter.rs`**: join filter evaluation with corrected masks for outer/semi/anti joins
- **`metrics.rs`**: metrics matching `CometMetricNode.scala` (input/output rows/batches, join_time, peak_mem, spill counts)
- **`tests.rs`**: 11 unit tests covering all join types and spilling

### Modified files

- **`CometConf.scala`**: new `spark.comet.exec.sortMergeJoin.useNative` config
- **`planner.rs`**: conditional operator construction based on the config
- **`jni_api.rs`**: passes the Spark config through to the planner
- **`spark_config.rs`**: config constant

### Supported join types

All 6: Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti

### Key design decisions

- **Streamed/buffered assignment** matches Spark: RightOuter swaps sides; for all other join types, left = streamed and right = buffered.
- **Null handling** matches Spark's `NullEqualsNothing` semantics: null keys never match for inner/semi joins, and are emitted with a null counterpart for outer/anti joins.
- **Batch-level spilling** via DataFusion's `SpillManager` and Arrow IPC when `MemoryReservation::try_grow()` fails.
- **Key-reuse** caches the streamed key as an `OwnedRow` via Arrow's `RowConverter`.

### Known limitations / future work

- **Full outer + filter cross-group tracking**: buffered rows from earlier key groups that were unmatched across all streamed rows will not be null-joined. Fixing this requires tracking unmatched buffered rows across key group boundaries.
- **No codegen**: Spark has whole-stage codegen for SMJ; this implementation uses interpreted evaluation.
- **A/B comparison test**: an automated Scala test comparing native vs. DataFusion output has not yet been added.

## How are these changes tested?

- **11 Rust unit tests** covering all 6 join types (inner, left/right/full outer, left semi, left anti), null key handling, duplicate keys (many-to-many), empty results, and forced spilling under a 1 KB memory limit
- **Existing `CometJoinSuite`** (10 tests) passes with the new implementation as a drop-in replacement, including SortMergeJoin with and without join filters across all join types
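For readers skimming the file list, the streaming states named for `sort_merge_join_stream.rs` can be pictured as a Rust enum. The state names below come from the PR description; the `advance` transition function is a hypothetical happy-path sketch for orientation only, not the actual transition logic (which also branches on input exhaustion, spill pressure, and join type).

```rust
// State names taken from the PR description; transitions are illustrative.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SmjState {
    Init,
    PollStreamed,
    PollBuffered,
    Comparing,
    CollectingBuffered,
    Joining,
    OutputReady,
    DrainUnmatched,
    DrainBuffered,
    Exhausted,
}

// Hypothetical happy-path transition order (real logic also handles
// exhausted inputs, spills, and per-join-type branching).
fn advance(s: SmjState) -> SmjState {
    use SmjState::*;
    match s {
        Init => PollStreamed,
        PollStreamed => PollBuffered,
        PollBuffered => Comparing,
        Comparing => CollectingBuffered,
        CollectingBuffered => Joining,
        Joining => OutputReady,
        OutputReady => PollStreamed, // loop until inputs are exhausted
        DrainUnmatched => DrainBuffered,
        DrainBuffered => Exhausted,
        Exhausted => Exhausted,
    }
}

fn main() {
    // Walk the happy path from Init and confirm it reaches OutputReady.
    let mut s = SmjState::Init;
    for _ in 0..6 {
        s = advance(s);
    }
    assert_eq!(s, SmjState::OutputReady);
    println!("{:?}", s);
}
```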
