This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b0d7cd0e98 Preserve ordering equivalencies on `with_reorder` (#13770)
b0d7cd0e98 is described below

commit b0d7cd0e98751a3d11a9d6c09bacb54813119d25
Author: Goksel Kabadayi <[email protected]>
AuthorDate: Fri Dec 20 12:51:18 2024 +0300

    Preserve ordering equivalencies on `with_reorder` (#13770)
    
    * Preserve ordering equivalencies on `with_reorder`
    
    * Add assertions
    
    * Return early if filtered_exprs is empty
    
    * Add clarify comment
    
    * Refactor
    
    * Add comprehensive test case
    
    * Add comment for exprs_equal
    
    * Cargo fmt
    
    * Clippy fix
    
    * Update properties.rs
    
    * Update exprs_equal and add tests
    
    * Update properties.rs
    
    ---------
    
    Co-authored-by: berkaysynnada <[email protected]>
---
 datafusion/physical-expr/src/equivalence/class.rs  | 211 ++++++++++++++-
 .../physical-expr/src/equivalence/properties.rs    | 287 ++++++++++++++++++++-
 2 files changed, 494 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence/class.rs 
b/datafusion/physical-expr/src/equivalence/class.rs
index cc26d12fb0..03b3c7761a 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -626,6 +626,59 @@ impl EquivalenceGroup {
             JoinType::RightSemi | JoinType::RightAnti => 
right_equivalences.clone(),
         }
     }
+
+    /// Checks if two expressions are equal either directly or through 
equivalence classes.
+    /// For complex expressions (e.g. a + b), checks that the expression trees 
are structurally
+    /// identical and their leaf nodes are equivalent either directly or 
through equivalence classes.
+    pub fn exprs_equal(
+        &self,
+        left: &Arc<dyn PhysicalExpr>,
+        right: &Arc<dyn PhysicalExpr>,
+    ) -> bool {
+        // Direct equality check
+        if left.eq(right) {
+            return true;
+        }
+
+        // Check if expressions are equivalent through equivalence classes
+        // We need to check both directions since expressions might be in 
different classes
+        if let Some(left_class) = self.get_equivalence_class(left) {
+            if left_class.contains(right) {
+                return true;
+            }
+        }
+        if let Some(right_class) = self.get_equivalence_class(right) {
+            if right_class.contains(left) {
+                return true;
+            }
+        }
+
+        // For non-leaf nodes, check structural equality
+        let left_children = left.children();
+        let right_children = right.children();
+
+        // If either expression is a leaf node and we haven't found equality 
yet,
+        // they must be different
+        if left_children.is_empty() || right_children.is_empty() {
+            return false;
+        }
+
+        // Type equality check through reflection
+        if left.as_any().type_id() != right.as_any().type_id() {
+            return false;
+        }
+
+        // Check if the number of children is the same
+        if left_children.len() != right_children.len() {
+            return false;
+        }
+
+        // Check if all children are equal
+        left_children
+            .into_iter()
+            .zip(right_children)
+            .all(|(left_child, right_child)| self.exprs_equal(left_child, 
right_child))
+    }
 }
 
 impl Display for EquivalenceGroup {
@@ -647,9 +700,10 @@ mod tests {
 
     use super::*;
     use crate::equivalence::tests::create_test_params;
-    use crate::expressions::{lit, Literal};
+    use crate::expressions::{lit, BinaryExpr, Literal};
 
     use datafusion_common::{Result, ScalarValue};
+    use datafusion_expr::Operator;
 
     #[test]
     fn test_bridge_groups() -> Result<()> {
@@ -777,4 +831,159 @@ mod tests {
         assert!(!cls1.contains_any(&cls3));
         assert!(!cls2.contains_any(&cls3));
     }
+
+    #[test]
+    fn test_exprs_equal() -> Result<()> {
+        struct TestCase {
+            left: Arc<dyn PhysicalExpr>,
+            right: Arc<dyn PhysicalExpr>,
+            expected: bool,
+            description: &'static str,
+        }
+
+        // Create test columns
+        let col_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+        let col_b = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
+        let col_x = Arc::new(Column::new("x", 2)) as Arc<dyn PhysicalExpr>;
+        let col_y = Arc::new(Column::new("y", 3)) as Arc<dyn PhysicalExpr>;
+
+        // Create test literals
+        let lit_1 =
+            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn 
PhysicalExpr>;
+        let lit_2 =
+            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn 
PhysicalExpr>;
+
+        // Create equivalence group with classes (a = x) and (b = y)
+        let eq_group = EquivalenceGroup::new(vec![
+            EquivalenceClass::new(vec![Arc::clone(&col_a), 
Arc::clone(&col_x)]),
+            EquivalenceClass::new(vec![Arc::clone(&col_b), 
Arc::clone(&col_y)]),
+        ]);
+
+        let test_cases = vec![
+            // Basic equality tests
+            TestCase {
+                left: Arc::clone(&col_a),
+                right: Arc::clone(&col_a),
+                expected: true,
+                description: "Same column should be equal",
+            },
+            // Equivalence class tests
+            TestCase {
+                left: Arc::clone(&col_a),
+                right: Arc::clone(&col_x),
+                expected: true,
+                description: "Columns in same equivalence class should be 
equal",
+            },
+            TestCase {
+                left: Arc::clone(&col_b),
+                right: Arc::clone(&col_y),
+                expected: true,
+                description: "Columns in same equivalence class should be 
equal",
+            },
+            TestCase {
+                left: Arc::clone(&col_a),
+                right: Arc::clone(&col_b),
+                expected: false,
+                description:
+                    "Columns in different equivalence classes should not be 
equal",
+            },
+            // Literal tests
+            TestCase {
+                left: Arc::clone(&lit_1),
+                right: Arc::clone(&lit_1),
+                expected: true,
+                description: "Same literal should be equal",
+            },
+            TestCase {
+                left: Arc::clone(&lit_1),
+                right: Arc::clone(&lit_2),
+                expected: false,
+                description: "Different literals should not be equal",
+            },
+            // Complex expression tests
+            TestCase {
+                left: Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_a),
+                    Operator::Plus,
+                    Arc::clone(&col_b),
+                )) as Arc<dyn PhysicalExpr>,
+                right: Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_x),
+                    Operator::Plus,
+                    Arc::clone(&col_y),
+                )) as Arc<dyn PhysicalExpr>,
+                expected: true,
+                description:
+                    "Binary expressions with equivalent operands should be 
equal",
+            },
+            TestCase {
+                left: Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_a),
+                    Operator::Plus,
+                    Arc::clone(&col_b),
+                )) as Arc<dyn PhysicalExpr>,
+                right: Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_x),
+                    Operator::Plus,
+                    Arc::clone(&col_a),
+                )) as Arc<dyn PhysicalExpr>,
+                expected: false,
+                description:
+                    "Binary expressions with non-equivalent operands should 
not be equal",
+            },
+            TestCase {
+                left: Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_a),
+                    Operator::Plus,
+                    Arc::clone(&lit_1),
+                )) as Arc<dyn PhysicalExpr>,
+                right: Arc::new(BinaryExpr::new(
+                    Arc::clone(&col_x),
+                    Operator::Plus,
+                    Arc::clone(&lit_1),
+                )) as Arc<dyn PhysicalExpr>,
+                expected: true,
+                description: "Binary expressions with equivalent column and 
same literal should be equal",
+            },
+            TestCase {
+                left: Arc::new(BinaryExpr::new(
+                    Arc::new(BinaryExpr::new(
+                        Arc::clone(&col_a),
+                        Operator::Plus,
+                        Arc::clone(&col_b),
+                    )),
+                    Operator::Multiply,
+                    Arc::clone(&lit_1),
+                )) as Arc<dyn PhysicalExpr>,
+                right: Arc::new(BinaryExpr::new(
+                    Arc::new(BinaryExpr::new(
+                        Arc::clone(&col_x),
+                        Operator::Plus,
+                        Arc::clone(&col_y),
+                    )),
+                    Operator::Multiply,
+                    Arc::clone(&lit_1),
+                )) as Arc<dyn PhysicalExpr>,
+                expected: true,
+                description: "Nested binary expressions with equivalent 
operands should be equal",
+            },
+        ];
+
+        for TestCase {
+            left,
+            right,
+            expected,
+            description,
+        } in test_cases
+        {
+            let actual = eq_group.exprs_equal(&left, &right);
+            assert_eq!(
+                actual, expected,
+                "{}: Failed comparing {:?} and {:?}, expected {}, got {}",
+                description, left, right, expected, actual
+            );
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index d4814fb4d7..f019b2e570 100755
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt;
 use std::fmt::Display;
 use std::hash::{Hash, Hasher};
 use std::iter::Peekable;
 use std::slice::Iter;
 use std::sync::Arc;
+use std::{fmt, mem};
 
 use super::ordering::collapse_lex_ordering;
 use crate::equivalence::class::const_exprs_contains;
@@ -412,12 +412,51 @@ impl EquivalenceProperties {
     /// Updates the ordering equivalence group within assuming that the table
     /// is re-sorted according to the argument `sort_exprs`. Note that 
constants
     /// and equivalence classes are unchanged as they are unaffected by a 
re-sort.
+    /// If the given ordering is already satisfied, the function does nothing.
     pub fn with_reorder(mut self, sort_exprs: LexOrdering) -> Self {
-        // TODO: In some cases, existing ordering equivalences may still be 
valid add this analysis.
-        self.oeq_class = OrderingEquivalenceClass::new(vec![sort_exprs]);
+        // Filter out constant expressions as they don't affect ordering
+        let filtered_exprs = LexOrdering::new(
+            sort_exprs
+                .into_iter()
+                .filter(|expr| !self.is_expr_constant(&expr.expr))
+                .collect(),
+        );
+
+        if filtered_exprs.is_empty() {
+            return self;
+        }
+
+        let mut new_orderings = vec![filtered_exprs.clone()];
+
+        // Preserve valid suffixes from existing orderings
+        let orderings = mem::take(&mut self.oeq_class.orderings);
+        for existing in orderings {
+            if self.is_prefix_of(&filtered_exprs, &existing) {
+                let mut extended = filtered_exprs.clone();
+                
extended.extend(existing.into_iter().skip(filtered_exprs.len()));
+                new_orderings.push(extended);
+            }
+        }
+
+        self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
         self
     }
 
+    /// Checks if the new ordering matches a prefix of the existing ordering
+    /// (considering expression equivalences)
+    fn is_prefix_of(&self, new_order: &LexOrdering, existing: &LexOrdering) -> 
bool {
+        // Check if new order is longer than existing - can't be a prefix
+        if new_order.len() > existing.len() {
+            return false;
+        }
+
+        // Check if new order matches existing prefix (considering 
equivalences)
+        new_order.iter().zip(existing).all(|(new, existing)| {
+            self.eq_group.exprs_equal(&new.expr, &existing.expr)
+                && new.options == existing.options
+        })
+    }
+
     /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
     /// equivalence group and the ordering equivalence class within.
     ///
@@ -3852,4 +3891,246 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_with_reorder_constant_filtering() -> Result<()> {
+        let schema = create_test_schema()?;
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        // Setup constant columns
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        eq_properties = 
eq_properties.with_constants([ConstExpr::from(&col_a)]);
+
+        let sort_exprs = LexOrdering::new(vec![
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_a),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_b),
+                options: SortOptions::default(),
+            },
+        ]);
+
+        let result = eq_properties.with_reorder(sort_exprs);
+
+        // Should only contain b since a is constant
+        assert_eq!(result.oeq_class().len(), 1);
+        assert_eq!(result.oeq_class().orderings[0].len(), 1);
+        assert!(result.oeq_class().orderings[0][0].expr.eq(&col_b));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_with_reorder_preserve_suffix() -> Result<()> {
+        let schema = create_test_schema()?;
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_c = col("c", &schema)?;
+
+        let asc = SortOptions::default();
+        let desc = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+
+        // Initial ordering: [a ASC, b DESC, c ASC]
+        eq_properties.add_new_orderings([LexOrdering::new(vec![
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_a),
+                options: asc,
+            },
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_b),
+                options: desc,
+            },
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_c),
+                options: asc,
+            },
+        ])]);
+
+        // New ordering: [a ASC]
+        let new_order = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: Arc::clone(&col_a),
+            options: asc,
+        }]);
+
+        let result = eq_properties.with_reorder(new_order);
+
+        // Should only contain [a ASC, b DESC, c ASC]
+        assert_eq!(result.oeq_class().len(), 1);
+        assert_eq!(result.oeq_class().orderings[0].len(), 3);
+        assert!(result.oeq_class().orderings[0][0].expr.eq(&col_a));
+        assert!(result.oeq_class().orderings[0][0].options.eq(&asc));
+        assert!(result.oeq_class().orderings[0][1].expr.eq(&col_b));
+        assert!(result.oeq_class().orderings[0][1].options.eq(&desc));
+        assert!(result.oeq_class().orderings[0][2].expr.eq(&col_c));
+        assert!(result.oeq_class().orderings[0][2].options.eq(&asc));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_with_reorder_equivalent_expressions() -> Result<()> {
+        let schema = create_test_schema()?;
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_c = col("c", &schema)?;
+
+        // Make a and b equivalent
+        eq_properties.add_equal_conditions(&col_a, &col_b)?;
+
+        let asc = SortOptions::default();
+
+        // Initial ordering: [a ASC, c ASC]
+        eq_properties.add_new_orderings([LexOrdering::new(vec![
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_a),
+                options: asc,
+            },
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_c),
+                options: asc,
+            },
+        ])]);
+
+        // New ordering: [b ASC]
+        let new_order = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: Arc::clone(&col_b),
+            options: asc,
+        }]);
+
+        let result = eq_properties.with_reorder(new_order);
+
+        // Should only contain [b ASC, c ASC]
+        assert_eq!(result.oeq_class().len(), 1);
+
+        // Verify orderings
+        assert_eq!(result.oeq_class().orderings[0].len(), 2);
+        assert!(result.oeq_class().orderings[0][0].expr.eq(&col_b));
+        assert!(result.oeq_class().orderings[0][0].options.eq(&asc));
+        assert!(result.oeq_class().orderings[0][1].expr.eq(&col_c));
+        assert!(result.oeq_class().orderings[0][1].options.eq(&asc));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_with_reorder_incompatible_prefix() -> Result<()> {
+        let schema = create_test_schema()?;
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+
+        let asc = SortOptions::default();
+        let desc = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+
+        // Initial ordering: [a ASC, b DESC]
+        eq_properties.add_new_orderings([LexOrdering::new(vec![
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_a),
+                options: asc,
+            },
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_b),
+                options: desc,
+            },
+        ])]);
+
+        // New ordering: [a DESC]
+        let new_order = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: Arc::clone(&col_a),
+            options: desc,
+        }]);
+
+        let result = eq_properties.with_reorder(new_order.clone());
+
+        // Should only contain the new ordering since options don't match
+        assert_eq!(result.oeq_class().len(), 1);
+        assert_eq!(result.oeq_class().orderings[0], new_order);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_with_reorder_comprehensive() -> Result<()> {
+        let schema = create_test_schema()?;
+        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&schema));
+
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_c = col("c", &schema)?;
+        let col_d = col("d", &schema)?;
+        let col_e = col("e", &schema)?;
+
+        let asc = SortOptions::default();
+
+        // Constants: c is constant
+        eq_properties = 
eq_properties.with_constants([ConstExpr::from(&col_c)]);
+
+        // Equality: b = d
+        eq_properties.add_equal_conditions(&col_b, &col_d)?;
+
+        // Orderings: [d ASC, a ASC], [e ASC]
+        eq_properties.add_new_orderings([
+            LexOrdering::new(vec![
+                PhysicalSortExpr {
+                    expr: Arc::clone(&col_d),
+                    options: asc,
+                },
+                PhysicalSortExpr {
+                    expr: Arc::clone(&col_a),
+                    options: asc,
+                },
+            ]),
+            LexOrdering::new(vec![PhysicalSortExpr {
+                expr: Arc::clone(&col_e),
+                options: asc,
+            }]),
+        ]);
+
+        // Initial ordering: [b ASC, c ASC]
+        let new_order = LexOrdering::new(vec![
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_b),
+                options: asc,
+            },
+            PhysicalSortExpr {
+                expr: Arc::clone(&col_c),
+                options: asc,
+            },
+        ]);
+
+        let result = eq_properties.with_reorder(new_order);
+
+        // Should preserve the original [d ASC, a ASC] ordering
+        assert_eq!(result.oeq_class().len(), 1);
+        let ordering = &result.oeq_class().orderings[0];
+        assert_eq!(ordering.len(), 2);
+
+        // First expression should be either b or d (they're equivalent)
+        assert!(
+            ordering[0].expr.eq(&col_b) || ordering[0].expr.eq(&col_d),
+            "Expected b or d as first expression, got {:?}",
+            ordering[0].expr
+        );
+        assert!(ordering[0].options.eq(&asc));
+
+        // Second expression should be a
+        assert!(ordering[1].expr.eq(&col_a));
+        assert!(ordering[1].options.eq(&asc));
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to