This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 380d843bd9 Improve documentation and add Display impl to 
EquivalenceProperties (#12590)
380d843bd9 is described below

commit 380d843bd98589e71f36e4b58f89a76bcfd19c31
Author: Andrew Lamb <and...@nerdnetworks.org>
AuthorDate: Tue Sep 24 10:49:15 2024 -0400

    Improve documentation and add Display impl to EquivalenceProperties (#12590)
---
 .../fuzz_cases/sort_preserving_repartition_fuzz.rs |   2 +-
 .../physical-expr-common/src/physical_expr.rs      |  24 +++-
 datafusion/physical-expr/src/equivalence/class.rs  |  35 ++++++
 datafusion/physical-expr/src/equivalence/mod.rs    |   2 +-
 .../physical-expr/src/equivalence/ordering.rs      |  26 +++-
 .../physical-expr/src/equivalence/properties.rs    | 138 +++++++++++++++++----
 datafusion/physical-plan/src/filter.rs             |   4 +-
 datafusion/physical-plan/src/windows/mod.rs        |   2 +-
 8 files changed, 200 insertions(+), 33 deletions(-)

diff --git 
a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index ceae13a469..408cadc35f 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -80,7 +80,7 @@ mod sp_repartition_fuzz_tests {
         // Define a and f are aliases
         eq_properties.add_equal_conditions(col_a, col_f)?;
         // Column e has constant value.
-        eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);
+        eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]);
 
         // Randomly order columns for sorting
         let mut rng = StdRng::seed_from_u64(seed);
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs 
b/datafusion/physical-expr-common/src/physical_expr.rs
index a443a65eaa..cc725cf2ce 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use std::any::Any;
-use std::fmt::{Debug, Display};
+use std::fmt::{Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
@@ -223,3 +223,25 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
         any
     }
 }
+
+/// Returns a [`Display`]able wrapper that formats a list of [`PhysicalExpr`]s
+/// as a bracketed, comma-separated list.
+///
+/// The returned value only borrows `exprs`; formatting happens lazily when it
+/// is displayed. An empty slice is rendered as `[]`.
+///
+/// Example output: `[a + 1, b]`
+pub fn format_physical_expr_list(exprs: &[Arc<dyn PhysicalExpr>]) -> impl Display + '_ {
+    struct DisplayWrapper<'a>(&'a [Arc<dyn PhysicalExpr>]);
+    impl Display for DisplayWrapper<'_> {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            write!(f, "[")?;
+            for (i, expr) in self.0.iter().enumerate() {
+                // Separate elements with ", " (no separator before the first).
+                if i > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "{}", expr)?;
+            }
+            write!(f, "]")
+        }
+    }
+    DisplayWrapper(exprs)
+}
diff --git a/datafusion/physical-expr/src/equivalence/class.rs 
b/datafusion/physical-expr/src/equivalence/class.rs
index a0396ea133..00708b4540 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::Display;
 use std::sync::Arc;
 
 use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
@@ -27,6 +28,7 @@ use crate::{
 
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::JoinType;
+use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
 
 #[derive(Debug, Clone)]
 /// A structure representing a expression known to be constant in a physical 
execution plan.
@@ -101,6 +103,19 @@ impl ConstExpr {
     }
 }
 
+/// Renders a `ConstExpr` as its inner expression, with an
+/// `(across_partitions)` marker appended when `across_partitions` is set.
+///
+/// Example `c` or `c(across_partitions)`
+impl Display for ConstExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let marker = if self.across_partitions {
+            "(across_partitions)"
+        } else {
+            ""
+        };
+        write!(f, "{}{}", self.expr, marker)
+    }
+}
+
 impl From<Arc<dyn PhysicalExpr>> for ConstExpr {
     fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
         Self::new(expr)
@@ -224,6 +239,12 @@ impl EquivalenceClass {
     }
 }
 
+/// Formats the class as the list of its member expressions, e.g. `[a, b]`.
+impl Display for EquivalenceClass {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // `format_physical_expr_list` already emits the surrounding brackets;
+        // wrapping it in another `[...]` would print `[[a, b]]`.
+        write!(f, "{}", format_physical_expr_list(&self.exprs))
+    }
+}
+
 /// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
 /// class represents a distinct equivalence class in a relation.
 #[derive(Debug, Clone)]
@@ -575,6 +596,20 @@ impl EquivalenceGroup {
     }
 }
 
+/// Formats the group as a bracketed list of its equivalence classes,
+/// separated by `, `.
+impl Display for EquivalenceGroup {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.write_str("[")?;
+        for (idx, class) in self.iter().enumerate() {
+            if idx != 0 {
+                f.write_str(", ")?;
+            }
+            write!(f, "{}", class)?;
+        }
+        f.write_str("]")
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index 50b95c4454..38647f7ca1 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -207,7 +207,7 @@ mod tests {
         // Define a and f are aliases
         eq_properties.add_equal_conditions(col_a, col_f)?;
         // Column e has constant value.
-        eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);
+        eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]);
 
         // Randomly order columns for sorting
         let mut rng = StdRng::seed_from_u64(seed);
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs 
b/datafusion/physical-expr/src/equivalence/ordering.rs
index 49a0de7252..65423033d5 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::Display;
 use std::hash::Hash;
 use std::sync::Arc;
 
-use arrow_schema::SortOptions;
-
 use crate::equivalence::add_offset_to_expr;
 use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow_schema::SortOptions;
 
 /// An `OrderingEquivalenceClass` object keeps track of different alternative
 /// orderings than can describe a schema. For example, consider the following 
table:
@@ -104,6 +104,11 @@ impl OrderingEquivalenceClass {
         self.remove_redundant_entries();
     }
 
+    /// Adds a single ordering to the existing ordering equivalence class.
+    ///
+    /// Convenience wrapper: equivalent to calling
+    /// [`Self::add_new_orderings`] with a one-element array.
+    pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
+        self.add_new_orderings([ordering]);
+    }
+
     /// Removes redundant orderings from this equivalence class. For instance,
     /// if we already have the ordering `[a ASC, b ASC, c DESC]`, then there is
     /// no need to keep ordering `[a ASC, b ASC]` in the state.
@@ -219,6 +224,21 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: 
usize, pre_idx: usize) ->
     false
 }
 
+/// Formats all orderings of the class inside one pair of brackets, with
+/// consecutive orderings separated by `, `.
+///
+/// Each ordering is rendered with [`PhysicalSortExpr::format_list`]; a class
+/// holding a single ordering displays as e.g. `[a@0 ASC,c@2 DESC]`.
+impl Display for OrderingEquivalenceClass {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "[")?;
+        let mut iter = self.orderings.iter();
+        if let Some(ordering) = iter.next() {
+            write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
+        }
+        for ordering in iter {
+            // Without a separator, consecutive orderings would run together
+            // (e.g. `a ASCb ASC`).
+            write!(f, ", {}", PhysicalSortExpr::format_list(ordering))?;
+        }
+        write!(f, "]")
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -559,7 +579,7 @@ mod tests {
             let constants = constants
                 .into_iter()
                 .map(|expr| 
ConstExpr::from(expr).with_across_partitions(true));
-            eq_properties = eq_properties.add_constants(constants);
+            eq_properties = eq_properties.with_constants(constants);
 
             let reqs = convert_to_sort_exprs(&reqs);
             assert_eq!(
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 365b51e1a4..dc59a1eb83 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::Display;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
@@ -41,11 +42,16 @@ use 
datafusion_physical_expr_common::utils::ExprPropertiesNode;
 use indexmap::{IndexMap, IndexSet};
 use itertools::Itertools;
 
-/// A `EquivalenceProperties` object stores useful information related to a 
schema.
+/// An `EquivalenceProperties` object stores information known about the output
+/// of a plan node that can be used to optimize the plan.
+///
 /// Currently, it keeps track of:
-/// - Equivalent expressions, e.g expressions that have same value.
-/// - Valid sort expressions (orderings) for the schema.
-/// - Constants expressions (e.g expressions that are known to have constant 
values).
+/// - Sort expressions (orderings).
+/// - Equivalent expressions: expressions that are known to have the same value.
+/// - Constant expressions: expressions that are known to contain a single
+///   constant value.
+///
+/// # Example equivalent sort expressions
 ///
 /// Consider table below:
 ///
@@ -60,9 +66,13 @@ use itertools::Itertools;
 /// └---┴---┘
 /// ```
 ///
-/// where both `a ASC` and `b DESC` can describe the table ordering. With
-/// `EquivalenceProperties`, we can keep track of these different valid sort
-/// expressions and treat `a ASC` and `b DESC` on an equal footing.
+/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
+/// `EquivalenceProperties` tracks these different valid sort expressions and
+/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
+/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
+/// avoided.
+///
+/// # Example equivalent expressions
 ///
 /// Similarly, consider the table below:
 ///
@@ -77,11 +87,39 @@ use itertools::Itertools;
 /// └---┴---┘
 /// ```
 ///
-/// where columns `a` and `b` always have the same value. We keep track of such
-/// equivalences inside this object. With this information, we can optimize
-/// things like partitioning. For example, if the partition requirement is
-/// `Hash(a)` and output partitioning is `Hash(b)`, then we can deduce that
-/// the existing partitioning satisfies the requirement.
+/// In this case, columns `a` and `b` always have the same value, and
+/// `EquivalenceProperties` keeps track of such equivalences. With this
+/// information, DataFusion can optimize operations such as partitioning. For
+/// example, if the partition requirement is `Hash(a)` and the output
+/// partitioning is `Hash(b)`, then DataFusion avoids repartitioning the data
+/// as the existing partitioning satisfies the requirement.
+///
+/// # Code Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::{Schema, Field, DataType, SchemaRef};
+/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
+/// # use datafusion_physical_expr::expressions::col;
+/// use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
+/// #   Field::new("a", DataType::Int32, false),
+/// #   Field::new("b", DataType::Int32, false),
+/// #   Field::new("c", DataType::Int32, false),
+/// # ]));
+/// # let col_a = col("a", &schema).unwrap();
+/// # let col_b = col("b", &schema).unwrap();
+/// # let col_c = col("c", &schema).unwrap();
+/// // This object represents data that is sorted by a ASC, c DESC
+/// // with a single constant value of b
+/// let mut eq_properties = EquivalenceProperties::new(schema)
+///   .with_constants(vec![ConstExpr::from(col_b)]);
+/// eq_properties.add_new_ordering(vec![
+///   PhysicalSortExpr::new_default(col_a).asc(),
+///   PhysicalSortExpr::new_default(col_c).desc(),
+/// ]);
+///
+/// assert_eq!(eq_properties.to_string(), "order: [a@0 ASC,c@2 DESC], const: 
[b@1]")
+/// ```
 #[derive(Debug, Clone)]
 pub struct EquivalenceProperties {
     /// Collection of equivalence classes that store expressions with the same
@@ -164,7 +202,7 @@ impl EquivalenceProperties {
     pub fn extend(mut self, other: Self) -> Self {
         self.eq_group.extend(other.eq_group);
         self.oeq_class.extend(other.oeq_class);
-        self.add_constants(other.constants)
+        self.with_constants(other.constants)
     }
 
     /// Clears (empties) the ordering equivalence class within this object.
@@ -193,6 +231,11 @@ impl EquivalenceProperties {
         self.oeq_class.add_new_orderings(orderings);
     }
 
+    /// Adds a single ordering to the existing ordering equivalence class.
+    ///
+    /// Convenience wrapper: equivalent to calling
+    /// [`Self::add_new_orderings`] with a one-element array.
+    pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
+        self.add_new_orderings([ordering]);
+    }
+
     /// Incorporates the given equivalence group to into the existing
     /// equivalence group within.
     pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
@@ -231,7 +274,13 @@ impl EquivalenceProperties {
     }
 
     /// Track/register physical expressions with constant values.
-    pub fn add_constants(
+    #[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")]
+    pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) 
-> Self {
+        // Kept only for backward compatibility: forwards unchanged to
+        // `with_constants`, which new code should call directly.
+        self.with_constants(constants)
+    }
+
+    /// Track/register physical expressions with constant values.
+    pub fn with_constants(
         mut self,
         constants: impl IntoIterator<Item = ConstExpr>,
     ) -> Self {
@@ -427,7 +476,7 @@ impl EquivalenceProperties {
             // we add column `a` as constant to the algorithm state. This 
enables us
             // to deduce that `(b + c) ASC` is satisfied, given `a` is 
constant.
             eq_properties = eq_properties
-                
.add_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
+                
.with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
         }
         true
     }
@@ -924,7 +973,7 @@ impl EquivalenceProperties {
             // an implementation strategy confined to this function.
             for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
                 eq_properties =
-                    
eq_properties.add_constants(std::iter::once(ConstExpr::from(expr)));
+                    
eq_properties.with_constants(std::iter::once(ConstExpr::from(expr)));
                 search_indices.shift_remove(idx);
             }
             // Add new ordered section to the state.
@@ -1050,6 +1099,41 @@ impl EquivalenceProperties {
     }
 }
 
+/// More readable display version of the `EquivalenceProperties`.
+///
+/// Only the non-empty sections (`order`, `eq`, `const`) are printed, in that
+/// order, separated by `, `. If all of them are empty, `No properties` is
+/// printed instead.
+///
+/// Example (matching the code example on `EquivalenceProperties`):
+/// ```text
+/// order: [a@0 ASC,c@2 DESC], const: [b@1]
+/// ```
+impl Display for EquivalenceProperties {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if self.eq_group.is_empty()
+            && self.oeq_class.is_empty()
+            && self.constants.is_empty()
+        {
+            return write!(f, "No properties");
+        }
+        // Separator written before every printed section except the first,
+        // so the output never starts with `, ` when `order` is empty.
+        let mut sep = "";
+        if !self.oeq_class.is_empty() {
+            write!(f, "{}order: {}", sep, self.oeq_class)?;
+            sep = ", ";
+        }
+        if !self.eq_group.is_empty() {
+            write!(f, "{}eq: {}", sep, self.eq_group)?;
+            sep = ", ";
+        }
+        if !self.constants.is_empty() {
+            write!(f, "{}const: [", sep)?;
+            for (i, c) in self.constants.iter().enumerate() {
+                if i > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "{}", c)?;
+            }
+            write!(f, "]")?;
+        }
+        Ok(())
+    }
+}
+
 /// Calculates the properties of a given [`ExprPropertiesNode`].
 ///
 /// Order information can be retrieved as:
@@ -1477,10 +1561,10 @@ pub fn join_equivalence_properties(
     }
     match join_type {
         JoinType::LeftAnti | JoinType::LeftSemi => {
-            result = result.add_constants(left_constants);
+            result = result.with_constants(left_constants);
         }
         JoinType::RightAnti | JoinType::RightSemi => {
-            result = result.add_constants(right_constants);
+            result = result.with_constants(right_constants);
         }
         _ => {}
     }
@@ -2289,7 +2373,7 @@ mod tests {
         let col_h = &col("h", &test_schema)?;
 
         // Add column h as constant
-        eq_properties = 
eq_properties.add_constants(vec![ConstExpr::from(col_h)]);
+        eq_properties = 
eq_properties.with_constants(vec![ConstExpr::from(col_h)]);
 
         let test_cases = vec![
             // TEST CASE 1
@@ -2563,13 +2647,13 @@ mod tests {
                     for [left, right] in &case.equal_conditions {
                         properties.add_equal_conditions(left, right)?
                     }
-                    properties.add_constants(
+                    properties.with_constants(
                         case.constants.iter().cloned().map(ConstExpr::from),
                     )
                 },
                 // Constants before equal conditions
                 {
-                    let mut properties = base_properties.clone().add_constants(
+                    let mut properties = 
base_properties.clone().with_constants(
                         case.constants.iter().cloned().map(ConstExpr::from),
                     );
                     for [left, right] in &case.equal_conditions {
@@ -2601,6 +2685,12 @@ mod tests {
         Ok(())
     }
 
+    /// Return a new schema with the same types, but new field names
+    ///
+    /// The new field names are the old field names with `text` appended.
+    ///
+    /// For example, the schema "a", "b", "c" becomes "a1", "b1", "c1"
+    /// if `text` is "1".
     fn append_fields(schema: &SchemaRef, text: &str) -> SchemaRef {
         Arc::new(Schema::new(
             schema
@@ -2956,7 +3046,7 @@ mod tests {
                 .map(|expr| ConstExpr::new(Arc::clone(expr)))
                 .collect::<Vec<_>>();
             let mut lhs = EquivalenceProperties::new(Arc::clone(first_schema));
-            lhs = lhs.add_constants(first_constants);
+            lhs = lhs.with_constants(first_constants);
             lhs.add_new_orderings(first_orderings);
 
             let second_orderings = second_child_orderings
@@ -2968,7 +3058,7 @@ mod tests {
                 .map(|expr| ConstExpr::new(Arc::clone(expr)))
                 .collect::<Vec<_>>();
             let mut rhs = 
EquivalenceProperties::new(Arc::clone(second_schema));
-            rhs = rhs.add_constants(second_constants);
+            rhs = rhs.with_constants(second_constants);
             rhs.add_new_orderings(second_orderings);
 
             let union_expected_orderings = union_orderings
@@ -2980,7 +3070,7 @@ mod tests {
                 .map(|expr| ConstExpr::new(Arc::clone(expr)))
                 .collect::<Vec<_>>();
             let mut union_expected_eq = 
EquivalenceProperties::new(Arc::clone(&schema));
-            union_expected_eq = 
union_expected_eq.add_constants(union_constants);
+            union_expected_eq = 
union_expected_eq.with_constants(union_constants);
             union_expected_eq.add_new_orderings(union_expected_orderings);
 
             let actual_union_eq = calculate_union_binary(lhs, rhs)?;
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 3da0f21156..417d2098b0 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -255,11 +255,11 @@ impl FilterExec {
                 ConstExpr::new(expr).with_across_partitions(true)
             });
         // this is for statistics
-        eq_properties = eq_properties.add_constants(constants);
+        eq_properties = eq_properties.with_constants(constants);
         // this is for logical constant (for example: a = '1', then a could be 
marked as a constant)
         // to do: how to deal with multiple situation to represent = (for 
example c1 between 0 and 0)
         eq_properties =
-            eq_properties.add_constants(Self::extend_constants(input, 
predicate));
+            eq_properties.with_constants(Self::extend_constants(input, 
predicate));
 
         let mut output_partitioning = input.output_partitioning().clone();
         // If contains projection, update the PlanProperties.
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 3507b359dd..6e1cb8db5f 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -584,7 +584,7 @@ pub fn get_window_mode(
         }));
     // Treat partition by exprs as constant. During analysis of requirements 
are satisfied.
     let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
-    let partition_by_eqs = input_eqs.add_constants(const_exprs);
+    let partition_by_eqs = input_eqs.with_constants(const_exprs);
     let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys);
     let reverse_order_by_reqs =
         
PhysicalSortRequirement::from_sort_exprs(&reverse_order_bys(orderby_keys));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to