This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a769025f13 Refactor `DependencyMap` and `Dependencies`   into structs 
(#12761)
a769025f13 is described below

commit a769025f13757d54b4cea745a415ba9093c80b47
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 8 15:56:59 2024 -0400

    Refactor `DependencyMap` and `Dependencies`   into structs (#12761)
    
    * Improve documentation, make DependencyMap / Dependencies a real struct + 
fix stack overflow
    
    * Update datafusion/physical-expr/src/equivalence/properties.rs
    
    Co-authored-by: Berkay Şahin 
<[email protected]>
    
    ---------
    
    Co-authored-by: Berkay Şahin 
<[email protected]>
---
 .../physical-expr/src/equivalence/properties.rs    | 201 ++++++++++++++++++---
 1 file changed, 179 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 6a1268ef8c..005e5776d3 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt;
 use std::fmt::Display;
 use std::hash::{Hash, Hasher};
 use std::iter::Peekable;
@@ -709,7 +710,7 @@ impl EquivalenceProperties {
     /// c ASC: Node {None, HashSet{a ASC}}
     /// ```
     fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> 
DependencyMap {
-        let mut dependency_map = IndexMap::new();
+        let mut dependency_map = DependencyMap::new();
         for ordering in self.normalized_oeq_class().iter() {
             for (idx, sort_expr) in ordering.iter().enumerate() {
                 let target_sort_expr =
@@ -731,13 +732,11 @@ impl EquivalenceProperties {
                     let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
                     // Add sort expressions that can be projected or referred 
to
                     // by any of the projection expressions to the dependency 
map:
-                    dependency_map
-                        .entry(sort_expr.clone())
-                        .or_insert_with(|| DependencyNode {
-                            target_sort_expr: target_sort_expr.clone(),
-                            dependencies: IndexSet::new(),
-                        })
-                        .insert_dependency(dependency);
+                    dependency_map.insert(
+                        sort_expr,
+                        target_sort_expr.as_ref(),
+                        dependency,
+                    );
                 }
                 if !is_projected {
                     // If we can not project, stop constructing the dependency
@@ -1257,7 +1256,7 @@ fn referred_dependencies(
     // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
     let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
     for sort_expr in dependency_map
-        .keys()
+        .sort_exprs()
         .filter(|sort_expr| expr_refers(source, &sort_expr.expr))
     {
         let key = ExprWrapper(Arc::clone(&sort_expr.expr));
@@ -1270,10 +1269,16 @@ fn referred_dependencies(
     // Generate all valid dependencies for the source. For example, if the 
source
     // is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
     // `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
-    expr_to_sort_exprs
-        .values()
+    let dependencies = expr_to_sort_exprs
+        .into_values()
+        .map(Dependencies::into_inner)
+        .collect::<Vec<_>>();
+    dependencies
+        .iter()
         .multi_cartesian_product()
-        .map(|referred_deps| referred_deps.into_iter().cloned().collect())
+        .map(|referred_deps| {
+            Dependencies::new_from_iter(referred_deps.into_iter().cloned())
+        })
         .collect()
 }
 
@@ -1296,7 +1301,9 @@ fn construct_prefix_orderings(
     dependency_map: &DependencyMap,
 ) -> Vec<LexOrdering> {
     let mut dep_enumerator = DependencyEnumerator::new();
-    dependency_map[relevant_sort_expr]
+    dependency_map
+        .get(relevant_sort_expr)
+        .expect("no relevant sort expr found")
         .dependencies
         .iter()
         .flat_map(|dep| dep_enumerator.construct_orderings(dep, 
dependency_map))
@@ -1433,13 +1440,161 @@ impl DependencyNode {
     }
 }
 
-// Using `IndexMap` and `IndexSet` makes sure to generate consistent results 
across different executions for the same query.
-// We could have used `HashSet`, `HashMap` in place of them without any loss 
of functionality.
-// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for 
output ordering
-// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. 
concatenated version of the alternative orderings).
-// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent 
result, among the possible 2 results in the example above.
-type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
-type Dependencies = IndexSet<PhysicalSortExpr>;
+impl Display for DependencyNode {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if let Some(target) = &self.target_sort_expr {
+            write!(f, "(target: {}, ", target)?;
+        } else {
+            write!(f, "(")?;
+        }
+        write!(f, "dependencies: [{}])", self.dependencies)
+    }
+}
+
+/// Maps an expression --> DependencyNode
+///
+/// # Debugging / displaying `DependencyMap`
+///
+/// This structure implements `Display` to assist debugging. For example:
+///
+/// ```text
+/// DependencyMap: {
+///   a@0 ASC --> (target: a@0 ASC, dependencies: [[]])
+///   b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]])
+///   c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]])
+///   d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]])
+/// }
+/// ```
+///
+/// # Note on IndexMap Rationale
+///
+/// Using `IndexMap` (which preserves insert order) to ensure consistent 
results
+/// across different executions for the same query. We could have used
+/// `HashSet`, `HashMap` in place of them without any loss of functionality.
+///
+/// As an example, if existing orderings are
+/// 1. `[a ASC, b ASC]`
+/// 2. `[c ASC]`
+///
+/// Then both the following output orderings are valid
+/// 1. `[a ASC, b ASC, c ASC]`
+/// 2. `[c ASC, a ASC, b ASC]`
+///
+/// (these are both valid as they are concatenated versions of the alternative
+/// orderings). When using `HashSet`, `HashMap` it is not guaranteed to 
generate
+/// consistent result, among the possible 2 results in the example above.
+#[derive(Debug)]
+struct DependencyMap {
+    inner: IndexMap<PhysicalSortExpr, DependencyNode>,
+}
+
+impl DependencyMap {
+    fn new() -> Self {
+        Self {
+            inner: IndexMap::new(),
+        }
+    }
+
+    /// Insert a new dependency `sort_expr` --> `dependency` into the map.
+    ///
+    /// If `sort_expr` has no entry yet, one is created with `target_sort_expr` and 
empty dependencies.
+    fn insert(
+        &mut self,
+        sort_expr: &PhysicalSortExpr,
+        target_sort_expr: Option<&PhysicalSortExpr>,
+        dependency: Option<&PhysicalSortExpr>,
+    ) {
+        self.inner
+            .entry(sort_expr.clone())
+            .or_insert_with(|| DependencyNode {
+                target_sort_expr: target_sort_expr.cloned(),
+                dependencies: Dependencies::new(),
+            })
+            .insert_dependency(dependency)
+    }
+
+    /// Iterator over (sort_expr, DependencyNode) pairs
+    fn iter(&self) -> impl Iterator<Item = (&PhysicalSortExpr, 
&DependencyNode)> {
+        self.inner.iter()
+    }
+
+    /// iterator over all sort exprs
+    fn sort_exprs(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
+        self.inner.keys()
+    }
+
+    /// Return the dependency node for the given sort expression, if any
+    fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> {
+        self.inner.get(sort_expr)
+    }
+}
+
+impl Display for DependencyMap {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "DependencyMap: {{")?;
+        for (sort_expr, node) in self.inner.iter() {
+            writeln!(f, "  {sort_expr} --> {node}")?;
+        }
+        writeln!(f, "}}")
+    }
+}
+
+/// A list of sort expressions that can be calculated from a known set of
+/// dependencies.
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+struct Dependencies {
+    inner: IndexSet<PhysicalSortExpr>,
+}
+
+impl Display for Dependencies {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "[")?;
+        let mut iter = self.inner.iter();
+        if let Some(dep) = iter.next() {
+            write!(f, "{}", dep)?;
+        }
+        for dep in iter {
+            write!(f, ", {}", dep)?;
+        }
+        write!(f, "]")
+    }
+}
+
+impl Dependencies {
+    /// Create a new empty `Dependencies` instance.
+    fn new() -> Self {
+        Self {
+            inner: IndexSet::new(),
+        }
+    }
+
+    /// Create a new `Dependencies` from an iterator of `PhysicalSortExpr`.
+    fn new_from_iter(iter: impl IntoIterator<Item = PhysicalSortExpr>) -> Self 
{
+        Self {
+            inner: iter.into_iter().collect(),
+        }
+    }
+
+    /// Insert a new dependency into the set.
+    fn insert(&mut self, sort_expr: PhysicalSortExpr) {
+        self.inner.insert(sort_expr);
+    }
+
+    /// Iterator over  dependencies in the set
+    fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> + Clone {
+        self.inner.iter()
+    }
+
+    /// Return the inner set of dependencies
+    fn into_inner(self) -> IndexSet<PhysicalSortExpr> {
+        self.inner
+    }
+
+    /// Returns true if there are no dependencies
+    fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+}
 
 /// Contains a mapping of all dependencies we have processed for each sort expr
 struct DependencyEnumerator<'a> {
@@ -1487,8 +1642,9 @@ impl<'a> DependencyEnumerator<'a> {
         referred_sort_expr: &'a PhysicalSortExpr,
         dependency_map: &'a DependencyMap,
     ) -> Vec<LexOrdering> {
-        // We are sure that `referred_sort_expr` is inside `dependency_map`.
-        let node = &dependency_map[referred_sort_expr];
+        let node = dependency_map
+            .get(referred_sort_expr)
+            .expect("`referred_sort_expr` should be inside `dependency_map`");
         // Since we work on intermediate nodes, we are sure 
`val.target_sort_expr`
         // exists.
         let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
@@ -1506,6 +1662,7 @@ impl<'a> DependencyEnumerator<'a> {
                 } else {
                     vec![]
                 };
+
                 for ordering in orderings.iter_mut() {
                     ordering.push(target_sort_expr.clone())
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to