kosiew commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2287661024
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -365,7 +514,10 @@ pub struct HashJoinExec {
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
     /// Dynamic filter for pushing down to the probe side
-    dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Shared bounds accumulator for coordinating dynamic filter updates across partitions
+    /// Lazily initialized at execution time to use actual runtime partition counts
+    bounds_accumulator: Arc<OnceLock<Arc<SharedBoundsAccumulator>>>,

Review Comment:
   The double `Arc` layering in `Arc<OnceLock<Arc<SharedBoundsAccumulator>>>` increases complexity and obscures ownership expectations.

   Can we replace it with `OnceLock<Arc<SharedBoundsAccumulator>>` to eliminate the outer `Arc` wrapper?

##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -98,6 +98,151 @@ use parking_lot::Mutex;
 const HASH_JOIN_SEED: RandomState =
     RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
 
+/// Coordinates dynamic filter bounds collection across multiple partitions
+///
+/// This structure ensures that dynamic filters are built with complete information from all
+/// relevant partitions before being applied to probe-side scans. Incomplete filters would
+/// incorrectly eliminate valid join results.
+///
+/// ## Synchronization Strategy
+///
+/// 1. Each partition computes bounds from its build-side data
+/// 2. Bounds are stored in the shared HashMap (indexed by partition_id)
+/// 3. A counter tracks how many partitions have reported their bounds
+/// 4. When the last partition reports (completed == total), bounds are merged and filter is updated
+///
+/// ## Partition Counting
+///
+/// The `total_partitions` count represents how many times `collect_build_side` will be called:
+/// - **CollectLeft**: Number of output partitions (each accesses shared build data)
+/// - **Partitioned**: Number of input partitions (each builds independently)
+///
+/// ## Thread Safety
+///
+/// All fields use atomic operations or mutexes to ensure correct coordination between concurrent
+/// partition executions.
+struct SharedBoundsAccumulator {
+    /// Bounds from completed partitions.
+    bounds: Mutex<Vec<Vec<(ScalarValue, ScalarValue)>>>,
+    /// Number of partitions that have reported completion.
+    completed_partitions: AtomicUsize,
+    /// Total number of partitions.
+    /// Need to know this so that we can update the dynamic filter once we are done
+    /// building *all* of the hash tables.
+    total_partitions: usize,
+}
+
+impl SharedBoundsAccumulator {
+    /// Creates a new SharedBoundsAccumulator configured for the given partition mode
+    ///
+    /// This method calculates how many times `collect_build_side` will be called based on the
+    /// partition mode's execution pattern. This count is critical for determining when we have
+    /// complete information from all partitions to build the dynamic filter.
+    ///
+    /// ## Partition Mode Execution Patterns
+    ///
+    /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
+    ///   across all output partitions. Each output partition calls `collect_build_side` to access
+    ///   the shared build data. Expected calls = number of output partitions.
+    ///
+    /// - **Partitioned**: Each partition independently builds its own hash table by calling
+    ///   `collect_build_side` once. Expected calls = number of build partitions.
+    ///
+    /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since
+    ///   the actual mode will be determined and a new bounds_accumulator created before execution.
+    ///
+    /// ## Why This Matters
+    ///
+    /// We cannot build a partial filter from some partitions - it would incorrectly eliminate
+    /// valid join results. We must wait until we have complete bounds information from ALL
+    /// relevant partitions before updating the dynamic filter.
+    fn new_from_partition_mode(
+        partition_mode: PartitionMode,
+        left_child: &dyn ExecutionPlan,
+        right_child: &dyn ExecutionPlan,
+    ) -> Self {
+        // Troubleshooting: If partition counts are incorrect, verify this logic matches
+        // the actual execution pattern in collect_build_side()
+        let expected_calls = match partition_mode {
+            // Each output partition accesses shared build data
+            PartitionMode::CollectLeft => {
+                right_child.output_partitioning().partition_count()
+            }
+            // Each partition builds its own data
+            PartitionMode::Partitioned => {
+                left_child.output_partitioning().partition_count()
+            }
+            // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two)
+            PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
+        };
+        Self {
+            bounds: Mutex::new(Vec::with_capacity(expected_calls)),
+            completed_partitions: AtomicUsize::new(0),
+            total_partitions: expected_calls,
+        }
+    }
+
+    /// Merge all bounds from completed partitions into global min/max.
+    ///
+    /// Troubleshooting: If this returns None when you expect bounds, check:
+    /// 1. All partitions called collect_build_side with bounds data
+    /// 2. collect_left_input was called with should_compute_bounds=true
+    /// 3. The build side had at least one non-empty batch
+    fn merge_bounds(&self) -> Option<Vec<(ScalarValue, ScalarValue)>> {
+        let bounds = self.bounds.lock();
+        let all_bounds: Vec<_> = bounds.iter().collect();
+
+        if all_bounds.is_empty() {
+            return None;
+        }

Review Comment:
   Can we operate directly on the locked `bounds` and eliminate the unnecessary intermediate collection (`all_bounds`) when computing the column-wise minima and maxima?

##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1695,12 +1858,91 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
+        // Handle dynamic filter bounds accumulation
+        //
+        // This coordination ensures the dynamic filter contains complete bounds information
+        // from all relevant partitions before being applied to probe-side scans.
+        //
+        // Process:
+        // 1. Store this partition's bounds in the shared accumulator
+        // 2. Atomically increment the completion counter
+        // 3. If we're the last partition to complete, merge all bounds and update the filter
+        //
+        // Note: In CollectLeft mode, multiple partitions may access the SAME build data
+        // (shared via OnceFut), but each partition must report separately to ensure proper
+        // coordination across all output partitions.
+        //
+        // The consequences of not doing this synchronization properly would be that a filter
+        // with incomplete bounds would be pushed down resulting in incorrect results (missing rows).
+        if let Some(dynamic_filter) = &self.dynamic_filter {
+            // Store bounds in the accumulator - this runs once per partition
+            if let Some(bounds) = &left_data.bounds {
+                // Only push actual bounds if they exist
+                self.bounds_accumulator.bounds.lock().push(bounds.clone());
+            }
+
+            // Atomically increment the completion counter
+            // Even empty partitions must report to ensure proper termination
+            let completed = self
+                .bounds_accumulator
+                .completed_partitions
+                .fetch_add(1, Ordering::SeqCst)
+                + 1;
+            let total_partitions = self.bounds_accumulator.total_partitions;
+
+            // Critical synchronization point: Only the last partition updates the filter
+            // Troubleshooting: If you see "completed > total_partitions", check partition
+            // count calculation in try_new() - it may not match actual execution calls
+            if completed == total_partitions {
+                if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() {
+                    let filter_expr = self.create_filter_from_bounds(merged_bounds)?;
+                    dynamic_filter.update(filter_expr)?;
+                }
+            }

Review Comment:
   The plan can be rerun whenever the same `ExecutionPlan` object is executed more than once, for example if a caller issues multiple collect/execute invocations on a prepared physical plan or reuses the plan across query runs.

   I think we should reset the `bounds_accumulator` to clear stale data before exiting this `if` block.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
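
For illustration only, here is a minimal sketch of what the second and third review comments above might look like when combined: merging directly over the locked bounds, and resetting the accumulator once the last partition has reported. It is not the PR's code. `BoundsAccumulator`, `merge`, and `report` are hypothetical names, `i64` stands in for `ScalarValue`, and `std::sync::Mutex` stands in for `parking_lot::Mutex` so that the sketch depends only on the standard library.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Per-column (min, max) bounds. Assumption: `i64` stands in for `ScalarValue`.
type Bounds = Vec<(i64, i64)>;

/// Hypothetical, simplified stand-in for `SharedBoundsAccumulator`.
struct BoundsAccumulator {
    bounds: Mutex<Vec<Bounds>>,
    completed_partitions: AtomicUsize,
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self {
            bounds: Mutex::new(Vec::with_capacity(total_partitions)),
            completed_partitions: AtomicUsize::new(0),
            total_partitions,
        }
    }

    /// Folds column-wise minima and maxima directly over the stored bounds,
    /// without first collecting references into an intermediate `Vec`.
    /// Returns `None` when no partition reported any bounds.
    fn merge(all: &[Bounds]) -> Option<Bounds> {
        let (first, rest) = all.split_first()?;
        let mut merged = first.clone();
        for partition in rest {
            for (acc, (min, max)) in merged.iter_mut().zip(partition) {
                acc.0 = acc.0.min(*min);
                acc.1 = acc.1.max(*max);
            }
        }
        Some(merged)
    }

    /// Called once per partition. Only the last reporter gets merged bounds;
    /// the state is then cleared so that a later execution starts fresh.
    fn report(&self, partition_bounds: Option<Bounds>) -> Option<Bounds> {
        if let Some(b) = partition_bounds {
            self.bounds.lock().unwrap().push(b);
        }
        // Partitions without bounds still count towards completion.
        let completed = self.completed_partitions.fetch_add(1, Ordering::SeqCst) + 1;
        if completed != self.total_partitions {
            return None;
        }
        let mut guard = self.bounds.lock().unwrap();
        let merged = Self::merge(&guard);
        // Reset stale data before returning (third review comment).
        guard.clear();
        self.completed_partitions.store(0, Ordering::SeqCst);
        merged
    }
}
```

With three partitions reporting `[(5, 10), (0, 3)]`, nothing, and `[(1, 7), (2, 9)]`, only the third call returns a value, and afterwards the counter and stored bounds are back at their initial state. The sketch does not address whether an in-place reset is safe when two executions of the same plan overlap; that would need to be checked against how the PR actually creates and shares the accumulator.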