This is an automated email from the ASF dual-hosted git repository. adriangb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new cea3ada934 fix determinism of HashJoinExec bounds filter creation (#17280) cea3ada934 is described below commit cea3ada934740750c0c5046934044ac229921e2f Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com> AuthorDate: Fri Aug 22 06:14:41 2025 -0500 fix determinism of HashJoinExec bounds filter creation (#17280) --- .../tests/physical_optimizer/filter_pushdown/mod.rs | 2 +- datafusion/physical-plan/src/joins/hash_join.rs | 21 ++++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 0972283ffd..6752bc30bc 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -950,7 +950,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { use datafusion_common::JoinType; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; - // Rouugh plan we're trying to recreate: + // Rough sketch of the MRE we're trying to recreate: // COPY (select i as k from generate_series(1, 10000000) as t(i)) // TO 'test_files/scratch/push_down_filter/t1.parquet' // STORED AS PARQUET; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e749411377..80f1de5a5b 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -92,6 +92,7 @@ use datafusion_physical_expr_common::datum::compare_op_for_nested; use ahash::RandomState; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::{ready, Stream, StreamExt, TryStreamExt}; +use itertools::Itertools; use parking_lot::Mutex; /// Hard-coded seed to ensure hash values from the hash join differ from `RepartitionExec`, avoiding collisions. 
@@ -118,14 +119,19 @@ impl ColumnBounds { /// This contains the min/max values computed from one partition's build-side data. #[derive(Debug, Clone)] struct PartitionBounds { + /// Partition identifier for debugging and determinism (not strictly necessary) + partition: usize, /// Min/max bounds for each join key column in this partition. /// Index corresponds to the join key expression index. column_bounds: Vec<ColumnBounds>, } impl PartitionBounds { - fn new(column_bounds: Vec<ColumnBounds>) -> Self { - Self { column_bounds } + fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self { + Self { + partition, + column_bounds, + } } fn len(&self) -> usize { @@ -258,7 +264,7 @@ impl SharedBoundsAccumulator { // Create a predicate for each partition let mut partition_predicates = Vec::with_capacity(bounds.len()); - for partition_bounds in bounds.iter() { + for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) { // Create range predicates for each join key in this partition let mut column_predicates = Vec::with_capacity(partition_bounds.len()); @@ -320,6 +326,7 @@ impl SharedBoundsAccumulator { /// * `Result<()>` - Ok if successful, Err if filter update failed fn report_partition_bounds( &self, + partition: usize, partition_bounds: Option<Vec<ColumnBounds>>, ) -> Result<()> { let mut inner = self.inner.lock(); @@ -327,7 +334,7 @@ impl SharedBoundsAccumulator { // Store bounds in the accumulator - this runs once per partition if let Some(bounds) = partition_bounds { // Only push actual bounds if they exist - inner.bounds.push(PartitionBounds::new(bounds)); + inner.bounds.push(PartitionBounds::new(partition, bounds)); } // Increment the completion counter @@ -1254,6 +1261,7 @@ impl ExecutionPlan for HashJoinExec { .collect::<Vec<_>>(); Ok(Box::pin(HashJoinStream { + partition, schema: self.schema(), on_right, filter: self.filter.clone(), @@ -1769,6 +1777,8 @@ impl ProcessProbeBatchState { /// 2. 
Streams [RecordBatch]es as they arrive from the right input (probe) and joins /// them with the contents of the hash table struct HashJoinStream { + /// Partition identifier for debugging and determinism + partition: usize, /// Input schema schema: Arc<Schema>, /// equijoin columns from the right (probe side) @@ -1993,7 +2003,8 @@ impl HashJoinStream { // Dynamic filter coordination between partitions: // Report bounds to the accumulator which will handle synchronization and filter updates if let Some(ref bounds_accumulator) = self.bounds_accumulator { - bounds_accumulator.report_partition_bounds(left_data.bounds.clone())?; + bounds_accumulator + .report_partition_bounds(self.partition, left_data.bounds.clone())?; } self.state = HashJoinStreamState::FetchProbeBatch; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@datafusion.apache.org For additional commands, e-mail: commits-help@datafusion.apache.org