adriangb commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2288174136
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
                 .get_shared(cx))?;
         build_timer.done();
+        // Handle dynamic filter bounds accumulation
+        //
+        // This coordination ensures the dynamic filter contains complete bounds information
+        // from all relevant partitions before being applied to probe-side scans.
+        //
+        // Process:
+        // 1. Store this partition's bounds in the shared accumulator
+        // 2. Atomically increment the completion counter
+        // 3. If we're the last partition to complete, merge all bounds and update the filter
+        //
+        // Note: In CollectLeft mode, multiple partitions may access the SAME build data
+        // (shared via OnceFut), but each partition must report separately to ensure proper
+        // coordination across all output partitions.
+        //
+        // The consequences of not doing this synchronization properly would be that a filter
+        // with incomplete bounds would be pushed down resulting in incorrect results (missing rows).
+        if let Some(dynamic_filter) = &self.dynamic_filter {
+            // Store bounds in the accumulator - this runs once per partition
+            if let Some(bounds) = &left_data.bounds {
+                // Only push actual bounds if they exist
+                self.bounds_accumulator.bounds.lock().push(bounds.clone());
+            }
+
+            // Atomically increment the completion counter
+            // Even empty partitions must report to ensure proper termination
+            let completed = self
+                .bounds_accumulator
+                .completed_partitions
+                .fetch_add(1, Ordering::SeqCst)
+                + 1;
+            let total_partitions = self.bounds_accumulator.total_partitions;
+
+            // Critical synchronization point: Only the last partition updates the filter
+            // Troubleshooting: If you see "completed > total_partitions", check partition
+            // count calculation in try_new() - it may not match actual execution calls
+            if completed == total_partitions {
+                if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() {
+                    let filter_expr = self.create_filter_from_bounds(merged_bounds)?;
+                    dynamic_filter.update(filter_expr)?;
+                }
+            }

Review Comment:
`reset_state` is already implemented for `HashJoinExec`, and it does reset `HashJoinExec::bounds_accumulator`. This bit of code we are commenting on is within `HashJoinStream`, which _is_ created inside of `HashJoinExec::execute`.

--
This is an automated message from the Apache Git Service.
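The coordination steps listed in the diff comment (store bounds, `fetch_add` the counter, let only the last partition merge) can be sketched in isolation as below, together with the per-execution reset that the review comment is about. This is a minimal, std-only sketch under stated assumptions, not DataFusion's implementation: `ColumnBounds`, `BoundsAccumulator::report`, `reset`, and `run` are hypothetical names, bounds are reduced to a single `i64` min/max pair, threads stand in for partition streams, and std's `Mutex` (hence `.lock().unwrap()`) stands in for the lock used in the diff.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical min/max bounds for one join-key column.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ColumnBounds {
    min: i64,
    max: i64,
}

/// Hypothetical accumulator shared by all output partitions of one join.
struct BoundsAccumulator {
    bounds: Mutex<Vec<ColumnBounds>>,
    completed_partitions: AtomicUsize,
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self {
            bounds: Mutex::new(Vec::new()),
            completed_partitions: AtomicUsize::new(0),
            total_partitions,
        }
    }

    /// Called once per partition. Returns the merged bounds only to the
    /// last partition to report; every other caller gets `None`.
    fn report(&self, partition_bounds: Option<ColumnBounds>) -> Option<ColumnBounds> {
        // 1. Store this partition's bounds (an empty partition stores nothing).
        if let Some(b) = partition_bounds {
            self.bounds.lock().unwrap().push(b);
        }
        // 2. Every partition increments the counter, empty or not.
        let completed = self.completed_partitions.fetch_add(1, Ordering::SeqCst) + 1;
        // 3. Only the caller that completes the count merges.
        if completed == self.total_partitions {
            self.merge_bounds()
        } else {
            None
        }
    }

    /// Widest range covering every reported partition, or `None` if none reported bounds.
    fn merge_bounds(&self) -> Option<ColumnBounds> {
        let bounds = self.bounds.lock().unwrap();
        bounds.iter().copied().reduce(|a, b| ColumnBounds {
            min: a.min.min(b.min),
            max: a.max.max(b.max),
        })
    }

    /// Clears state between executions of the same plan.
    fn reset(&self) {
        self.bounds.lock().unwrap().clear();
        self.completed_partitions.store(0, Ordering::SeqCst);
    }
}

/// Runs one "execution": one thread per partition, collecting merged results.
fn run(acc: &Arc<BoundsAccumulator>, inputs: Vec<Option<ColumnBounds>>) -> Vec<ColumnBounds> {
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|b| {
            let acc = Arc::clone(acc);
            thread::spawn(move || acc.report(b))
        })
        .collect();
    handles.into_iter().filter_map(|h| h.join().unwrap()).collect()
}

fn main() {
    let inputs = vec![
        Some(ColumnBounds { min: 5, max: 10 }),
        None, // an empty partition still reports
        Some(ColumnBounds { min: -3, max: 7 }),
    ];
    let expected = vec![ColumnBounds { min: -3, max: 10 }];

    // With a reset between executions, each run merges exactly once.
    let acc = Arc::new(BoundsAccumulator::new(inputs.len()));
    assert_eq!(run(&acc, inputs.clone()), expected);
    acc.reset();
    assert_eq!(run(&acc, inputs.clone()), expected);

    // Without a reset, the counter starts at 3 on the second run and
    // never equals `total_partitions` again, so nothing is merged.
    let stale = Arc::new(BoundsAccumulator::new(inputs.len()));
    run(&stale, inputs.clone());
    assert!(run(&stale, inputs).is_empty());
    println!("ok");
}
```

In this sketch, if `total_partitions` is smaller than the number of reporting streams, the merge fires before every partition has reported (the incomplete-bounds failure the diff comment warns about); if it is larger, no caller ever sees equality and no merged bounds are produced.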