tomz commented on issue #17267: URL: https://github.com/apache/datafusion/issues/17267#issuecomment-4410356794
# RFC: External Hash Join — refining the partitioned-build direction

**Status:** draft, intended as a **comment on [#17267](https://github.com/apache/datafusion/issues/17267)** (the active proposal thread), not a fresh issue.
**Posted:** 2026-05-08

## TL;DR

A refinement of the hybrid-hash-join direction discussed in [#17267](https://github.com/apache/datafusion/issues/17267):

1. Code-level pointers (file:line in the current `HashJoinExec`)
2. A staged 4-PR plan
3. PR 1 in progress at [`tomz/datafusion:external-hash-join-pr1`](https://github.com/tomz/datafusion/tree/external-hash-join-pr1) — the `partitioned_build` foundation module, 270 LoC + 7 tests. `cargo test -p datafusion-physical-plan --lib joins::hash_join`: 386/386 pass, zero diff in existing tests.
4. A proposed answer to the architectural question that stalled [#18589](https://github.com/apache/datafusion/pull/18589) ("separate operator vs. refactor HashJoinExec")

Prior art: #1599 (2022), #12952 (2024), #17267 (2025), #18589 (2025, closed stale 2026-01).

## Why now

DataFusion sessions configured with `with_memory_limit()` can bound hash aggregation, external sort, and sort-merge join. Hash joins register as `can_spill: false` and return `Resources exhausted` on overflow. DuckDB, PostgreSQL, Spark, and Photon all implement a partitioned external hash join. The gap is observable in:

* Memory-bounded execution for hash-join-heavy workloads (TPC-DS — see "Real-world impact" below)
* Embedded / WASM / edge deployments where RAM is bounded
* Multi-tenant deployments with per-task memory limits
* DataFusion-in-Spark integrations (datafusion-comet) that need external reclaim hooks

## Real-world impact (measured)

We hit this while running TPC-DS sf=100 single-node in a dev environment on a VM with 8 cores, 62 GiB RAM, 223 GiB disk. The concrete bench below covers 16 probes, captured 2026-05-08.
### Headline data — Q14 sf=100 budget sweep

| Budget | Wall | Result |
|---|---:|---|
| 4 GiB | — | ❌ OOM (HashJoinInput) |
| **8 GiB** | — | ❌ **OOM (HashJoinInput)** |
| 16 GiB | 54.7 s | ✅ |
| 32 GiB | 56.2 s | ✅ |
| unlimited | 54.0 s | ✅ |

Q14 fails at any budget below ~16 GiB. There is no spill path; the allocation simply returns `Resources exhausted`. Verbatim error:

```
Resources exhausted: Additional allocation failed for HashJoinInput[1]
HashJoinInput[3]#108 (can spill: false) consumed 220.1 MB
HashJoinInput[3]#226 (can spill: false) consumed 220.1 MB
HashJoinInput[3]#361 (can spill: false) consumed 220.1 MB
HashJoinInput[7]#386 (can spill: false) consumed 220.0 MB
HashJoinInput[7]#249 (can spill: false) consumed 220.0 MB.
Failed to allocate additional 99.0 KB for HashJoinInput[1] —
30.5 KB remain available for the total pool
```

Five concurrent HashJoinInputs at ~220 MB each hold 1.1 GB of build-side state; together with DataFusion's other reservations they exhaust the 8 GiB pool — at the failure point only 30.5 KB remained for the failing 99.0 KB allocation.

### Multi-query sf=100 picture — query shape matters

| Query | 8 GiB | 16 GiB | unlimited | Notes |
|---|---|---|---:|---|
| Q14 | ❌ OOM | ✅ 54.7 s | ✅ 54.0 s | HashJoinInput memory cap |
| Q23 | ❌ disk cap | (n/a) | ✅ 65.0 s | spilled aggregate hit the DiskManager 100 GiB cap |
| Q4 | ✅ 77.4 s | (n/a) | ✅ 77.0 s | hash-heavy, but build sizes fit |
| Q11 | ✅ 58.9 s | (n/a) | ✅ 59.4 s | hash-heavy, but build sizes fit |
| Q72 | (n/a) | (n/a) | ⏱️ **>30 min** | 9-way join, single-node CPU-bound |

The OOM surface is wider than "build side too big". Q14 hits HashJoinInput memory exhaustion. Q23 spills via the existing aggregate path but exceeds DataFusion's default 100 GiB DiskManager temp cap — a partition-aware spill would write only the overflowing partitions, rather than spilling unboundedly batch by batch. Q72 does not finish in 30 minutes single-node even with unlimited memory; distributed execution completes Q72 sf=100 in ~61 s, so spill is not the relevant fix for that shape.
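The failure mode above — several mid-sized builds that are individually fine but collectively fatal — can be sketched with a toy pool model. This is plain Rust, not DataFusion's `MemoryPool` API; `Pool` and its methods are illustrative names, but `try_grow` mirrors the all-or-nothing semantics the error message shows:

```rust
// Toy model of a shared memory pool with several concurrent build-side
// consumers. Each consumer stays under any plausible per-join threshold,
// yet the sum exhausts the pool; with `can_spill: false` the only possible
// outcome is an error, not a spill.
struct Pool {
    capacity: u64,
    used: u64,
}

impl Pool {
    fn new(capacity: u64) -> Self {
        Pool { capacity, used: 0 }
    }

    /// All-or-nothing grow, like a failing `MemoryReservation::try_grow`.
    fn try_grow(&mut self, bytes: u64) -> Result<(), String> {
        if self.used + bytes > self.capacity {
            Err(format!(
                "Resources exhausted: requested {} B, {} B remain",
                bytes,
                self.capacity - self.used
            ))
        } else {
            self.used += bytes;
            Ok(())
        }
    }
}

fn main() {
    const MB: u64 = 1024 * 1024;
    // Pretend 1 GiB pool, five concurrent ~220 MB build sides.
    let mut pool = Pool::new(1024 * MB);
    let mut failed = 0;
    for _ in 0..5 {
        if pool.try_grow(220 * MB).is_err() {
            failed += 1;
        }
    }
    // 4 × 220 MB = 880 MB fits; the fifth reservation fails.
    assert_eq!(failed, 1);
    println!("pool used: {} MB", pool.used / MB);
}
```

The same shape explains why a per-join SMJ-fallback threshold (next section) cannot fire here: no single consumer looks oversized.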
### SortMergeJoin-fallback workaround data

We carry a downstream `SortMergeJoinFallbackRule` to mitigate the OOM by swapping HashJoin → SortMergeJoin when build-side estimates exceed a threshold. The bench tested it explicitly:

| Run | Wall | Result |
|---|---:|---|
| Q14 8 GiB, no SMJ | — | ❌ OOM (HashJoinInput) |
| Q14 8 GiB + SMJ thresh 1 GiB | — | ❌ **same OOM** |
| Q14 15 GiB + SMJ thresh 1.5 GiB | 55.9 s | ✅ |
| Q14 16 GiB + SMJ thresh 2 GiB | 56.1 s | ✅ |
| Q14 16 GiB, no SMJ | 54.7 s | ✅ |

Two findings:

1. At 8 GiB the SMJ fallback cannot fire — each individual build is below the threshold; the OOM comes from concurrent reservations summing past the budget. A threshold-based SMJ fallback does not address the multi-medium-builds case.
2. At budgets where the hash join fits, SMJ is ~2.5 % slower (56.1 s vs 54.7 s). The SMJ fallback is not equivalent to HashJoinExec spill in this regime.

### Implications for the design's perf target

The 1.5× target this RFC sets (vs a memory-fitting hash join) maps to:

| Query | Target wall at tight budget | vs unlimited baseline |
|---|---:|---:|
| Q14 | ≤ 80 s at 8 GiB | (unlimited 54 s) |
| Q23 | ≤ 100 s at 8 GiB | (unlimited 65 s) |
| Q4 | ≤ 80 s at 8 GiB | (unlimited 77 s — already fits) |
| Q11 | ≤ 70 s at 8 GiB | (unlimited 59 s — already fits) |

These anchor the perf expectations for the spillable-HashJoin PR's benchmark suite. [Full bench writeup with raw logs.](https://github.com/your-org/spark-rust-jaaicode/blob/main/docs/datafusion-rfc-bench-results.md)

## Existing community work this builds on

| Issue / PR | Status | Relationship |
|---|---|---|
| [#1599](https://github.com/apache/datafusion/issues/1599) *Memory Limited Joins (Externalized / Spill)* (2022-01) | OPEN | The original 4-year-old ask. The SortMergeJoin half landed (#9359); the HashJoin half is what this closes. |
| [#12952](https://github.com/apache/datafusion/issues/12952) *Add spilling support for HashJoin* (2024-10) | OPEN | Direct restatement of the ask. |
| [#17267](https://github.com/apache/datafusion/issues/17267) *PROPOSAL Hash Join Spilling Proposal* (2025-08, **21 comments**) | OPEN | **The active design thread.** This RFC is intended to land here as a comment. Same hybrid-hash-join target; this refines it with code-level concreteness + a staged plan + an in-flight PR. |
| [PR #18589](https://github.com/apache/datafusion/pull/18589) *Draft GraceHashJoin* by @osipovartem (2025-11) | CLOSED (stale, 2026-01) | **1,825 LoC working draft.** Closed as stale — not for technical reasons, but because reviewers couldn't agree on whether to build a separate operator or fold it into HashJoinExec. Addressed below. |
| [PR #17444](https://github.com/apache/datafusion/pull/17444) by @adriangb (2025-09) | **MERGED** | Refactored HashJoinExec to progressively accumulate dynamic-filter bounds during build "to unblock spilling". The dynamic-filter compatibility question is **already resolved upstream**. |
| [PR #21882](https://github.com/apache/datafusion/pull/21882) *pluggable SpillFile trait* | OPEN | Introduces async-streaming spill backends (S3, BufFile, …). If we write spill code against this, we get backend portability for free. |
| [PR #22035](https://github.com/apache/datafusion/pull/22035) *SubPool* | OPEN | Per-operator memory sub-pool. Designed exactly for the partitioned hash join's per-partition reservations. |
| [PR #22043](https://github.com/apache/datafusion/pull/22043) *MemoryReclaimer* | OPEN | Cooperative async spill on OOM. Our partitioned hash join should register as one. |
| [PR #21425](https://github.com/apache/datafusion/pull/21425) + [#21422](https://github.com/apache/datafusion/issues/21422) *external reclaim hooks* | OPEN | Spark/Comet integration; our spill should be compatible. |
| [#16065](https://github.com/apache/datafusion/issues/16065) *Epic: GSoC 2025 Spilling* | OPEN | Companion epic. Hash join spill fits as a natural follow-up. |
| [#13123](https://github.com/apache/datafusion/issues/13123) *EPIC Improved Externalized Hash Aggregation* | OPEN | Aggregation-side companion. Establishes spill-pool patterns we reuse. |
| [#18840](https://github.com/apache/datafusion/issues/18840) *Dynamic hash join order switching* | OPEN | Composes with this RFC: a partitioned build can also be left/right-swapped after Phase 1 completes. |
| [#20847](https://github.com/apache/datafusion/issues/20847) *Dynamic switching to partitioned mode* | OPEN | Adjacent; this RFC's partitioning is finer-grained (per-build hash partitioning, distinct from `PartitionMode::Partitioned`'s shuffle partitioning). |

**Net assessment:** the feature has been requested in #1599 (2022), #12952 (2024), and #17267 (2025). #18589 reached 1,825 LoC of working draft before being closed stale. The open threads are operator architecture (separate vs. refactor), benchmarking, and PR sequencing.

## Addressing the design questions that stalled #17267 / #18589

Two reviewer concerns surfaced that this RFC explicitly addresses.

### Concern 1: "SortMergeJoin spill is good enough" (@2010YOUY01)

Quote from #17267:

> Memory-limited equal join is a solved problem through sort merge join, though there are some outstanding work to further improve it. Grace Hash Join is trickier to configure because we have to decide partition number up front, and it's not obvious to me it's more performant.

**Response:** at memory-fitting scale, hash join is faster than SMJ. SortMergeJoin spill helps when the build does not fit; when the build fits but is large, hash join is the faster path. Data points:

* DuckDB ran TPC-H sf=300 on a 16 GB Raspberry Pi 5 using hybrid hash join.
* Per @Omega359 on #18589: "I don't care how fast an algo is if I need a 512GB+ machine to run it."
**Mitigation for the partition-count tuning concern** (this RFC's Open Question 1): default to 8 partitions, expose the setting as `SessionConfig::hash_join_partitions`, and recursively split on overflow (PR 4 in this RFC). This is the same approach DuckDB takes; their default is also 8.

### Concern 2: "separate operator vs. refactor" (the #18589 stall)

Quote from @osipovartem's PR description:

> We should add a physical plan optimizer rule to replace HashJoin by GraceHashJoin for inner join type only for now. Also maybe add some config flag to enable/disable …

The stall was: should this be `GraceHashJoinExec` (a sibling operator), or should spill be folded into `HashJoinExec`?

**Resolution proposed here: fold into `HashJoinExec`, gated by config.** Concretely:

```rust
// SessionConfig
pub struct ExecutionOptions {
    /// When the build side exceeds this fraction of the memory
    /// pool, switch to partitioned execution with on-disk spill.
    /// Default 0.0 (disabled, current behavior).
    pub hash_join_spill_threshold: f64,

    /// Number of partitions for partitioned-build mode.
    /// Default 8.
    pub hash_join_partitions: usize,
}
```

This avoids the duplication concern while still being opt-in for stability. Existing users see no behavior change unless they set `hash_join_spill_threshold > 0` or call `with_memory_limit()` (under which the threshold auto-derives, similar to how aggregation spill works today).

The 4-PR plan below makes this incremental:

* PR 1 (already up): partition utility module — pure addition
* PR 2: `BuildSideState` becomes per-partition (in-memory only)
* PR 3: spill to disk on overflow
* PR 4: skew + outer joins + dynamic-filter integration

Each PR is independently reviewable; behavior change is gated by config from PR 3 onward.
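The gating decision described above can be sketched in a few lines. This is a std-only illustration, not the proposed implementation: `use_partitioned_build` is a hypothetical helper, and the auto-derived `0.5` fraction is an assumption for the example, not a settled default.

```rust
/// Sketch of the opt-in gate: partitioned build engages only when a
/// threshold is set explicitly or a memory limit makes one derivable.
/// The 0.5 auto-derived fraction is an illustrative assumption.
fn use_partitioned_build(
    estimated_build_bytes: u64,
    pool_bytes: Option<u64>, // None = unlimited pool (no with_memory_limit)
    spill_threshold: f64,    // hash_join_spill_threshold; 0.0 = disabled
) -> bool {
    match pool_bytes {
        // Unlimited pool: keep today's single-table path.
        None => false,
        Some(pool) => {
            // An explicit threshold wins; otherwise derive one because a
            // memory limit is set (the auto-derivation the RFC proposes).
            let t = if spill_threshold > 0.0 { spill_threshold } else { 0.5 };
            (estimated_build_bytes as f64) > t * (pool as f64)
        }
    }
}

fn main() {
    let gib: u64 = 1024 * 1024 * 1024;
    // Disabled + unlimited pool: never partitions (current behavior).
    assert!(!use_partitioned_build(u64::MAX, None, 0.0));
    // Under a 1 GiB limit, a 700 MiB build crosses the derived gate.
    assert!(use_partitioned_build(700 * (gib / 1024), Some(gib), 0.0));
    // A small build stays on the single-table path.
    assert!(!use_partitioned_build(gib / 10, Some(gib), 0.0));
}
```

The key property for backwards compatibility is the first branch: with no memory limit and the threshold at its 0.0 default, the function can never return `true`.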
## What's already at `tomz/datafusion:external-hash-join-pr1`

A 270-LoC `partitioned_build` module (`datafusion/physical-plan/src/joins/hash_join/partitioned_build.rs`) with:

```rust
pub const DEFAULT_NUM_PARTITIONS: usize = 8;

pub fn partition_batch_by_hash(
    batch: &RecordBatch,
    on_keys: &[Arc<dyn PhysicalExpr>],
    num_partitions: usize,
    random_state: &RandomState,
) -> Result<Vec<Option<RecordBatch>>>;

pub fn partition_with_hashes(
    batch: &RecordBatch,
    hashes: &[u64],
    num_partitions: usize,
) -> Result<Vec<Option<RecordBatch>>>;
```

7 unit tests covering hash distribution, multi-column compound keys, determinism, and edge cases. The module is registered in `mod.rs` but not yet wired into `HashJoinExec`, so existing behavior is unchanged. `cargo test -p datafusion-physical-plan --lib joins::hash_join`: 386/386 pass.

The module is reusable across both architectural paths (refactor vs. sibling operator) and across both partitioning sources (build keys vs. probe keys with caller-provided hashes).

### The hash-aggregate contrast (Q67)

Q67 is a rollup with grouping sets — the workload DuckDB highlights in their March 2026 [cheap-MacBook-can-do-big-data post](https://duckdb.org/2026/03/11/big-data-on-the-cheapest-macbook). At TPC-DS sf=100 single-node, varying the memory budget on Q67:

| Budget | Q67 wall |
|---|---:|
| unlimited | 48.8 s |
| 8 GiB | 48.7 s |
| 4 GiB | 49.9 s |
| 2 GiB | 50.0 s |

**A 2.5 % wall-clock delta at the tightest budget.** Q67 spills via DataFusion's hash-aggregate spill path (#7400, 2023). The contrast: hash aggregation is memory-bounded with disk spill; hash join is not. Q67 and Q14 are both memory-pressure-sensitive; Q67 spills, Q14 returns `Resources exhausted`.

### Note on distributed-scale data

The single-node Q14 / Q23 / Q67 bench above demonstrates the HashJoin OOM, hash-agg spill, and SMJ-fallback behavior. Distributed sf=1000 runs would test whether sharding moves the boundary, but per-executor working sets would still hit HashJoinInput memory caps.
Distributed validation can be added post-RFC on top of the existing [Q72 sf=1000 cluster validation](https://github.com/your-org/spark-rust-jaaicode/blob/main/docs/bench-tpcds-sf1000-q72-utf8view-revalidation.md) that ran end-to-end at default (unlimited) executor memory.

## Workarounds tried

* **Larger memory budget**: works at sf=100 single-node; doesn't scale to larger inputs.
* **`SortMergeJoinExec` fallback** via a physical-optimizer rule that swaps based on stats ([`SortMergeJoinFallbackRule`](https://github.com/your-org/spark-rust-jaaicode/blob/main/crates/spark-dataframe/src/sort_merge_fallback.rs)). Functions as a correctness floor, but SortMergeJoin is 1.5-3× slower than a memory-fitting hash join (and, per the bench above, does not fire in the multi-medium-build case).
* **Distribute the query** across cluster nodes: works at scale; does not apply to single-node, embedded, or per-task-bounded deployments.

## Design — partitioned build with disk spill

The standard textbook approach: break the build side into N partitions by hash modulo N. Each partition gets `pool / N` of the budget. When a partition exceeds its share, spill it to disk. On the probe side, partition rows the same way; only one partition's build state needs to be in memory at a time during probe.

### Phase 1 — partition-then-build

```
build_input ──► HashPartitioner(N=8, hash on join_key)
                │
                ├── partition_0 → in-mem hash table (or spill if overflow)
                ├── partition_1 → in-mem hash table (or spill)
                ├── ...
                └── partition_7 → in-mem hash table (or spill)
```

When `MemoryReservation::try_grow` fails, victim-pick the largest in-memory partition, write it to disk via the `DiskManager`, free its memory, and retry.

### Phase 2 — probe-then-merge

```
probe_input ──► HashPartitioner(N=8, same hash function)
                │
                └── For each partition i:
                      if build[i] is in memory: probe directly
                      else: load from disk, probe, free memory
```

**Memory invariant**: at most one build partition's hash map is materialized at any moment during probe.
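The entire correctness of Phase 2 rests on one property: build and probe sides must partition with the *same* hash function, so rows with equal join keys meet in the same partition. A std-only sketch of that invariant (the `partition_index` helper and std's SipHash stand in for DataFusion's `create_hashes`; names are illustrative):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative partitioner. Phase 2 relies only on this being the SAME
/// deterministic function on both sides of the join.
fn partition_index<K: Hash>(key: &K, num_partitions: usize) -> usize {
    let mut h = DefaultHasher::new(); // fixed-key SipHash: deterministic
    key.hash(&mut h);
    (h.finish() as usize) % num_partitions
}

fn main() {
    let n = 8;
    // Build side assigns each key a partition…
    let build_side: Vec<usize> =
        (0i64..1_000).map(|k| partition_index(&k, n)).collect();
    // …and the probe side, hashing independently, agrees for every key,
    // so a probe row only ever needs its one matching build partition.
    for k in 0i64..1_000 {
        assert_eq!(partition_index(&k, n), build_side[k as usize]);
    }
    // Keys also spread across all partitions, which is what makes the
    // "one resident partition during probe" budget meaningful.
    let mut seen = vec![false; n];
    for &p in &build_side {
        seen[p] = true;
    }
    assert!(seen.iter().all(|&s| s));
}
```

This is also why Open Question 3 (reusing an upstream `Partitioning::Hash`) requires an identical hash function, not just an equal partition count.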
During probe, resident build-side state is bounded by roughly `pool / N` (one partition) instead of the full build.

### Algorithm pseudocode

```rust
// Build phase: each input batch is split into per-partition sub-batches,
// since rows within one batch hash to different partitions.
for batch in build_input {
    for (p, sub_batch) in partition_by_hash(&batch, on_left, N) {
        match reservation.try_grow(sub_batch.size()) {
            Ok(_) => state.partitions[p].push(sub_batch),
            Err(_) => {
                spill_largest_partition(&mut state)?;
                reservation.try_grow(sub_batch.size())?; // should fit now
                state.partitions[p].push(sub_batch);
            }
        }
    }
}

// Probe phase
for probe_batch in probe_input {
    let probe_partitioned = partition_by_hash(&probe_batch, on_right, N);
    for (p, p_batch) in probe_partitioned {
        let build_data = state.partitions[p].load_for_probe(disk_manager)?;
        run_probe(p_batch, build_data.hash_map);
        // Optionally free build_data after the probe completes.
    }
}
```

## Where this lands in the source tree

Concrete file-level mapping (relative to `datafusion/physical-plan/src/joins/hash_join/`):

| File | Change | LoC |
|---|---|---:|
| `exec.rs::BuildSideState` | replace `batches: Vec<RecordBatch>` with `partitions: [PartitionSlot; N]` | ~30 |
| `exec.rs::collect_left_input` (~L1836) | partition each incoming batch via existing `create_hashes`; spill on `try_grow` failure | ~80 |
| `exec.rs::HashJoinStream::poll_next_impl` | partition probe batches by hash; per-partition probe loop | ~150 |
| `exec.rs` (lines 1303 + 1323) | switch `MemoryConsumer::new("HashJoinInput")` to `.with_can_spill(true)` | ~5 |
| `spill.rs` (NEW module) | `PartitionSlot { InMemory(Vec<RecordBatch>) \| Spilled { path, ... } }` + spill IPC writer/reader. Mirrors `aggregates/group_values/spilling.rs`. | ~600 |
| Tests in `core/tests/sql/joins.rs` | spill-fixture parquet builds + integration tests under a tight memory budget | ~400 |

**Total estimate: ~1,200-1,500 LoC + tests.**

### Concrete code skeletons

**`BuildSideState` becomes partitioned:**

```rust
const NUM_PARTITIONS: usize = 8; // tunable via SessionConfig

struct BuildSideState {
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    on_left: Vec<PhysicalExprRef>,
    schema: SchemaRef,
    /// One slot per partition. In-memory until try_grow fails;
    /// then individual partitions get spilled to disk.
    partitions: [PartitionSlot; NUM_PARTITIONS],
    num_rows: usize,
    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
}

enum PartitionSlot {
    InMemory(Vec<RecordBatch>),
    Spilled {
        path: RefCountedTempFile, // managed by DiskManager
        bytes_written: usize,
        rows: usize,
    },
}
```

**Spill trigger** — replaces today's `try_grow?` line at L1873. The incoming batch is first split per partition (its rows hash to different partitions), then each sub-batch is reserved and stored:

```rust
let parts = partition_batch_by_hash(&batch, &state.on_left, NUM_PARTITIONS, &random_state)?;
for (p, sub_batch) in parts
    .into_iter()
    .enumerate()
    .filter_map(|(p, b)| b.map(|b| (p, b)))
{
    let batch_size = get_record_batch_memory_size(&sub_batch);
    if state.reservation.try_grow(batch_size).is_err() {
        spill_largest_partition(&mut state, disk_manager)?;
        state.reservation.try_grow(batch_size)?; // should fit now
    }
    state.partitions[p].push(sub_batch);
}
```

**Probe side** — replaces the single-pass probe with a per-partition loop:

```rust
let probe_partitioned = partition_batch_by_hash(
    &probe_batch,
    &self.on_right,
    NUM_PARTITIONS,
    &self.random_state,
)?;
let mut output = Vec::new();
for (p_idx, p_batch) in probe_partitioned
    .into_iter()
    .enumerate()
    .filter_map(|(p, b)| b.map(|b| (p, b)))
{
    let build = self.load_build_partition(p_idx, disk_manager)?;
    output.push(self.run_probe(p_batch, &build.hash_map)?);
    // `build` dropped here, memory reclaimed
}
concat_batches(&self.schema, &output)
```

**DiskManager integration** mirrors the existing `group_values/spilling.rs`:

```rust
fn spill_partition(
    state: &mut BuildSideState,
    p_idx: usize,
    disk_manager: &DiskManager,
) -> Result<()> {
    let file = disk_manager.create_tmp_file("HashJoin_BuildSpill")?;
    let mut writer = StreamWriter::try_new(File::create(file.path())?, &state.schema)?;
    let batches = std::mem::take(state.partitions[p_idx].as_in_memory_mut());
    let bytes: usize = batches.iter().map(get_record_batch_memory_size).sum();
    for b in &batches {
        writer.write(b)?;
    }
    writer.finish()?;
    state.reservation.shrink(bytes);
    state.partitions[p_idx] = PartitionSlot::Spilled {
        path: file,
        bytes_written: bytes,
        rows: batches.iter().map(|b| b.num_rows()).sum(),
    };
    Ok(())
}
```

## Edge cases

1. **Skewed builds.** All rows hash to one partition → recursive partitioning. When `spill_largest_partition` is called twice on the same partition, double NUM_PARTITIONS for that partition's children.
2. **Single large key.** All rows have the same key, so recursive partitioning doesn't help. Options:
   - Fall back to NestedLoopJoin for this build (slow but bounded);
   - Emit a warning + use SortMergeJoinExec via the existing fallback rule (mirroring DuckDB's behavior).
3. **Outer joins.** The "matched" bitmap (visited indices for FULL/LEFT joins) needs care. Two options:
   - Per-partition bitmap, merged at the end. Memory cost: 1 bit per build row — fine even at sf=1000 (a few hundred MB max);
   - Disable spill for outer joins in v1; document as future work.
4. **Dynamic filter pushdown.** HashJoinExec creates a dynamic filter from build-side bounds and pushes it to the probe scan. With a partitioned build:
   - Compute the filter once at the end of the build phase (after all partitions, including spilled ones, contribute their min/max);
   - OR skip the dynamic filter for spilling joins (acceptable in v1).
5. **`PartitionMode::CollectLeft` vs `Partitioned`.** Today's code has separate paths. The spill rewrite naturally consolidates them: both modes become "partition the build by hash modulo N". CollectLeft becomes "1 input partition × N hash partitions"; Partitioned becomes "M input partitions × N hash partitions, distributed by input partition".
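One subtlety behind edge case 1 is worth pinning down: re-partitioning an overflowing partition with the *same* hash function is useless, because every row of that partition re-hashes to a single child. The recursion must vary the function between levels — one common way is salting the hash with the recursion depth (doubling N with modulo partitioning, as described above, is another). A std-only sketch with illustrative names:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash partitioner salted by recursion level. Level 0 is the top-level
/// partitioner; children of an overflowing partition re-partition at
/// level + 1 so the colliding keys spread again. (A single repeated key —
/// edge case 2 — still cannot be split this way.)
fn partition_index(key: u64, num_partitions: usize, level: u64) -> usize {
    let mut h = DefaultHasher::new();
    level.hash(&mut h); // the salt: a different function per depth
    key.hash(&mut h);
    (h.finish() as usize) % num_partitions
}

fn main() {
    let n = 8;
    // Simulate skew: gather keys that all landed in one level-0 partition.
    let p0 = partition_index(1, n, 0);
    let skewed: Vec<u64> = (0..100_000u64)
        .filter(|k| partition_index(*k, n, 0) == p0)
        .take(1_000)
        .collect();

    // Re-partitioning with the SAME function: one child receives everything.
    assert!(skewed.iter().all(|&k| partition_index(k, n, 0) == p0));

    // The level-1 salt redistributes the same keys across the children.
    let mut counts = vec![0usize; n];
    for &k in &skewed {
        counts[partition_index(k, n, 1)] += 1;
    }
    assert!(counts.iter().filter(|&&c| c > 0).count() > 1);
}
```

Whatever mechanism is chosen, it must be applied identically on the probe side at the same recursion depth, or Phase 2's partition-matching invariant breaks.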
## Performance target

| Workload | Today (memory-fitting) | This RFC (memory-bounded) |
|---|---|---|
| Build fits in pool (common case) | Hash build O(n), hash probe O(m) | Same — partitioning is cheap (O(n) hash, no I/O), in-memory probe identical |
| Build exceeds pool by 2-4× | OOM | Spill 1-3 partitions; probe-time disk reads ≈ 1.2-1.5× wall vs memory-fitting |
| Build exceeds pool by 10×+ | OOM | Most partitions spill; probe-time disk reads ≈ 2-3× wall |

Target perf for the spilling case: within 1.5× of a memory-fitting hash join (SortMergeJoin is 1.5-3× per the bench above).

## Implementation plan / PR breakdown

For upstream reviewability, we suggest four PRs:

| PR | Scope | LoC | Time |
|---|---|---:|---:|
| 1 | Partitioned BuildSide (in-mem only, no spill yet). Refactor `collect_left_input` to always partition. Probe pipeline changes. Tests verify behavioral equivalence. | ~400 + 200 | ~1 week |
| 2 | Add spill via DiskManager. Victim-pick + spill on `try_grow` failure. Probe-time deserialization. Tests verify spill triggers + correct results under a tight budget. | ~600 + 400 | ~1 week |
| 3 | Recursive partitioning for skew. | ~200 + 150 | ~3 days |
| 4 | Outer join handling + dynamic-filter integration. | ~150 + 100 | ~2 days |

Total: 3-4 weeks elapsed including review cycles. PR 1 is the architecture-only change — no behavior change. Subsequent PRs add features incrementally.

## Open questions for community

1. **Default `NUM_PARTITIONS` value** (raised by @2010YOUY01 on #17267). Proposal: default 8 (DuckDB's default), exposed as `datafusion.execution.hash_join.partitions`. Recursive split on overflow (PR 3 above) handles undersized cases.
2. **Recursive partitioning depth.** Cap at 3 levels (8 → 64 → 512 partitions max)? Beyond that, fall through to NestedLoopJoin or error?
3. **Probe-side partition reuse.** If the probe side is already `Partitioning::Hash(on_right, N')`, can we reuse that partitioning for the join (avoid the re-hash)?
   Requires matching N == N' and an identical hash function. Optimizer-side integration; could be a follow-up.
4. **Spill format / backend.** Arrow IPC stream matches the existing `group_values/spilling.rs`. The pluggable `SpillFile` trait in #21882 would provide backend portability (S3, BufFile, etc.) at the cost of waiting for #21882 to merge or landing both PRs in lock-step.
5. **Dynamic filter integration** (raised on #17267). Resolved upstream by #17444 (merged Sep 2025): dynamic-filter bounds accumulate progressively during build, which is spill-compatible.
6. **Memory pool integration.** Should partitioned reservations register through `SubPool` (PR #22035)? If `SubPool` lands, per-partition build reservations match its target use case: one `SubPool` per `HashJoinExec`, with a sub-reservation per partition. This also addresses the "FairSpillPool penalises the active operator" concern from #22036.
7. **External reclaim.** Register the spill path as a `MemoryReclaimer` (PRs #21425 / #22043) so external Spark/Comet drivers can request spill on demand. Covered in PR 4.
8. **Operator architecture: refactor vs. sibling** (the #18589 stall point). Proposal: refactor `HashJoinExec`, gated by config. `hash_join_spill_threshold` defaults to 0.0 (disabled, current behavior) and auto-derives under `with_memory_limit()`.
9. **Backwards compatibility.** The `PartitionMode` enum on `HashJoinExec` is public API. Behavior change: when spill is enabled, `CollectLeft` no longer collects into a single Vec; it partitions. This should be transparent to most downstream consumers, but is worth flagging.

## What this enables downstream

- DataFusion sessions with `with_memory_limit()` become memory-bounded for hash-join workloads (today the limit covers hash aggregate, sort, and sort-merge join only).
- TPC-DS at sf=1000 single-node on 64 GiB-class hardware.
- Cloud-deployed DataFusion (AWS Lambda, Cloudflare Workers, Fly.io) with datasets larger than allocated RAM.
- Embedded use cases (DataFusion-WASM, edge devices).
## Reference: similar implementations elsewhere

- **DuckDB**: `src/execution/operator/join/physical_hash_join.cpp` (plus `perfect_hash_join_executor.cpp`) — partitioned hash join with spill.
- **PostgreSQL**: hash join in `src/backend/executor/nodeHashjoin.c` + `nodeHash.c` — batch-based external hash join (a very similar partitioning approach).
- **Databricks Photon**: closed-source, but the published papers describe a similar partitioned approach.

## Next steps

This RFC is meant to land as a **comment on #17267**, not a new issue, to converge that thread toward implementation:

1. **Comment on #17267** with this body. Tag the participants (@jonathanc-n @2010YOUY01 @adriangb @alamb @comphead @osipovartem @Omega359 @Dandandan @camuel) and propose the resolution: refactor HashJoinExec, opt-in via config.
2. **Reach out to @osipovartem** specifically about #18589. Their 1,825-LoC draft has substantial reusable code (`PartitionWriter`, the `SpillLocation` enum, hybrid in-memory/disk fallback). Two paths:
   - Revive #18589 with the architectural change (refactor vs. sibling) plus our `partitioned_build` foundation;
   - Cherry-pick relevant pieces into a fresh PR series.
3. **Open Draft PR 1** for the `partitioned_build` foundation ([already up](https://github.com/tomz/datafusion/tree/external-hash-join-pr1)) once the design direction is acknowledged on #17267.
4. **Iterate PRs 2-4** in sequence, each with its own RFC comment if the design changes.

If significant design changes emerge from the #17267 discussion, the pseudocode and file pointers here are adjustable.

## Acknowledgments

- @jonathanc-n for the #17267 proposal and hybrid-hash-join framing
- @osipovartem for the #18589 draft (reusable code: `PartitionWriter`, `SpillLocation`, hybrid in-memory/disk fallback)
- @adriangb for the dynamic-filter spill-compat refactor (#17444)
- @2010YOUY01, @comphead, @alamb, @Omega359, @camuel, @Dandandan for the #17267 discussion
