This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cea3ada934 fix determinism of HashJoinExec bounds filter creation (#17280)
cea3ada934 is described below

commit cea3ada934740750c0c5046934044ac229921e2f
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Fri Aug 22 06:14:41 2025 -0500

    fix determinism of HashJoinExec bounds filter creation (#17280)
---
 .../tests/physical_optimizer/filter_pushdown/mod.rs |  2 +-
 datafusion/physical-plan/src/joins/hash_join.rs     | 21 ++++++++++++++++-----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 0972283ffd..6752bc30bc 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -950,7 +950,7 @@ async fn 
test_hashjoin_dynamic_filter_pushdown_partitioned() {
     use datafusion_common::JoinType;
     use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
 
-    // Rouugh plan we're trying to recreate:
+    // Rough sketch of the MRE we're trying to recreate:
     // COPY (select i as k from generate_series(1, 10000000) as t(i))
     // TO 'test_files/scratch/push_down_filter/t1.parquet'
     // STORED AS PARQUET;
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index e749411377..80f1de5a5b 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -92,6 +92,7 @@ use 
datafusion_physical_expr_common::datum::compare_op_for_nested;
 use ahash::RandomState;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
 use parking_lot::Mutex;
 
 /// Hard-coded seed to ensure hash values from the hash join differ from 
`RepartitionExec`, avoiding collisions.
@@ -118,14 +119,19 @@ impl ColumnBounds {
 /// This contains the min/max values computed from one partition's build-side 
data.
 #[derive(Debug, Clone)]
 struct PartitionBounds {
+    /// Partition identifier for debugging and determinism (not strictly 
necessary)
+    partition: usize,
     /// Min/max bounds for each join key column in this partition.
     /// Index corresponds to the join key expression index.
     column_bounds: Vec<ColumnBounds>,
 }
 
 impl PartitionBounds {
-    fn new(column_bounds: Vec<ColumnBounds>) -> Self {
-        Self { column_bounds }
+    fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self {
+        Self {
+            partition,
+            column_bounds,
+        }
     }
 
     fn len(&self) -> usize {
@@ -258,7 +264,7 @@ impl SharedBoundsAccumulator {
         // Create a predicate for each partition
         let mut partition_predicates = Vec::with_capacity(bounds.len());
 
-        for partition_bounds in bounds.iter() {
+        for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
             // Create range predicates for each join key in this partition
             let mut column_predicates = 
Vec::with_capacity(partition_bounds.len());
 
@@ -320,6 +326,7 @@ impl SharedBoundsAccumulator {
     /// * `Result<()>` - Ok if successful, Err if filter update failed
     fn report_partition_bounds(
         &self,
+        partition: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
     ) -> Result<()> {
         let mut inner = self.inner.lock();
@@ -327,7 +334,7 @@ impl SharedBoundsAccumulator {
         // Store bounds in the accumulator - this runs once per partition
         if let Some(bounds) = partition_bounds {
             // Only push actual bounds if they exist
-            inner.bounds.push(PartitionBounds::new(bounds));
+            inner.bounds.push(PartitionBounds::new(partition, bounds));
         }
 
         // Increment the completion counter
@@ -1254,6 +1261,7 @@ impl ExecutionPlan for HashJoinExec {
             .collect::<Vec<_>>();
 
         Ok(Box::pin(HashJoinStream {
+            partition,
             schema: self.schema(),
             on_right,
             filter: self.filter.clone(),
@@ -1769,6 +1777,8 @@ impl ProcessProbeBatchState {
 /// 2. Streams [RecordBatch]es as they arrive from the right input (probe) and 
joins
 ///    them with the contents of the hash table
 struct HashJoinStream {
+    /// Partition identifier for debugging and determinism
+    partition: usize,
     /// Input schema
     schema: Arc<Schema>,
     /// equijoin columns from the right (probe side)
@@ -1993,7 +2003,8 @@ impl HashJoinStream {
         // Dynamic filter coordination between partitions:
         // Report bounds to the accumulator which will handle synchronization 
and filter updates
         if let Some(ref bounds_accumulator) = self.bounds_accumulator {
-            
bounds_accumulator.report_partition_bounds(left_data.bounds.clone())?;
+            bounds_accumulator
+                .report_partition_bounds(self.partition, 
left_data.bounds.clone())?;
         }
 
         self.state = HashJoinStreamState::FetchProbeBatch;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to