Dandandan commented on code in PR #17452:
URL: https://github.com/apache/datafusion/pull/17452#discussion_r2760006713


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -253,36 +248,44 @@ impl SharedBoundsAccumulator {
     /// bounds from the current partition, increments the completion counter, and when all
     /// partitions have reported, creates an OR'd filter from individual partition bounds.
     ///
+    /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions
+    /// to report their bounds. Once that occurs, the method will resolve for all callers and the
+    /// dynamic filter will be updated exactly once.
+    ///
+    /// # Note
+    ///
+    /// As barriers are reusable, it is likely an error to call this method more times than the
+    /// total number of partitions - as it can lead to pending futures that never resolve. We rely
+    /// on correct usage from the caller rather than imposing additional checks here. If this is a concern,
+    /// consider making the resulting future shared so the ready result can be reused.
+    ///
     /// # Arguments
+    /// * `partition` - The partition identifier reporting its bounds
     /// * `partition_bounds` - The bounds computed by this partition (if any)
     ///
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed
-    pub(crate) fn report_partition_bounds(
+    pub(crate) async fn report_partition_bounds(
         &self,
         partition: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
     ) -> Result<()> {
-        let mut inner = self.inner.lock();
-
         // Store bounds in the accumulator - this runs once per partition
         if let Some(bounds) = partition_bounds {
-            // Only push actual bounds if they exist
-            inner.bounds.push(PartitionBounds::new(partition, bounds));
+            self.inner
+                .lock()
+                .bounds
+                .push(PartitionBounds::new(partition, bounds));
         }
 
-        // Increment the completion counter
-        // Even empty partitions must report to ensure proper termination
-        inner.completed_partitions += 1;
-        let completed = inner.completed_partitions;
-        let total_partitions = self.total_partitions;
-
-        // Critical synchronization point: Only update the filter when ALL partitions are complete
-        // Troubleshooting: If you see "completed > total_partitions", check partition
-        // count calculation in new_from_partition_mode() - it may not match actual execution calls
-        if completed == total_partitions && !inner.bounds.is_empty() {
-            let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?;
-            self.dynamic_filter.update(filter_expr)?;
+        if self.barrier.wait().await.is_leader() {

Review Comment:
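   This unfortunately removes parallelism for `Partitioned` joins: every
   partition now waits at the barrier until all other partitions have reported
   their bounds, so none of them can continue independently. The in-memory
   TPC-H benchmark doesn't catch this, as it will mostly use `CollectLeft`.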
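
To make the concern concrete, here is a minimal sketch of the barrier pattern. It is not the code from this PR: it uses `std::sync::Barrier` and OS threads as a stand-in for `tokio::sync::Barrier` and async tasks, and `BoundsAccumulator`, `arrived`, `filter_updates`, and `run_partitions` are hypothetical names chosen for illustration. It shows that exactly one caller becomes the leader (so the filter would be updated once), and that no partition is released until every partition has reported.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical, simplified stand-in for the accumulator in the diff.
struct BoundsAccumulator {
    barrier: Barrier,
    bounds: Mutex<Vec<(usize, i64, i64)>>, // (partition, min, max)
    arrived: AtomicUsize,                  // partitions that have reported
    filter_updates: AtomicUsize,           // times the "filter" was updated
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self {
            barrier: Barrier::new(total_partitions),
            bounds: Mutex::new(Vec::new()),
            arrived: AtomicUsize::new(0),
            filter_updates: AtomicUsize::new(0),
        }
    }

    /// Returns how many partitions had reported when this call was released.
    fn report_partition_bounds(&self, partition: usize, bounds: Option<(i64, i64)>) -> usize {
        // Only store bounds that exist; empty partitions still reach the barrier.
        if let Some((min, max)) = bounds {
            self.bounds.lock().unwrap().push((partition, min, max));
        }
        self.arrived.fetch_add(1, Ordering::SeqCst);
        // Blocks until all partitions have called `wait`; one caller is the leader.
        if self.barrier.wait().is_leader() {
            self.filter_updates.fetch_add(1, Ordering::SeqCst);
        }
        self.arrived.load(Ordering::SeqCst)
    }
}

/// Runs `total` simulated partitions with uneven build times.
fn run_partitions(total: usize) -> (usize, usize, Vec<usize>) {
    let acc = Arc::new(BoundsAccumulator::new(total));
    let handles: Vec<_> = (0..total)
        .map(|p| {
            let acc = Arc::clone(&acc);
            thread::spawn(move || {
                // Later partitions take longer to "build".
                thread::sleep(Duration::from_millis(5 * p as u64));
                // Even-numbered partitions report bounds, odd ones report none.
                let bounds = if p % 2 == 0 { Some((p as i64, p as i64 + 10)) } else { None };
                acc.report_partition_bounds(p, bounds)
            })
        })
        .collect();
    let seen: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let n_bounds = acc.bounds.lock().unwrap().len();
    (n_bounds, acc.filter_updates.load(Ordering::SeqCst), seen)
}

fn main() {
    let (n_bounds, updates, seen) = run_partitions(4);
    println!("bounds stored: {n_bounds}, filter updates: {updates}, arrivals seen: {seen:?}");
}
```

Every entry in `seen` equals the partition count, which means even the fastest partition was held at the barrier until the slowest one reported. That is the behavior that would serialize the start of the probe phase across partitions in `Partitioned` mode.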



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

