adriangb commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2288344580
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
+        // Handle dynamic filter bounds accumulation
+        //
+        // This coordination ensures the dynamic filter contains complete bounds information
+        // from all relevant partitions before being applied to probe-side scans.
+        //
+        // Process:
+        // 1. Store this partition's bounds in the shared accumulator
+        // 2. Atomically increment the completion counter
+        // 3. If we're the last partition to complete, merge all bounds and update the filter
+        //
+        // Note: In CollectLeft mode, multiple partitions may access the SAME build data
+        // (shared via OnceFut), but each partition must report separately to ensure proper
+        // coordination across all output partitions.
+        //
+        // The consequences of not doing this synchronization properly would be that a filter
+        // with incomplete bounds would be pushed down resulting in incorrect results (missing rows).
+        if let Some(dynamic_filter) = &self.dynamic_filter {
+            // Store bounds in the accumulator - this runs once per partition
+            if let Some(bounds) = &left_data.bounds {
+                // Only push actual bounds if they exist
+                self.bounds_accumulator.bounds.lock().push(bounds.clone());

Review Comment:
   Let's see how 01757fad7 looks
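
For readers following the thread, the three-step process described in the code comment above (store bounds, atomically bump a completion counter, let the last partition merge) can be sketched roughly as follows. This is a minimal standalone illustration, not the code in the PR: `Bounds`, `BoundsAccumulator`, `report`, and `run_partitions` are hypothetical names, bounds are simplified to a single `i64` min/max pair, and `std::sync::Mutex` is used so the snippet needs no external crates.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical min/max bounds for a single join key column.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Hypothetical stand-in for a shared accumulator (not DataFusion's type).
struct BoundsAccumulator {
    bounds: Mutex<Vec<Bounds>>,
    completed: AtomicUsize,
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self {
            bounds: Mutex::new(Vec::new()),
            completed: AtomicUsize::new(0),
            total_partitions,
        }
    }

    /// Called once per partition. Only the last partition to report gets
    /// `Some(merged)`, and only if at least one partition had bounds.
    fn report(&self, partition_bounds: Option<Bounds>) -> Option<Bounds> {
        // Step 1: store this partition's bounds, if it has any.
        if let Some(b) = partition_bounds {
            self.bounds.lock().unwrap().push(b);
        }
        // Step 2: atomically count this partition as completed.
        let done = self.completed.fetch_add(1, Ordering::SeqCst) + 1;
        // Step 3: the last partition merges everything collected so far.
        if done == self.total_partitions {
            let all = self.bounds.lock().unwrap();
            all.iter().copied().reduce(|a, b| Bounds {
                min: a.min.min(b.min),
                max: a.max.max(b.max),
            })
        } else {
            None
        }
    }
}

/// Runs one thread per partition and collects what each `report` returned.
fn run_partitions(inputs: Vec<Option<Bounds>>) -> Vec<Option<Bounds>> {
    let acc = Arc::new(BoundsAccumulator::new(inputs.len()));
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|b| {
            let acc = Arc::clone(&acc);
            thread::spawn(move || acc.report(b))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Note that a partition without bounds still increments the counter. If it skipped that step, the count would never reach `total_partitions` and no merged filter would ever be published. Conversely, publishing before every partition has reported is what would produce the incomplete bounds, and therefore the missing rows, that the comment warns about.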