tomz commented on issue #17267:
URL: https://github.com/apache/datafusion/issues/17267#issuecomment-4410356794

   # RFC: External Hash Join — refining the partitioned-build direction
   
   **Status:** draft, intended as a **comment on
   [#17267](https://github.com/apache/datafusion/issues/17267)**
   (the active proposal thread), not a fresh issue.
   **Posted date:** 2026-05-08
   
   ## TL;DR
   
   A refinement of the hybrid-hash-join direction discussed in
   [#17267](https://github.com/apache/datafusion/issues/17267):
   
   1. Code-level pointers (file:line in the current `HashJoinExec`)
   2. A staged 4-PR plan
   3. PR 1 in progress at
      
[`tomz/datafusion:external-hash-join-pr1`](https://github.com/tomz/datafusion/tree/external-hash-join-pr1)
      — `partitioned_build` foundation module, 270 LoC + 7 tests.
      `cargo test -p datafusion-physical-plan --lib joins::hash_join`:
      386/386 pass, zero diff in existing tests.
   4. A proposed answer to the architectural question that
      stalled [#18589](https://github.com/apache/datafusion/pull/18589)
      ("separate operator vs. refactor HashJoinExec")
   
   Prior art: #1599 (2022), #12952 (2024), #17267 (2025), #18589
   (2025, closed stale 2026-01).
   
   ## Why now
   
   DataFusion sessions configured with `with_memory_limit()` bound
   HashAggregate, ExternalSort, and external-merge-join. Hash joins
   register as `can_spill: false` and return `Resources exhausted`
   on overflow. DuckDB, PostgreSQL, Spark, and Photon implement
   partitioned external hash join.
   
   This is observable in:
   * Memory-bounded execution for hash-join-heavy workloads
     (TPC-DS — see "Real-world impact" below)
   * Embedded / WASM / edge deployments where RAM is bounded
   * Multi-tenant deployments with per-task memory limits
   * DataFusion-in-Spark integrations (datafusion-comet) that need
     external reclaim hooks
   
   ## Real-world impact (measured)
   
   We hit this while running TPC-DS sf=100 single-node in a dev
   environment on a VM with 8 cores, 62 GiB RAM, 223 GiB disk.
   **Concrete bench across 16 probes** captured 2026-05-08.
   
   ### Headline data — Q14 sf=100 budget sweep
   
   | Budget | Wall | Result |
   |---|---:|---|
   | 4 GiB | — | ❌ OOM (HashJoinInput) |
   | **8 GiB** | — | ❌ **OOM (HashJoinInput)** |
   | 16 GiB | 54.7 s | ✅ |
   | 32 GiB | 56.2 s | ✅ |
   | unlimited | 54.0 s | ✅ |
   
   Q14 fails at any budget below ~16 GiB. No spill path; the
   allocation returns `Resources exhausted`. Verbatim error:
   
   ```
   Resources exhausted: Additional allocation failed for HashJoinInput[1]
     HashJoinInput[3]#108 (can spill: false) consumed 220.1 MB
     HashJoinInput[3]#226 (can spill: false) consumed 220.1 MB
     HashJoinInput[3]#361 (can spill: false) consumed 220.1 MB
     HashJoinInput[7]#386 (can spill: false) consumed 220.0 MB
     HashJoinInput[7]#249 (can spill: false) consumed 220.0 MB.
   Failed to allocate additional 99.0 KB for HashJoinInput[1] —
   30.5 KB remain available for the total pool
   ```
   
   5 concurrent HashJoinInputs at 220 MB each = 1.1 GB of
   build-side state, plus DataFusion's other reservations, exceed
   the 8 GiB pool by 30 KB at the failure point.
   
   ### Multi-query sf=100 picture — query shape matters
   
   | Query | 8 GiB | 16 GiB | unlimited | Notes |
   |---|---|---|---:|---|
   | Q14 | ❌ OOM | ✅ 54.7 s | ✅ 54.0 s | HashJoinInput memory-cap |
   | Q23 | ❌ disk-cap | (n/a) | ✅ 65.0 s | spilled aggregate hit DiskManager 
100 GiB cap |
   | Q4  | ✅ 77.4 s | (n/a) | ✅ 77.0 s | hash-heavy, but build sizes fit |
   | Q11 | ✅ 58.9 s | (n/a) | ✅ 59.4 s | hash-heavy, but build sizes fit |
   | Q72 | (n/a) | (n/a) | ⏱️ **>30min** | 9-way join, single-node-CPU bound |
   
   The OOM surface is wider than "build side too big". Q14 hits
   HashJoinInput memory exhaustion. Q23 spills via the existing
   aggregate path but exceeds DataFusion's default 100 GiB
   DiskManager temp cap — partition-aware spill writes only what's
   needed per partition, not unbounded per-batch.
   
   Q72 does not finish in 30 minutes single-node even with
   unlimited memory. Distributed execution completes Q72 sf=100 in
   ~61 s; spill is not the relevant fix for that shape.
   
   ### SortMergeJoin-fallback workaround data
   
   We carry a downstream `SortMergeJoinFallbackRule` to mitigate
   the OOM by swapping HashJoin → SortMergeJoin when build-side
   estimates exceed a threshold. The bench tested it explicitly:
   
   | Run | Wall | Result |
   |---|---:|---|
   | Q14 8 GiB no SMJ | — | ❌ OOM (HashJoinInput) |
   | Q14 8 GiB + SMJ thresh 1 GiB | — | ❌ **same OOM** |
   | Q14 15 GiB + SMJ thresh 1.5 GiB | 55.9 s | ✅ |
   | Q14 16 GiB + SMJ thresh 2 GiB | 56.1 s | ✅ |
   | Q14 16 GiB no SMJ | 54.7 s | ✅ |
   
   Two findings:
   1. At 8 GiB the SMJ fallback cannot fire — each individual
      build is below threshold; the OOM is from concurrent
      reservations summing past budget. The threshold-based SMJ
      fallback does not address the multi-medium-builds case.
   2. At budgets where HJ fits, SMJ is ~2.5 % slower (56.1 vs
      54.7 s).
   
   The SMJ fallback is not equivalent to HashJoinExec spill in
   this regime.
   
   ### Implications for the design's perf target
   
   The 1.5× target the RFC sets (vs memory-fitting hash join)
   maps to:
   
   | Query | Target wall at tight budget | vs unlimited baseline |
   |---|---:|---:|
   | Q14 | ≤ 80 s at 8 GiB | (unlimited 54 s) |
   | Q23 | ≤ 100 s at 8 GiB | (unlimited 65 s) |
   | Q4  | ≤ 80 s at 8 GiB | (unlimited 77 s — already fits) |
   | Q11 | ≤ 70 s at 8 GiB | (unlimited 59 s — already fits) |
   
   These anchor the perf expectations for the spillable-HashJoin
   PR's benchmark suite.
   
   [Full bench writeup with raw 
logs.](https://github.com/your-org/spark-rust-jaaicode/blob/main/docs/datafusion-rfc-bench-results.md)
   
   ## Existing community work this builds on
   
   | Issue / PR | Status | Relationship |
   |---|---|---|
   | [#1599](https://github.com/apache/datafusion/issues/1599) *Memory Limited 
Joins (Externalized / Spill)* (2022-01) | OPEN | The original 4-year-old ask. 
SortMergeJoin half landed (#9359); HashJoin half is what this closes. |
   | [#12952](https://github.com/apache/datafusion/issues/12952) *Add spilling 
support for HashJoin* (2024-10) | OPEN | Direct restatement of the ask. |
   | [#17267](https://github.com/apache/datafusion/issues/17267) *PROPOSAL Hash 
Join Spilling Proposal* (2025-08, **21 comments**) | OPEN | **The active design 
thread.** This RFC is intended to land here as a comment. Same hybrid-hash-join 
target; this refines with code-level concreteness + a staged plan + an 
in-flight PR. |
   | [PR #18589](https://github.com/apache/datafusion/pull/18589) *Draft 
GraceHashJoin* by @osipovartem (2025-11) | CLOSED (stale, 2026-01) | **1,825 
LoC working draft.** Closed as stale — not for technical reasons, but because 
reviewers couldn't agree on whether to do it as a separate operator or fold 
into HashJoinExec. We address that below. |
   | [PR #17444](https://github.com/apache/datafusion/pull/17444) by @adriangb 
(2025-09) | **MERGED** | Refactored HashJoinExec to progressively accumulate 
dynamic filter bounds during build "to unblock spilling". The dynamic-filter 
compatibility question is **already resolved upstream**. |
   | [PR #21882](https://github.com/apache/datafusion/pull/21882) *pluggable 
SpillFile trait* | OPEN | Introduces async-streaming spill backends (S3, 
BufFile, …). If we write spill code against this, we get backend portability 
for free. |
   | [PR #22035](https://github.com/apache/datafusion/pull/22035) *SubPool* | 
OPEN | Per-operator memory sub-pool. Designed exactly for the 
partitioned-hash-join's per-partition reservations. |
   | [PR #22043](https://github.com/apache/datafusion/pull/22043) 
*MemoryReclaimer* | OPEN | Cooperative async spill on OOM. Our partitioned hash 
join should register as one. |
   | [PR #21425](https://github.com/apache/datafusion/pull/21425) + 
[#21422](https://github.com/apache/datafusion/issues/21422) *external reclaim 
hooks* | OPEN | Spark/Comet integration; we should make our spill compatible. |
   | [#16065](https://github.com/apache/datafusion/issues/16065) *Epic: GSoC 
2025 Spilling* | OPEN | Companion epic. Hash join spill fits as a natural 
follow-up. |
   | [#13123](https://github.com/apache/datafusion/issues/13123) *EPIC Improved 
Externalized Hash Aggregation* | OPEN | Aggregation-side companion. Establishes 
spill-pool patterns we reuse. |
   | [#18840](https://github.com/apache/datafusion/issues/18840) *Dynamic hash 
join order switching* | OPEN | Composes with this RFC: a partitioned build can 
also be left/right-swapped after Phase 1 completes. |
   | [#20847](https://github.com/apache/datafusion/issues/20847) *Dynamic 
switching to partitioned mode* | OPEN | Adjacent; this RFC's partitioning is 
finer-grained (per-build hash partitioning, distinct from 
`PartitionMode::Partitioned`'s shuffle-partition). |
   
   **Net assessment:** the feature has been requested in #1599
   (2022), #12952 (2024), and #17267 (2025). #18589 reached
   1,825 LoC of working draft before being closed stale. Open
   threads: operator architecture (separate vs. refactor),
   benchmarking, PR sequencing.
   
   ## Addressing the design questions that stalled #17267 / #18589
   
   Two reviewer concerns surfaced that this RFC explicitly addresses:
   
   ### Concern 1: "SortMergeJoin spill is good enough" (@2010YOUY01)
   
   Quote from #17267:
   
   > Memory-limited equal join is a solved problem through sort
   > merge join, though there are some outstanding work to further
   > improve it. Grace Hash Join is trickier to configure because
   > we have to decide partition number up front, and it's not
   > obvious to me it's more performant.
   
   **Response:** at memory-fitting scale, hash join is faster
   than SMJ. SortMergeJoin spill helps when the build does not
   fit. When the build fits but is large, hash join is the
   faster path. Data points:
   
   * DuckDB ran TPC-H sf=300 on a 16 GB Raspberry Pi 5 using
     hybrid hash join.
   * Per @Omega359 on #18589: "I don't care how fast an algo is
     if I need a 512GB+ machine to run it."
   
   **Mitigation for the partition-count tuning concern** (this
   RFC's Open Question 1): default to 8 partitions, expose as
   `SessionConfig::hash_join_partitions`, recursively split on
   overflow (PR 4 in this RFC). Same approach DuckDB uses; their
   default is 8.
   
   ### Concern 2: "separate operator vs. refactor" (the #18589 stall)
   
   Quote from @osipovartem's PR description:
   
   > We should add a physical plan optimizer rule to replace
   > HashJoin by GraceHashJoin for inner join type only for now.
   > Also maybe add some config flag to enable/disable …
   
   The stall was "should this be `GraceHashJoinExec` (sibling) or
   should we fold spill into `HashJoinExec`?"
   
   **Resolution proposed here: fold into `HashJoinExec`, gated by
   config.** Concretely:
   
   ```rust
   // SessionConfig
   pub struct ExecutionOptions {
       /// When the build side exceeds this fraction of the memory
       /// pool, switch to partitioned execution with on-disk spill.
       /// Default 0.0 (disabled, current behavior).
       pub hash_join_spill_threshold: f64,
       /// Number of partitions for partitioned-build mode.
       /// Default 8.
       pub hash_join_partitions: usize,
   }
   ```
   
   This avoids the duplication concern while still being opt-in
   for stability. Existing users see no behavior change unless
   they set `hash_join_spill_threshold > 0` or
   `with_memory_limit()` (under which the threshold auto-derives,
   similar to how aggregation spill works today).
   
   The 4-PR plan below makes this incremental:
   * PR 1 (already up): partition utility module — pure addition
   * PR 2: BuildSideState becomes per-partition (in-memory only)
   * PR 3: spill to disk on overflow
   * PR 4: skew + outer joins + dynamic filter integration
   
   Each PR is independently reviewable; behavior change is
   gated by config from PR 3 onward.
   
   ## What's already at `tomz/datafusion:external-hash-join-pr1`
   
   A 270-LoC `partitioned_build` module
   (`datafusion/physical-plan/src/joins/hash_join/partitioned_build.rs`)
   with:
   
   ```rust
   pub const DEFAULT_NUM_PARTITIONS: usize = 8;
   
   pub fn partition_batch_by_hash(
       batch: &RecordBatch,
       on_keys: &[Arc<dyn PhysicalExpr>],
       num_partitions: usize,
       random_state: &RandomState,
   ) -> Result<Vec<Option<RecordBatch>>>;
   
   pub fn partition_with_hashes(
       batch: &RecordBatch,
       hashes: &[u64],
       num_partitions: usize,
   ) -> Result<Vec<Option<RecordBatch>>>;
   ```
   
   7 unit tests covering hash distribution, multi-column compound
   keys, determinism, edge cases. Module is registered in `mod.rs`
   but not yet wired into `HashJoinExec`, so existing behavior is
   unchanged. `cargo test -p datafusion-physical-plan --lib
   joins::hash_join`: 386/386 pass.
   
   The module is reusable across both architectural paths
   (refactor vs. sibling operator) and across both partitioning
   sources (build keys vs. probe keys with caller-provided
   hashes).
   
   ### The hash-aggregate contrast (Q67)
   
   Q67 is a rollup with grouping sets — the workload DuckDB
   highlights in their March 2026
   [cheap-MacBook-can-do-big-data 
post](https://duckdb.org/2026/03/11/big-data-on-the-cheapest-macbook).
   At TPC-DS sf=100 single-node, varying the memory budget on
   Q67:
   
   | Budget | Q67 wall |
   |---|---:|
   | unlimited | 48.8 s |
   | 8 GiB | 48.7 s |
   | 4 GiB | 49.9 s |
   | 2 GiB | 50.0 s |
   
   **2.5 % wall delta at the tightest budget.** Q67 spills via
   DataFusion's hash-aggregate spill path (#7400 / 2023).
   
   The contrast: hash-aggregate is memory-bounded with disk
   spill; hash-join is not. Q67 and Q14 are both
   memory-pressure-sensitive; Q67 spills, Q14 returns
   `Resources exhausted`.
   
   ### Note on distributed-scale data
   
   The single-node Q14 / Q23 / Q67 bench above demonstrates the
   HashJoin OOM, hash-agg spill, and SMJ-fallback behavior.
   Distributed sf=1000 runs would test whether sharding moves
   the boundary, but per-executor working sets still hit
   HashJoinInput memory caps. Distributed validation can be
   added post-RFC on top of the existing
   [Q72 sf=1000 cluster 
validation](https://github.com/your-org/spark-rust-jaaicode/blob/main/docs/bench-tpcds-sf1000-q72-utf8view-revalidation.md)
   that ran end-to-end at default (unlimited) executor memory.
   
   ## Workarounds tried
   
   * **Larger memory budget**: works at sf=100/single-node;
     doesn't scale to larger inputs.
   * **`SortMergeJoinExec` fallback** via a physical-optimizer
     rule that swaps based on stats
     
([`SortMergeJoinFallbackRule`](https://github.com/your-org/spark-rust-jaaicode/blob/main/crates/spark-dataframe/src/sort_merge_fallback.rs)).
     Functions as a correctness floor; SortMergeJoin is 1.5-3×
     slower than memory-fitting hash join (and per the bench
     above, does not fire in the multi-medium-build case).
   * **Distribute the query** across cluster nodes: works at
     scale; does not apply to single-node, embedded, or
     per-task-bounded deployments.
   
   ## Design — partitioned build with disk spill
   
   The standard textbook approach. Break the build side into N
   partitions by hash modulo N. Each partition gets `pool / N` of
   the budget. When a partition exceeds its share, spill it to
   disk. On the probe side, partition rows the same way; only one
   partition's build state is in memory at a time during probe.
   
   ### Phase 1 — partition-then-build
   
   ```
   build_input ──► HashPartitioner(N=8, hash on join_key)
                     │
                     ├── partition_0 → in-mem hash table (or spill if overflow)
                     ├── partition_1 → in-mem hash table (or spill)
                     ├── ...
                     └── partition_7 → in-mem hash table (or spill)
   ```
   
   When `MemoryReservation::try_grow` fails, victim-pick the
   largest in-memory partition, write it to disk via the
   `DiskManager`, free its memory, retry.
   
   ### Phase 2 — probe-then-merge
   
   ```
   probe_input ──► HashPartitioner(N=8, same hash function)
                     │
                     └── For each partition i:
                           if build[i] is in memory: probe directly
                           else:                     load from disk,
                                                     probe, free memory
   ```
   
   **Memory invariant**: at most one build partition's hash map is
   materialized at any moment during probe. Total memory =
   `pool / N` instead of full pool.
   
   ### Algorithm pseudocode
   
   ```rust
   // Build phase
   for batch in build_input {
       let p = hash_partition(batch, on_left, N);
       match reservation.try_grow(batch.size()) {
           Ok(_) => state.partitions[p].push(batch),
           Err(_) => {
               spill_largest_partition(&mut state)?;
               reservation.try_grow(batch.size())?;  // should fit now
               state.partitions[p].push(batch);
           }
       }
   }
   
   // Probe phase
   for probe_batch in probe_input {
       let probed_partitions = partition_by_hash(probe_batch, on_right, N);
       for (p, p_batch) in probed_partitions {
           let build_data = state.partitions[p].load_for_probe(disk_manager)?;
           run_probe(p_batch, build_data.hash_map);
           // Optionally free build_data after probe completes.
       }
   }
   ```
   
   ## Where this lands in the source tree
   
   Concrete file-level mapping (relative to
   `datafusion/physical-plan/src/joins/hash_join/`):
   
   | File | Change | LoC |
   |---|---|---:|
   | `exec.rs::BuildSideState` | replace `batches: Vec<RecordBatch>` with 
`partitions: [PartitionSlot; N]` | ~30 |
   | `exec.rs::collect_left_input` (~L1836) | partition each incoming batch via 
existing `create_hashes`; spill on `try_grow` failure | ~80 |
   | `exec.rs::HashJoinStream::poll_next_impl` | partition probe batches by 
hash; per-partition probe loop | ~150 |
   | `exec.rs` (line 1303 + 1323) | switch 
`MemoryConsumer::new("HashJoinInput")` to `.with_can_spill(true)` | ~5 |
   | `spill.rs` (NEW module) | `PartitionSlot { InMemory(Vec<RecordBatch>) \| 
Spilled { path, ... } }` + spill IPC writer/reader. Mirror 
`aggregates/group_values/spilling.rs`. | ~600 |
   | Tests in `core/tests/sql/joins.rs` | spill-fixture parquet builds + 
integration tests under tight memory budget | ~400 |
   
   **Total estimate: ~1,200-1,500 LoC + tests.**
   
   ### Concrete code skeletons
   
   **`BuildSideState` becomes partitioned:**
   
   ```rust
   const NUM_PARTITIONS: usize = 8;  // tunable via SessionConfig
   
   struct BuildSideState {
       metrics: BuildProbeJoinMetrics,
       reservation: MemoryReservation,
       on_left: Vec<PhysicalExprRef>,
       schema: SchemaRef,
       /// One slot per partition. In-memory until try_grow fails;
       /// then individual partitions get spilled to disk.
       partitions: [PartitionSlot; NUM_PARTITIONS],
       num_rows: usize,
       bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
   }
   
   enum PartitionSlot {
       InMemory(Vec<RecordBatch>),
       Spilled {
           path: RefCountedTempFile,    // managed by DiskManager
           bytes_written: usize,
           rows: usize,
       },
   }
   ```
   
   **Spill trigger** — replaces today's `try_grow?` line at L1873:
   
   ```rust
   let batch_size = get_record_batch_memory_size(&batch);
   let p = hash_partition_idx(&batch, &state.on_left, NUM_PARTITIONS)?;
   if let Err(_) = state.reservation.try_grow(batch_size) {
       spill_largest_partition(&mut state, disk_manager)?;
       state.reservation.try_grow(batch_size)?;  // should fit
   }
   state.partitions[p].push(batch);
   ```
   
   **Probe-side** — replaces single-pass probe with per-partition:
   
   ```rust
   let probe_partitioned = partition_batch_by_hash(
       &probe_batch, &self.on_right, NUM_PARTITIONS,
   )?;
   let mut output = Vec::new();
   for (p_idx, p_batch) in probe_partitioned {
       let build = self.load_build_partition(p_idx, disk_manager)?;
       output.push(self.run_probe(p_batch, &build.hash_map)?);
       // build dropped here, memory reclaimed
   }
   concat_batches(&self.schema, &output)
   ```
   
   **DiskManager integration** mirrors existing
   `group_values/spilling.rs`:
   
   ```rust
   fn spill_partition(
       state: &mut BuildSideState,
       p_idx: usize,
       disk_manager: &DiskManager,
   ) -> Result<()> {
       let path = disk_manager.create_tmp_file("HashJoin_BuildSpill")?;
       let writer = StreamWriter::try_new(File::create(&path)?, &state.schema)?;
       let batches = std::mem::take(state.partitions[p_idx].as_in_memory_mut());
       let bytes: usize = 
batches.iter().map(get_record_batch_memory_size).sum();
       for b in &batches { writer.write(b)?; }
       writer.finish()?;
       state.reservation.shrink(bytes);
       state.partitions[p_idx] = PartitionSlot::Spilled {
           path, bytes_written: bytes,
           rows: batches.iter().map(|b| b.num_rows()).sum(),
       };
       Ok(())
   }
   ```
   
   ## Edge cases
   
   1. **Skewed builds.** All rows hash to one partition →
      recursive partitioning. When `spill_largest_partition` is
      called twice on the same partition, double NUM_PARTITIONS
      for that partition's children.
   
   2. **Single-large-key.** All rows have the same key. Recursive
      partitioning doesn't help. Options:
      - Fall back to NestedLoopJoin for this build (slow but
        bounded);
      - Emit warning + use SortMergeJoinExec via the existing
        fallback rule (mirroring DuckDB's behavior).
   
   3. **Outer joins.** The "matched" bitmap (visited-indices for
      FULL/LEFT joins) needs care. Two options:
      - Per-partition bitmap, merged at end. Memory cost: 1 bit per
        build row, fine even at sf=1000 (a few hundred MB max).
      - Disable spill for outer joins in v1; document as future work.
   
   4. **Dynamic filter pushdown.** HashJoinExec creates a dynamic
      filter from build-side bounds and pushes to probe scan. With
      partitioned build:
      - Compute filter ONCE at end of build phase (after all
        partitions including spilled ones contribute their min/max);
      - OR skip dynamic filter for spilling joins (acceptable v1).
   
   5. **`PartitionMode::CollectLeft` vs `Partitioned`.** Today's
      code has separate paths. The spill rewrite naturally
      consolidates: both modes become "partition the build by hash
      modulo N". CollectLeft becomes "1 input partition × N hash
      partitions"; Partitioned becomes "M input partitions × N hash
      partitions, distributed by input partition".
   
   ## Performance target
   
   | Workload | Today (memory-fitting) | This RFC (memory-bounded) |
   |---|---|---|
   | Build fits in pool (common case) | Hash-build O(n), hash-probe O(m) | Same 
— partition is cheap (O(n) hash, no I/O), in-memory probe identical |
   | Build exceeds pool by 2-4× | OOM | Spill 1-3 partitions, probe-time disk 
reads ≈ 1.2-1.5× wall vs memory-fitting |
   | Build exceeds pool by 10×+ | OOM | Most partitions spill, probe-time disk 
reads ≈ 2-3× wall |
   
   Target perf for spilling case: within 1.5× of memory-fitting
   hash join (SortMergeJoin is 1.5-3× per the bench above).
   
   ## Implementation plan / PR breakdown
   
   For upstream review-ability, suggest 4 PRs:
   
   | PR | Scope | LoC | Time |
   |---|---|---:|---:|
   | 1 | Partitioned BuildSide (in-mem only, no spill yet). Refactor 
`collect_left_input` to always partition. Probe pipeline changes. Tests verify 
behavioral equivalence. | ~400 + 200 | ~1 week |
   | 2 | Add spill via DiskManager. Victim-pick + spill on `try_grow` failure. 
Probe-time deserialization. Tests verify spill triggers + correct results under 
tight budget. | ~600 + 400 | ~1 week |
   | 3 | Recursive partitioning for skew. | ~200 + 150 | ~3 days |
   | 4 | Outer join handling + dynamic filter integration. | ~150 + 100 | ~2 
days |
   
   Total: 3-4 weeks elapsed including review cycles.
   
   PR 1 is the architecture-only change — no behavior change.
   Subsequent PRs add features incrementally.
   
   ## Open questions for community
   
   1. **Default `NUM_PARTITIONS` value** (raised by @2010YOUY01 on
      #17267). Proposal: default 8 (DuckDB's default), exposed as
      `datafusion.execution.hash_join.partitions`. Recursive split
      on overflow (PR 4) handles undersized cases.
   
   2. **Recursive partitioning depth.** Cap at 3 levels (8 → 64 →
      512 partitions max)? Beyond that, fall through to
      NestedLoopJoin or error?
   
   3. **Probe-side partition reuse.** If the probe side is already
      `Partitioning::Hash(on_right, N')`, can we reuse that
      partitioning for the join (avoid the re-hash)? Requires
      matching N == N' and identical hash function. Optimizer-side
      integration; could be follow-up.
   
   4. **Spill format / backend.** Arrow IPC stream matches existing
      `group_values/spilling.rs`. The pluggable `SpillFile` trait
      in #21882 would provide backend portability (S3, BufFile,
      etc.) at the cost of waiting for #21882 to merge or landing
      both PRs in lock-step.
   
   5. **Dynamic filter integration** (raised on #17267). Resolved
      upstream by #17444 (merged Sep 2025): dynamic filter bounds
      accumulate progressively during build, which is
      spill-compatible.
   
   6. **Memory pool integration.** Should partitioned reservations
      register through `SubPool` (PR #22035)? If `SubPool` lands,
      per-partition build reservations match its target use case:
      one `SubPool` per `HashJoinExec`, sub-reservation per
      partition. Addresses the "FairSpillPool penalises the active
      operator" concern from #22036.
   
   7. **External reclaim.** Register spill as a `MemoryReclaimer`
      (PRs #21425 / #22043) so external Spark/Comet drivers can
      request spill on demand. Covered in PR 4.
   
   8. **Operator architecture: refactor vs. sibling** (the #18589
      stall point). Proposal: refactor `HashJoinExec`, gated by
      config. `hash_join_spill_threshold` defaults to 0.0
      (disabled, current behavior); auto-derives under
      `with_memory_limit()`.
   
   9. **Backwards compatibility.** The `PartitionMode` enum on
      `HashJoinExec` is public API. Behavior change: when spill
      is enabled, `CollectLeft` no longer collects to a single
      Vec; it partitions. Should be transparent to most
      downstream consumers, but worth flagging.
   
   ## What this enables downstream
   
   - DataFusion sessions with `with_memory_limit()` become
     memory-bounded for hash-join workloads (today they bound
     hash-aggregate, sort, and external-merge-join only).
   - TPC-DS at sf=1000 single-node on 64 GiB-class hardware.
   - Cloud-deployed DataFusion (AWS Lambda, Cloudflare Workers,
     Fly.io) with datasets larger than allocated RAM.
   - Embedded use cases (DataFusion-WASM, edge devices).
   
   ## Reference: similar implementations elsewhere
   
   - **DuckDB**: `src/execution/operator/join/perfect_hash_join_executor.cpp`,
     `physical_hash_join.cpp` — partitioned hash join with spill.
   - **PostgreSQL**: Hash join in `src/backend/executor/nodeHashjoin.c` +
     `nodeHash.c` — batch-based external hash join (very similar
     partitioning approach).
   - **DataBricks Photon**: closed-source, but their published papers
     describe a similar partitioned approach.
   
   ## Next steps
   
   This RFC is meant to land as a **comment on #17267**, not a new
   issue, to converge that thread toward implementation:
   
   1. **Comment on #17267** with this body. Tag the participants
      (@jonathanc-n @2010YOUY01 @adriangb @alamb @comphead
      @osipovartem @Omega359 @Dandandan @camuel) and propose
      resolution: refactor HashJoinExec, opt-in via config.
   2. **Reach out to @osipovartem** specifically about #18589.
      Their 1,825-LoC draft has substantial reusable code
      (`PartitionWriter`, `SpillLocation` enum, hybrid
      in-memory/disk fallback). Two paths:
      - Revive #18589 with the architectural change (refactor vs
        sibling) plus our `partitioned_build` foundation
      - Cherry-pick relevant pieces into a fresh PR series
   3. **Open Draft PR 1** for the `partitioned_build` foundation
      ([already 
up](https://github.com/tomz/datafusion/tree/external-hash-join-pr1))
      once the design direction is acknowledged on #17267.
   4. **Iterate PRs 2-4** in sequence, each with its own RFC
      comment if the design changes.
   
   If significant design changes emerge from the #17267
   discussion, the pseudocode and file pointers here are
   adjustable.
   
   ## Acknowledgments
   
   - @jonathanc-n for the #17267 proposal and hybrid-hash-join
     framing
   - @osipovartem for the #18589 draft (reusable code:
     `PartitionWriter`, `SpillLocation`, hybrid in-memory/disk
     fallback)
   - @adriangb for the dynamic-filter spill-compat refactor
     (#17444)
   - @2010YOUY01, @comphead, @alamb, @Omega359, @camuel,
     @Dandandan for the #17267 discussion
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to