kosiew commented on code in PR #17197:
URL: https://github.com/apache/datafusion/pull/17197#discussion_r2282550804


##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1640,12 +1813,91 @@ impl HashJoinStream {
             .get_shared(cx))?;
         build_timer.done();
 
+        // Handle dynamic filter bounds accumulation
+        //
+        // This coordination ensures the dynamic filter contains complete 
bounds information
+        // from all relevant partitions before being applied to probe-side 
scans.
+        //
+        // Process:
+        // 1. Store this partition's bounds in the shared accumulator
+        // 2. Atomically increment the completion counter
+        // 3. If we're the last partition to complete, merge all bounds and 
update the filter
+        //
+        // Note: In CollectLeft mode, multiple partitions may access the SAME 
build data
+        // (shared via OnceFut), but each partition must report separately to 
ensure proper
+        // coordination across all output partitions.
+        //
+        // The consequences of not doing this syncronization properly would be 
that a filter

Review Comment:
   ```suggestion
           // The consequences of not doing this synchronization properly would 
be that a filter
   ```



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -98,6 +98,151 @@ use parking_lot::Mutex;
 const HASH_JOIN_SEED: RandomState =
     RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
 
+/// Coordinates dynamic filter bounds collection across multiple partitions
+///
+/// This structure ensures that dynamic filters are built with complete 
information from all
+/// relevant partitions before being applied to probe-side scans. Incomplete 
filters would
+/// incorrectly eliminate valid join results.
+///
+/// ## Synchronization Strategy
+///
+/// 1. Each partition computes bounds from its build-side data
+/// 2. Bounds are stored in the shared HashMap (indexed by partition_id)  
+/// 3. A counter tracks how many partitions have reported their bounds
+/// 4. When the last partition reports (completed == total), bounds are merged 
and filter is updated
+///
+/// ## Partition Counting
+///
+/// The `total_partitions` count represents how many times 
`collect_build_side` will be called:
+/// - **CollectLeft**: Number of output partitions (each accesses shared build 
data)
+/// - **Partitioned**: Number of input partitions (each builds independently)
+///
+/// ## Thread Safety
+///
+/// All fields use atomic operations or mutexes to ensure correct coordination 
between concurrent
+/// partition executions.
+struct SharedBoundsAccumulator {
+    /// Bounds from completed partitions.
+    bounds: Mutex<Vec<Vec<(ScalarValue, ScalarValue)>>>,
+    /// Number of partitions that have reported completion.
+    completed_partitions: AtomicUsize,
+    /// Total number of partitions.
+    /// Need to know this so that we can update the dynamic filter once we are 
done
+    /// building *all* of the hash tables.
+    total_partitions: usize,
+}
+
+impl SharedBoundsAccumulator {
+    /// Creates a new SharedBoundsAccumulator configured for the given 
partition mode
+    ///
+    /// This method calculates how many times `collect_build_side` will be 
called based on the
+    /// partition mode's execution pattern. This count is critical for 
determining when we have
+    /// complete information from all partitions to build the dynamic filter.
+    ///
+    /// ## Partition Mode Execution Patterns
+    ///
+    /// - **CollectLeft**: Build side is collected ONCE from partition 0 and 
shared via `OnceFut`
+    ///   across all output partitions. Each output partition calls 
`collect_build_side` to access
+    ///   the shared build data. Expected calls = number of output partitions.
+    ///
+    /// - **Partitioned**: Each partition independently builds its own hash 
table by calling
+    ///   `collect_build_side` once. Expected calls = number of build 
partitions.
+    ///
+    /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as 
safe default since
+    ///   the actual mode will be determined and a new bounds_accumulator 
created before execution.
+    ///
+    /// ## Why This Matters
+    ///
+    /// We cannot build a partial filter from some partitions - it would 
incorrectly eliminate
+    /// valid join results. We must wait until we have complete bounds 
information from ALL
+    /// relevant partitions before updating the dynamic filter.
+    fn new_from_partition_mode(
+        partition_mode: PartitionMode,
+        left_child: &dyn ExecutionPlan,
+        right_child: &dyn ExecutionPlan,
+    ) -> Self {
+        // Troubleshooting: If partition counts are incorrect, verify this 
logic matches
+        // the actual execution pattern in collect_build_side()
+        let expected_calls = match partition_mode {
+            // Each output partition accesses shared build data
+            PartitionMode::CollectLeft => {
+                right_child.output_partitioning().partition_count()

Review Comment:
   Isn't this is fragile?
   
   right_child.output_partitioning().partition_count() reflects the plan at 
planning time. If any Repartition/Coalesce/EnforceDistribution/etc. gets 
inserted between planning and execution (or an upstream operator behaves 
differently at runtime), the actual number of streams created at execute() can 
differ. That can break coordination that relies on a compile-time partition 
count (like your SharedBoundsAccumulator.total_partitions).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to