adriangb commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2288344580
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
.get_shared(cx))?;
build_timer.done();
+ // Handle dynamic filter bounds accumulation
+ //
+ // This coordination ensures the dynamic filter contains complete bounds information
+ // from all relevant partitions before being applied to probe-side scans.
+ //
+ // Process:
+ // 1. Store this partition's bounds in the shared accumulator
+ // 2. Atomically increment the completion counter
+ // 3. If we're the last partition to complete, merge all bounds and update the filter
+ //
+ // Note: In CollectLeft mode, multiple partitions may access the SAME build data
+ // (shared via OnceFut), but each partition must report separately to ensure proper
+ // coordination across all output partitions.
+ //
+ // The consequences of not doing this synchronization properly would be that a filter
+ // with incomplete bounds would be pushed down resulting in incorrect results (missing rows).
+ if let Some(dynamic_filter) = &self.dynamic_filter {
+ // Store bounds in the accumulator - this runs once per partition
+ if let Some(bounds) = &left_data.bounds {
+ // Only push actual bounds if they exist
+ self.bounds_accumulator.bounds.lock().push(bounds.clone());
Review Comment:
Let's see how 01757fad7 looks
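
For reference, the three-step process described in the quoted comment (store bounds, atomically bump a completion counter, let the last partition merge) could be sketched roughly as below. This is only an illustration under assumptions, not the code in the PR: `Bounds`, `BoundsAccumulator`, `report`, and `merge_across_threads` are hypothetical names, the bounds are simplified to a single `i64` min/max pair, and `std::sync::Mutex` stands in for whatever lock the PR actually uses.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical min/max bounds for one partition's build-side join key.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Hypothetical shared state; not DataFusion's actual type.
struct BoundsAccumulator {
    bounds: Mutex<Vec<Bounds>>,
    completed: AtomicUsize,
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self {
            bounds: Mutex::new(Vec::new()),
            completed: AtomicUsize::new(0),
            total_partitions,
        }
    }

    /// Called exactly once per partition. Returns the merged bounds only to
    /// the partition that completes last; every other caller gets `None`.
    /// (If no partition reported any bounds, the last caller also gets `None`.)
    fn report(&self, partition_bounds: Option<Bounds>) -> Option<Bounds> {
        // Step 1: store this partition's bounds, if it has any.
        if let Some(b) = partition_bounds {
            self.bounds.lock().unwrap().push(b);
        }
        // Step 2: atomically increment the completion counter.
        let done = self.completed.fetch_add(1, Ordering::SeqCst) + 1;
        if done != self.total_partitions {
            return None;
        }
        // Step 3: we are the last partition, so every push has already
        // happened; merge into one overall min/max.
        let all = self.bounds.lock().unwrap();
        all.iter().copied().reduce(|a, b| Bounds {
            min: a.min.min(b.min),
            max: a.max.max(b.max),
        })
    }
}

/// Runs one thread per partition and collects whatever merged results
/// were returned (at most one, from the last partition to finish).
fn merge_across_threads(inputs: Vec<Option<Bounds>>) -> Vec<Bounds> {
    let acc = Arc::new(BoundsAccumulator::new(inputs.len()));
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|b| {
            let acc = Arc::clone(&acc);
            thread::spawn(move || acc.report(b))
        })
        .collect();
    handles
        .into_iter()
        .filter_map(|h| h.join().unwrap())
        .collect()
}
```

Because each partition pushes before it increments, the caller that observes the counter reaching `total_partitions` is guaranteed to see every stored entry. In this sketch a partition with no bounds still increments the counter, so the last partition can always be identified.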
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]