This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22590-5073db19cfb7bfd21cb1a5e6493f115410df3092 in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 1465d6fa4e580638bb8e0987adc33d8892a31547 Author: Gene Bordegaray <[email protected]> AuthorDate: Fri Jun 5 15:44:20 2026 -0400 Add partitioning compatibility API (#22590) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #22589 - EPIC: #22395 - Relevant thread #21992 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Follow-up range partitioning work needs a way to ask whether two physical partitionings describe the same partition map. This is distinct from, and stricter than, distribution satisfaction: satisfaction only asks whether a partitioning can satisfy a required distribution, whereas compatibility asks whether partition `i` of both partitionings covers the same partition domain. It is needed before optimizer rules can safely use partition-local behavior. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Adds `Partitioning::compatible_with`. - Adds `RangePartitioning::compatible_with`. - Refactors `Partitioning::satisfaction` to reuse the new shared `equivalent_exprs` and `normalize_exprs` helpers. - Adds tests for hash, range, round-robin, and unknown partitioning compatibility. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes. Added unit tests covering hash, range, round-robin, and unknown partitioning compatibility. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Yes.
This adds public compatibility helper API on physical partitioning types. <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --- datafusion/physical-expr/src/partitioning.rs | 299 ++++++++++++++++++++++++--- 1 file changed, 276 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 616b4905b4..6009cd995e 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -284,6 +284,54 @@ impl RangePartitioning { self.split_points.len() + 1 } + /// Returns true when `self` and `other` describe the same range partition + /// map. + /// + /// Single-partition range partitionings are always compatible. Otherwise, + /// the two partitionings must have identical split points and equivalent + /// ordering expressions with the same sort options. + pub fn compatible_with( + &self, + other: &Self, + eq_properties: &EquivalenceProperties, + ) -> bool { + if self.partition_count() == 1 && other.partition_count() == 1 { + return true; + } + + if self.split_points != other.split_points + || self.ordering.len() != other.ordering.len() + { + return false; + } + + if !self + .ordering + .iter() + .zip(other.ordering.iter()) + .all(|(left, right)| left.options == right.options) + { + return false; + } + + let left_exprs = self + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::<Vec<_>>(); + let right_exprs = other + .ordering + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::<Vec<_>>(); + + equivalent_exprs(&left_exprs, &right_exprs, eq_properties) + } + + /// Calculates the range partitioning after applying the given projection. + /// + /// Returns `None` if any range key cannot be projected or if projection + /// collapses distinct range keys into duplicate output expressions. 
fn project( &self, mapping: &ProjectionMapping, @@ -416,6 +464,37 @@ fn compare_scalar_values_for_sort( } } +fn equivalent_exprs( + left: &[Arc<dyn PhysicalExpr>], + right: &[Arc<dyn PhysicalExpr>], + eq_properties: &EquivalenceProperties, +) -> bool { + if physical_exprs_equal(left, right) { + return true; + } + + let eq_groups = eq_properties.eq_group(); + if eq_groups.is_empty() { + return false; + } + + let normalized_left = normalize_exprs(left, eq_properties); + let normalized_right = normalize_exprs(right, eq_properties); + + physical_exprs_equal(&normalized_left, &normalized_right) +} + +fn normalize_exprs( + exprs: &[Arc<dyn PhysicalExpr>], + eq_properties: &EquivalenceProperties, +) -> Vec<Arc<dyn PhysicalExpr>> { + let eq_groups = eq_properties.eq_group(); + exprs + .iter() + .map(|expr| eq_groups.normalize_expr(Arc::clone(expr))) + .collect() +} + /// Represents how a [`Partitioning`] satisfies a [`Distribution`] requirement. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PartitioningSatisfaction { @@ -447,6 +526,42 @@ impl Partitioning { } } + /// Returns true when `self` and `other` describe compatible partition maps. + /// + /// Compatible partition maps can be used for partition-local behavior: if + /// this returns true, partition `i` from both partitionings can be treated + /// as covering the same partition domain. This is stricter than + /// [`Self::satisfaction`], which only answers whether this partitioning can + /// satisfy a required distribution. 
+ pub fn compatible_with( + &self, + other: &Self, + eq_properties: &EquivalenceProperties, + ) -> bool { + if self.partition_count() == 1 && other.partition_count() == 1 { + return true; + } + + match (self, other) { + ( + Partitioning::Hash(left_exprs, left_count), + Partitioning::Hash(right_exprs, right_count), + ) => { + if left_count != right_count { + return false; + } + if left_exprs.is_empty() || right_exprs.is_empty() { + return false; + } + equivalent_exprs(left_exprs, right_exprs, eq_properties) + } + (Partitioning::Range(left), Partitioning::Range(right)) => { + left.compatible_with(right, eq_properties) + } + _ => false, + } + } + /// Returns true if `subset_exprs` is a subset of `exprs`. /// For example: Hash(a, b) is subset of Hash(a) since a partition with all occurrences of /// a distinct (a) must also contain all occurrences of a distinct (a, b) with the same (a). @@ -503,36 +618,23 @@ impl Partitioning { return PartitioningSatisfaction::NotSatisfied; } - // Fast path: exact match - if physical_exprs_equal(required_exprs, partition_exprs) { + if equivalent_exprs(required_exprs, partition_exprs, eq_properties) { return PartitioningSatisfaction::Exact; } - // Normalization path using equivalence groups let eq_groups = eq_properties.eq_group(); if !eq_groups.is_empty() { - let normalized_required_exprs = required_exprs - .iter() - .map(|e| eq_groups.normalize_expr(Arc::clone(e))) - .collect::<Vec<_>>(); - let normalized_partition_exprs = partition_exprs - .iter() - .map(|e| eq_groups.normalize_expr(Arc::clone(e))) - .collect::<Vec<_>>(); - if physical_exprs_equal( - &normalized_required_exprs, - &normalized_partition_exprs, - ) { - return PartitioningSatisfaction::Exact; - } - - if allow_subset - && Self::is_subset_partitioning( + if allow_subset { + let normalized_partition_exprs = + normalize_exprs(partition_exprs, eq_properties); + let normalized_required_exprs = + normalize_exprs(required_exprs, eq_properties); + if Self::is_subset_partitioning( 
&normalized_partition_exprs, &normalized_required_exprs, - ) - { - return PartitioningSatisfaction::Subset; + ) { + return PartitioningSatisfaction::Subset; + } } } else if allow_subset && Self::is_subset_partitioning(partition_exprs, required_exprs) @@ -1272,6 +1374,157 @@ mod tests { Ok(()) } + #[test] + fn test_range_partitioning_compatible_with() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let mut eq_properties = fixture.eq_properties.clone(); + eq_properties.add_equal_conditions(fixture.col(0), fixture.col(1))?; + + let split_points = vec![int_split_point([10]), int_split_point([20])]; + let range_a = fixture.range([0], split_points.clone()); + let range_a_same = fixture.range([0], split_points.clone()); + let range_b_equivalent = fixture.range([1], split_points.clone()); + let range_b_different_split = fixture.range([1], vec![int_split_point([30])]); + let range_a_desc = RangePartitioning::try_new( + [fixture.range_sort_expr(0, SortOptions::new(true, false))].into(), + vec![int_split_point([10])], + )?; + let single_partition_range_a = fixture.range([0], vec![]); + let single_partition_range_b = fixture.range([1], vec![]); + + assert!(range_a.compatible_with(&range_a_same, &fixture.eq_properties)); + assert!(range_a.compatible_with(&range_b_equivalent, &eq_properties)); + assert!(!range_a.compatible_with(&range_b_equivalent, &fixture.eq_properties)); + assert!(!range_a.compatible_with(&range_b_different_split, &eq_properties)); + assert!(!range_a.compatible_with(&range_a_desc, &eq_properties)); + assert!( + single_partition_range_a + .compatible_with(&single_partition_range_b, &fixture.eq_properties) + ); + + assert!( + fixture + .range_partitioning([0], vec![int_split_point([10])]) + .compatible_with( + &fixture.range_partitioning([1], vec![int_split_point([10])]), + &eq_properties + ) + ); + assert!( + !fixture + .range_partitioning([0], vec![int_split_point([10])]) + .compatible_with( + &fixture.range_partitioning([0], 
vec![int_split_point([20])]), + &fixture.eq_properties + ) + ); + assert!( + !fixture + .range_partitioning([0], vec![int_split_point([10])]) + .compatible_with( + &fixture.hash_partitioning([0], 2), + &fixture.eq_properties + ) + ); + + Ok(()) + } + + #[test] + fn test_hash_partitioning_compatible_with() -> Result<()> { + let fixture = PartitioningTestFixture::int64(&["a", "b"])?; + let mut eq_properties = fixture.eq_properties.clone(); + eq_properties.add_equal_conditions(fixture.col(0), fixture.col(1))?; + + assert!( + fixture.hash_partitioning([0], 2).compatible_with( + &fixture.hash_partitioning([0], 2), + &fixture.eq_properties + ) + ); + assert!( + fixture + .hash_partitioning([0], 2) + .compatible_with(&fixture.hash_partitioning([1], 2), &eq_properties) + ); + assert!( + !fixture.hash_partitioning([0], 2).compatible_with( + &fixture.hash_partitioning([1], 2), + &fixture.eq_properties + ) + ); + assert!( + !fixture.hash_partitioning([0], 2).compatible_with( + &fixture.hash_partitioning([0], 3), + &fixture.eq_properties + ) + ); + assert!(!fixture.hash_partitioning([0], 2).compatible_with( + &fixture.hash_partitioning([0, 1], 2), + &fixture.eq_properties + )); + assert!( + !Partitioning::Hash(vec![], 2) + .compatible_with(&Partitioning::Hash(vec![], 2), &fixture.eq_properties) + ); + assert!(!fixture.hash_partitioning([0], 2).compatible_with( + &fixture.range_partitioning([0], vec![int_split_point([10])]), + &fixture.eq_properties + )); + assert!( + fixture.hash_partitioning([0], 1).compatible_with( + &Partitioning::RoundRobinBatch(1), + &fixture.eq_properties + ) + ); + + Ok(()) + } + + #[test] + fn test_round_robin_partitioning_compatible_with() { + let eq_properties = EquivalenceProperties::new(Arc::new(Schema::empty())); + + assert!( + Partitioning::RoundRobinBatch(1) + .compatible_with(&Partitioning::RoundRobinBatch(1), &eq_properties) + ); + assert!( + !Partitioning::RoundRobinBatch(2) + .compatible_with(&Partitioning::RoundRobinBatch(2), 
&eq_properties) + ); + assert!( + Partitioning::RoundRobinBatch(1) + .compatible_with(&Partitioning::UnknownPartitioning(1), &eq_properties) + ); + assert!( + !Partitioning::RoundRobinBatch(2) + .compatible_with(&Partitioning::UnknownPartitioning(2), &eq_properties) + ); + } + + #[test] + fn test_unknown_partitioning_compatible_with() { + let eq_properties = EquivalenceProperties::new(Arc::new(Schema::empty())); + + assert!( + Partitioning::UnknownPartitioning(1) + .compatible_with(&Partitioning::UnknownPartitioning(1), &eq_properties) + ); + assert!( + !Partitioning::UnknownPartitioning(2) + .compatible_with(&Partitioning::UnknownPartitioning(2), &eq_properties) + ); + assert!( + Partitioning::UnknownPartitioning(1) + .compatible_with(&Partitioning::RoundRobinBatch(1), &eq_properties) + ); + assert!( + !Partitioning::UnknownPartitioning(2) + .compatible_with(&Partitioning::RoundRobinBatch(2), &eq_properties) + ); + } + #[test] fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> Result<()> { let fixture = PartitioningTestFixture::int64(&["a", "b"])?; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
