adriangb opened a new pull request, #21641:
URL: https://github.com/apache/datafusion/pull/21641

   ## Which issue does this PR close?
   
   Closes #21625.
   
   ## Rationale for this change
   
   PR #21632 fixed the TPCH Q18 hang by disabling the #21068 empty-build 
short-circuit whenever a build accumulator is present. That addressed the 
symptom but regressed the #21068 optimization in exactly the workloads where it 
matters (many partitions + skewed builds + dynamic filter pushdown on).
   
   Investigation (instrumented traces + bisection on the fix itself) showed the 
actual root cause is different: it is an interaction between 
`tokio::sync::Barrier` and `RepartitionExec`'s global backpressure gate, not 
empty partitions failing to report.
   
   ### The deadlock
   
   - `SharedBuildAccumulator` parks every partition on a `Barrier` sized to the 
total partition count so the leader can publish the dynamic filter atomically.
   - `RepartitionExec`'s `distributor_channels` uses a global "all channels 
non-empty → gate closed" backpressure rule. Crucially, a receiver dropped while 
its channel still has buffered data does **not** free a gate slot — 
`empty_channels` is only decremented when a channel drains to empty *before* 
being closed.
   - When N−k partitions reach `collect_build_side`, drop their build-side 
receivers, and park on the barrier, their "ghost" non-empty channels keep the 
gate counter pinned. The `RepartitionExec` input task can't push more data, so 
the remaining k partitions starve on their build-side inputs, never arrive at 
the barrier, and the N−k parked partitions block forever.
   
   On Q18 at `DATAFUSION_EXECUTION_TARGET_PARTITIONS=24`, the correlated 
subquery produces ~57 distinct `l_orderkey` values. Hashed into 24 buckets, 3 
are empty. Those 3 partitions are exactly the ones that starve.
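
As a rough sanity check on that count, the expected number of empty buckets can be estimated. This is a minimal sketch that assumes the keys are hashed uniformly and independently, which the real partitioning hash only approximates. The function name `expected_empty_buckets` is made up for this illustration and is not part of DataFusion.

```rust
/// Expected number of empty buckets when `keys` distinct keys are hashed
/// uniformly and independently into `buckets` buckets (an idealized model).
///
/// Each bucket is missed by a single key with probability (1 - 1/buckets),
/// so it stays empty with probability (1 - 1/buckets)^keys.
fn expected_empty_buckets(keys: u32, buckets: u32) -> f64 {
    let b = f64::from(buckets);
    b * (1.0 - 1.0 / b).powi(keys as i32)
}
```

For 57 keys and 24 buckets this model gives an expectation of roughly two empty buckets, so observing 3 empty partitions is unsurprising rather than a sign of a broken hash.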
   
   Disabling `datafusion.optimizer.enable_dynamic_filter_pushdown` unhangs the 
query because `is_used()` then returns false: no accumulator is created, no 
partition parks, and the gate never stays closed.
   
   `git bisect` blamed #21068, but that PR only shifted scheduling into the 
pattern that exposes the latent composition bug between the barrier and the 
gate. The barrier-vs-gate interaction has existed for as long as both 
primitives have coexisted.
   
   ## What changes are included in this PR?
   
   Replace the `tokio::sync::Barrier` in `SharedBuildAccumulator` with a 
non-blocking "last to report is leader" counter:
   
   - New `remaining: AtomicUsize` field, initialized to `expected_calls`.
   - `report_build_data` is now synchronous: it stores the partition's 
build-side slice under the existing `parking_lot::Mutex`, then calls 
`fetch_sub(1, AcqRel)` on `remaining`. The `AcqRel` ordering is what sequences 
the leader's read of `inner` after every other partition's store.
   - The caller that brings the counter to zero runs the leader body (create 
combined filter → `dynamic_filter.update()` → `mark_complete()`). All other 
callers return immediately.
   - No partition ever parks. The gate stays drainable. The pipeline flows.
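
The "last to report is leader" pattern described above can be sketched in isolation. This is a simplified illustration, not the code in this PR: `LastReporterLeader`, `report`, and `run_demo` are hypothetical names, `std::sync::Mutex` stands in for `parking_lot::Mutex`, plain threads stand in for partitions, and summing integers stands in for building the combined filter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the accumulator (illustrative names only).
struct LastReporterLeader {
    /// One slot per partition, filled in by that partition's report.
    slots: Mutex<Vec<Option<u64>>>,
    /// Number of partitions that have not reported yet.
    remaining: AtomicUsize,
    /// Result published by the leader (stands in for the dynamic filter).
    published: Mutex<Option<u64>>,
}

impl LastReporterLeader {
    fn new(expected_calls: usize) -> Self {
        Self {
            slots: Mutex::new(vec![None; expected_calls]),
            remaining: AtomicUsize::new(expected_calls),
            published: Mutex::new(None),
        }
    }

    /// Stores this partition's data and returns without waiting.
    /// Returns true only for the caller that brought the counter to zero.
    fn report(&self, partition: usize, value: u64) -> bool {
        // The lock guard is dropped at the end of this statement.
        self.slots.lock().unwrap()[partition] = Some(value);
        // fetch_sub returns the previous value, so 1 means "I was last".
        if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            // Leader body: every other partition has already stored its slot.
            let combined: u64 = self.slots.lock().unwrap().iter().flatten().sum();
            *self.published.lock().unwrap() = Some(combined);
            true
        } else {
            false
        }
    }
}

/// Runs one thread per partition and returns (number of leaders, published value).
fn run_demo(partitions: usize) -> (usize, Option<u64>) {
    let acc = Arc::new(LastReporterLeader::new(partitions));
    let handles: Vec<_> = (0..partitions)
        .map(|p| {
            let acc = Arc::clone(&acc);
            thread::spawn(move || acc.report(p, p as u64 + 1))
        })
        .collect();
    let leaders = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&was_leader| was_leader)
        .count();
    let published = *acc.published.lock().unwrap();
    (leaders, published)
}
```

Under these assumptions, exactly one caller observes the counter reaching zero, whichever thread happens to report last, and no caller ever waits on another. That is the property that keeps every partition draining its inputs.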
   
   **Trade-off:** a small number of probe batches may evaluate against the 
placeholder `lit(true)` filter before the leader publishes the real one. 
`DynamicFilterPhysicalExpr` is explicitly designed to be read live 
(`.current()` is called per batch on the scan side), and the filter is an 
optimization rather than a correctness constraint, so this is safe.
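
To illustrate why reading the filter live is safe, here is a toy model of a filter that starts as a pass-all placeholder and is swapped later. It is a sketch only: `LiveFilter`, `placeholder`, `update`, and `scan_batch` are hypothetical and do not mirror the `DynamicFilterPhysicalExpr` API beyond the idea of calling `current()` once per batch.

```rust
use std::sync::{Arc, RwLock};

/// A shareable row predicate (illustrative; real filters are physical expressions).
type Predicate = Arc<dyn Fn(i64) -> bool + Send + Sync>;

/// Hypothetical live-readable filter cell.
struct LiveFilter {
    inner: RwLock<Predicate>,
}

impl LiveFilter {
    /// Starts with a pass-all predicate, analogous to the `lit(true)` placeholder.
    fn placeholder() -> Self {
        let pass_all: Predicate = Arc::new(|_| true);
        Self { inner: RwLock::new(pass_all) }
    }

    /// Snapshot of whatever predicate is published right now.
    fn current(&self) -> Predicate {
        self.inner.read().unwrap().clone()
    }

    /// Called once by the leader when the real filter is ready.
    fn update(&self, predicate: Predicate) {
        *self.inner.write().unwrap() = predicate;
    }
}

/// Scan side: re-reads the filter for every batch.
fn scan_batch(filter: &LiveFilter, batch: &[i64]) -> Vec<i64> {
    let pred = filter.current();
    batch.iter().copied().filter(|v| pred(*v)).collect()
}
```

Batches scanned before `update` simply pass more rows downstream, where the join itself still discards non-matching rows. Batches scanned afterwards are pruned early.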
   
   **Dead code removed:**
   - `HashJoinStreamState::WaitPartitionBoundsReport`
   - `HashJoinStream::build_waiter: Option<OnceFut<()>>`
   - `HashJoinStream::wait_for_partition_bounds_report`
   - The `coordinated && is_initial_collect` guard inside 
`state_after_build_ready` added by #21632 — the #21068 short-circuit is now 
always safe for coordinated joins because every partition still reports 
(synchronously) before returning.
   - The `process_probe_batch` `empty_build_side_produces_empty_result` 
fallback added by #21632 — `debug_assert!` is restored.
   
   ## Are these changes tested?
   
   - **Regression:** TPCH Q18 at `DATAFUSION_EXECUTION_TARGET_PARTITIONS=24` 
with dynamic filter pushdown **on** — returns 57 rows in ~0.9s (debug build). 
Previously hung indefinitely.
   - **Unit:** `cargo test -p datafusion-physical-plan joins::hash_join` — 376 
passed, 0 failed.
   - **Lint:** `cargo clippy -p datafusion-physical-plan --all-targets -- -D 
warnings` — clean.
   
   A dedicated regression test (Partitioned mode + stacked hash joins + some 
empty shards + dynamic filter on) is still TODO — the current Q18 end-to-end 
validation is what unblocked the fix, but a self-contained minimal reproducer 
in `exec.rs` would be a better guardrail. This PR is in draft so that we can 
decide how to shape that test before merging.
   
   ## Are there any user-facing changes?
   
   No API changes. Behavioral change: when the build side is empty and the join 
type guarantees empty output (Inner, Left, LeftSemi, LeftAnti, LeftMark, 
RightSemi), the probe side is once again skipped in dynamic-filter-enabled 
plans. This restores the #21068 optimization that #21632 had to disable.
   
   ## LLM-generated code disclosure
   
   This PR includes LLM-generated code and comments. All LLM-generated content 
has been manually reviewed and tested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

