adriangb opened a new pull request, #21641: URL: https://github.com/apache/datafusion/pull/21641
## Which issue does this PR close?

Closes #21625.

## Rationale for this change

PR #21632 fixed the TPCH Q18 hang by disabling the #21068 empty-build short-circuit whenever a build accumulator is present. That addressed the symptom but regressed the #21068 optimization in exactly the workloads where it matters (many partitions + skewed builds + dynamic filter pushdown on).

Investigation (instrumented traces + bisection on the fix itself) showed the actual root cause is different: it is an interaction between `tokio::sync::Barrier` and `RepartitionExec`'s global backpressure gate, not empty partitions failing to report.

### The deadlock

- `SharedBuildAccumulator` parks every partition on a `Barrier` sized to the total partition count so the leader can publish the dynamic filter atomically.
- `RepartitionExec`'s `distributor_channels` uses a global "all channels non-empty → gate closed" backpressure rule. Crucially, a receiver dropped while its channel still has buffered data does **not** free a gate slot: `empty_channels` is only decremented when a channel drains to empty *before* being closed.
- When N−k partitions reach `collect_build_side`, drop their build-side receivers, and park on the barrier, their "ghost" non-empty channels keep the gate counter pinned. The `RepartitionExec` input task can't push more data, so the remaining k partitions starve on their build-side inputs, never arrive at the barrier, and the N−k parked partitions block forever.

On Q18 at `DATAFUSION_EXECUTION_TARGET_PARTITIONS=24`, the correlated subquery produces ~57 distinct `l_orderkey` values. Hashed into 24 buckets, 3 are empty. Those 3 partitions are exactly the ones that starve.

Disabling `datafusion.optimizer.enable_dynamic_filter_pushdown` unhangs the query because `is_used()` returns false, no accumulator is created, no partition parks, the gate never stays closed.
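As a rough sanity check on the Q18 numbers above, the sketch below estimates how many of 24 hash buckets stay empty when about 57 distinct keys are spread across them. It assumes an idealized uniform, independent hash, which is an assumption for illustration and not a claim about the actual repartitioning hash; `expected_empty_buckets` is a hypothetical helper, not a DataFusion function.

```rust
/// Expected number of empty buckets when `keys` distinct keys are hashed
/// uniformly and independently into `buckets` buckets (idealized model).
fn expected_empty_buckets(keys: u32, buckets: u32) -> f64 {
    let b = f64::from(buckets);
    // One key misses a given bucket with probability (1 - 1/b);
    // all keys miss it with probability (1 - 1/b)^keys.
    // Summing that probability over all b buckets gives the expectation.
    b * (1.0 - 1.0 / b).powi(keys as i32)
}

fn main() {
    // Figures taken from the Q18 description: ~57 keys, 24 partitions.
    let e = expected_empty_buckets(57, 24);
    println!("expected empty buckets: {e:.2}");
}
```

Under this model the expectation comes out at a little over two empty buckets, so observing 3 empty partitions at 24-way partitioning is unremarkable rather than a pathological skew.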
`git bisect` blamed #21068, but that PR only shifted scheduling into the pattern that exposes the latent composition bug between the barrier and the gate. The barrier-vs-gate interaction has been there since both primitives coexisted.

## What changes are included in this PR?

Replace the `tokio::sync::Barrier` in `SharedBuildAccumulator` with a non-blocking "last to report is leader" counter:

- New `remaining: AtomicUsize` field, initialized to `expected_calls`.
- `report_build_data` is now synchronous: it stores the partition's build-side slice under the existing `parking_lot::Mutex`, then `fetch_sub(1, AcqRel)`. The `AcqRel` ordering is what sequences the leader's read of `inner` after every other partition's store.
- The caller that brings the counter to zero runs the leader body (create combined filter → `dynamic_filter.update()` → `mark_complete()`). All other callers return immediately.
- No partition ever parks. The gate stays drainable. The pipeline flows.

**Trade-off:** a small number of probe batches may evaluate against the placeholder `lit(true)` filter before the leader publishes the real one. `DynamicFilterPhysicalExpr` is explicitly designed to be read live (`.current()` is called per batch on the scan side), and the filter is an optimization rather than a correctness constraint, so this is safe.

**Dead code removed:**

- `HashJoinStreamState::WaitPartitionBoundsReport`
- `HashJoinStream::build_waiter: Option<OnceFut<()>>`
- `HashJoinStream::wait_for_partition_bounds_report`
- The `coordinated && is_initial_collect` guard inside `state_after_build_ready` added by #21632: the #21068 short-circuit is now always safe for coordinated joins because every partition still reports (synchronously) before returning.
- The `process_probe_batch` `empty_build_side_produces_empty_result` fallback added by #21632: `debug_assert!` is restored.

## Are these changes tested?
- **Regression:** TPCH Q18 at `DATAFUSION_EXECUTION_TARGET_PARTITIONS=24` with dynamic filter pushdown **on**: returns 57 rows in ~0.9s (debug build). Previously hung indefinitely.
- **Unit:** `cargo test -p datafusion-physical-plan joins::hash_join`: 376 passed, 0 failed.
- **Lint:** `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings`: clean.

A dedicated regression test (Partitioned mode + stacked hash joins + some empty shards + dynamic filter on) is still TODO. The current Q18 end-to-end validation is what unblocked the fix, but a self-contained minimal reproducer in `exec.rs` would be a better guardrail. The PR is in draft so we can decide how to shape that test before merging.

## Are there any user-facing changes?

No API changes. Behavioral change: when the build side is empty and the join type guarantees empty output (Inner, Left, LeftSemi, LeftAnti, LeftMark, RightSemi), the probe side is once again skipped in dynamic-filter-enabled plans. This restores the #21068 optimization that #21632 had to disable.

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
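For readers who want to see the "last to report is leader" counter from the changes section in isolation, here is a minimal sketch. It is not the PR's code: `ToyAccumulator`, `report`, `published_total`, and `run_partitions` are hypothetical names, `std::sync::Mutex` stands in for `parking_lot::Mutex`, OS threads stand in for partition streams, and summing row counts stands in for building and publishing the dynamic filter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the shared accumulator (not DataFusion's type).
struct ToyAccumulator {
    /// Per-partition build data; here just a row count per partition.
    inner: Mutex<Vec<Option<usize>>>,
    /// Number of partitions that have not reported yet.
    remaining: AtomicUsize,
    /// Written once by the leader; stands in for publishing the filter.
    published_total: Mutex<Option<usize>>,
}

impl ToyAccumulator {
    fn new(expected_calls: usize) -> Self {
        Self {
            inner: Mutex::new(vec![None; expected_calls]),
            remaining: AtomicUsize::new(expected_calls),
            published_total: Mutex::new(None),
        }
    }

    /// Synchronous report: store, decrement, and run the leader body only
    /// if this call brought the counter to zero. Returns true for the leader.
    fn report(&self, partition: usize, rows: usize) -> bool {
        self.inner.lock().unwrap()[partition] = Some(rows);
        // fetch_sub returns the previous value, so 1 means "I was last".
        if self.remaining.fetch_sub(1, Ordering::AcqRel) != 1 {
            return false; // not the leader: return immediately, never park
        }
        // Leader body: every other partition has already stored its slot.
        let total: usize = self
            .inner
            .lock()
            .unwrap()
            .iter()
            .map(|r| r.unwrap_or(0))
            .sum();
        *self.published_total.lock().unwrap() = Some(total);
        true
    }
}

/// Runs `n` reporting threads; returns (number of leaders, published total).
fn run_partitions(n: usize) -> (usize, Option<usize>) {
    let acc = Arc::new(ToyAccumulator::new(n));
    let handles: Vec<_> = (0..n)
        .map(|p| {
            let acc = Arc::clone(&acc);
            // Partition p reports p + 1 rows.
            thread::spawn(move || acc.report(p, p + 1))
        })
        .collect();
    let leaders = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&is_leader| is_leader)
        .count();
    let total = *acc.published_total.lock().unwrap();
    (leaders, total)
}

fn main() {
    let (leaders, total) = run_partitions(24);
    // prints "leaders = 1, published total = Some(300)"
    println!("leaders = {leaders}, published total = {total:?}");
}
```

Whatever the thread interleaving, exactly one caller observes the counter going from 1 to 0, so exactly one leader publishes, and no caller ever waits on the others. In this toy the mutex alone would already order the stores before the leader's read; the `AcqRel` ordering is kept only to mirror the description above.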
