This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a769025f13 Refactor `DependencyMap` and `Dependencies` into structs
(#12761)
a769025f13 is described below
commit a769025f13757d54b4cea745a415ba9093c80b47
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 8 15:56:59 2024 -0400
Refactor `DependencyMap` and `Dependencies` into structs (#12761)
* Improve documentation, make DependencyMap / Dependencies a real struct +
fix stack overflow
* Update datafusion/physical-expr/src/equivalence/properties.rs
Co-authored-by: Berkay Şahin
<[email protected]>
---------
Co-authored-by: Berkay Şahin
<[email protected]>
---
.../physical-expr/src/equivalence/properties.rs | 201 ++++++++++++++++++---
1 file changed, 179 insertions(+), 22 deletions(-)
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 6a1268ef8c..005e5776d3 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::fmt;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::iter::Peekable;
@@ -709,7 +710,7 @@ impl EquivalenceProperties {
/// c ASC: Node {None, HashSet{a ASC}}
/// ```
fn construct_dependency_map(&self, mapping: &ProjectionMapping) ->
DependencyMap {
- let mut dependency_map = IndexMap::new();
+ let mut dependency_map = DependencyMap::new();
for ordering in self.normalized_oeq_class().iter() {
for (idx, sort_expr) in ordering.iter().enumerate() {
let target_sort_expr =
@@ -731,13 +732,11 @@ impl EquivalenceProperties {
let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
// Add sort expressions that can be projected or referred
to
// by any of the projection expressions to the dependency
map:
- dependency_map
- .entry(sort_expr.clone())
- .or_insert_with(|| DependencyNode {
- target_sort_expr: target_sort_expr.clone(),
- dependencies: IndexSet::new(),
- })
- .insert_dependency(dependency);
+ dependency_map.insert(
+ sort_expr,
+ target_sort_expr.as_ref(),
+ dependency,
+ );
}
if !is_projected {
// If we can not project, stop constructing the dependency
@@ -1257,7 +1256,7 @@ fn referred_dependencies(
// Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
for sort_expr in dependency_map
- .keys()
+ .sort_exprs()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
{
let key = ExprWrapper(Arc::clone(&sort_expr.expr));
@@ -1270,10 +1269,16 @@ fn referred_dependencies(
// Generate all valid dependencies for the source. For example, if the
source
// is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
// `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
- expr_to_sort_exprs
- .values()
+ let dependencies = expr_to_sort_exprs
+ .into_values()
+ .map(Dependencies::into_inner)
+ .collect::<Vec<_>>();
+ dependencies
+ .iter()
.multi_cartesian_product()
- .map(|referred_deps| referred_deps.into_iter().cloned().collect())
+ .map(|referred_deps| {
+ Dependencies::new_from_iter(referred_deps.into_iter().cloned())
+ })
.collect()
}
@@ -1296,7 +1301,9 @@ fn construct_prefix_orderings(
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
let mut dep_enumerator = DependencyEnumerator::new();
- dependency_map[relevant_sort_expr]
+ dependency_map
+ .get(relevant_sort_expr)
+ .expect("no relevant sort expr found")
.dependencies
.iter()
.flat_map(|dep| dep_enumerator.construct_orderings(dep,
dependency_map))
@@ -1433,13 +1440,161 @@ impl DependencyNode {
}
}
-// Using `IndexMap` and `IndexSet` makes sure to generate consistent results
across different executions for the same query.
-// We could have used `HashSet`, `HashMap` in place of them without any loss
of functionality.
-// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for
output ordering
-// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g.
concatenated version of the alternative orderings).
-// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent
result, among the possible 2 results in the example above.
-type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
-type Dependencies = IndexSet<PhysicalSortExpr>;
+impl Display for DependencyNode {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if let Some(target) = &self.target_sort_expr {
+ write!(f, "(target: {}, ", target)?;
+ } else {
+ write!(f, "(")?;
+ }
+ write!(f, "dependencies: [{}])", self.dependencies)
+ }
+}
+
+/// Maps an expression --> DependencyNode
+///
+/// # Debugging / displaying `DependencyMap`
+///
+/// This structure implements `Display` to assist debugging. For example:
+///
+/// ```text
+/// DependencyMap: {
+/// a@0 ASC --> (target: a@0 ASC, dependencies: [[]])
+/// b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]])
+/// c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]])
+/// d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]])
+/// }
+/// ```
+///
+/// # Note on IndexMap Rationale
+///
+/// We use `IndexMap` (which preserves insertion order) to ensure consistent
results
+/// across different executions for the same query. We could have used
+/// `HashSet`, `HashMap` in place of them without any loss of functionality.
+///
+/// As an example, if existing orderings are
+/// 1. `[a ASC, b ASC]`
+/// 2. `[c ASC]`
+///
+/// Then both the following output orderings are valid
+/// 1. `[a ASC, b ASC, c ASC]`
+/// 2. `[c ASC, a ASC, b ASC]`
+///
+/// (these are both valid as they are concatenated versions of the alternative
+/// orderings). When using `HashSet`, `HashMap` it is not guaranteed to
generate
+/// a consistent result among the two possible results in the example above.
+#[derive(Debug)]
+struct DependencyMap {
+ inner: IndexMap<PhysicalSortExpr, DependencyNode>,
+}
+
+impl DependencyMap {
+ fn new() -> Self {
+ Self {
+ inner: IndexMap::new(),
+ }
+ }
+
+ /// Insert a new dependency `sort_expr` --> `dependency` into the map.
+ ///
+ /// If `sort_expr` is not yet in the map, a new entry is created with the given `target_sort_expr` and empty
dependencies.
+ fn insert(
+ &mut self,
+ sort_expr: &PhysicalSortExpr,
+ target_sort_expr: Option<&PhysicalSortExpr>,
+ dependency: Option<&PhysicalSortExpr>,
+ ) {
+ self.inner
+ .entry(sort_expr.clone())
+ .or_insert_with(|| DependencyNode {
+ target_sort_expr: target_sort_expr.cloned(),
+ dependencies: Dependencies::new(),
+ })
+ .insert_dependency(dependency)
+ }
+
+ /// Iterator over (sort_expr, DependencyNode) pairs
+ fn iter(&self) -> impl Iterator<Item = (&PhysicalSortExpr,
&DependencyNode)> {
+ self.inner.iter()
+ }
+
+ /// iterator over all sort exprs
+ fn sort_exprs(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
+ self.inner.keys()
+ }
+
+ /// Return the dependency node for the given sort expression, if any
+ fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> {
+ self.inner.get(sort_expr)
+ }
+}
+
+impl Display for DependencyMap {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ writeln!(f, "DependencyMap: {{")?;
+ for (sort_expr, node) in self.inner.iter() {
+ writeln!(f, " {sort_expr} --> {node}")?;
+ }
+ writeln!(f, "}}")
+ }
+}
+
+/// A list of sort expressions that can be calculated from a known set of
+/// dependencies.
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+struct Dependencies {
+ inner: IndexSet<PhysicalSortExpr>,
+}
+
+impl Display for Dependencies {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "[")?;
+ let mut iter = self.inner.iter();
+ if let Some(dep) = iter.next() {
+ write!(f, "{}", dep)?;
+ }
+ for dep in iter {
+ write!(f, ", {}", dep)?;
+ }
+ write!(f, "]")
+ }
+}
+
+impl Dependencies {
+ /// Create a new empty `Dependencies` instance.
+ fn new() -> Self {
+ Self {
+ inner: IndexSet::new(),
+ }
+ }
+
+ /// Create a new `Dependencies` from an iterator of `PhysicalSortExpr`.
+ fn new_from_iter(iter: impl IntoIterator<Item = PhysicalSortExpr>) -> Self
{
+ Self {
+ inner: iter.into_iter().collect(),
+ }
+ }
+
+ /// Insert a new dependency into the set.
+ fn insert(&mut self, sort_expr: PhysicalSortExpr) {
+ self.inner.insert(sort_expr);
+ }
+
+ /// Iterator over dependencies in the set
+ fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> + Clone {
+ self.inner.iter()
+ }
+
+ /// Return the inner set of dependencies
+ fn into_inner(self) -> IndexSet<PhysicalSortExpr> {
+ self.inner
+ }
+
+ /// Returns true if there are no dependencies
+ fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+}
/// Contains a mapping of all dependencies we have processed for each sort expr
struct DependencyEnumerator<'a> {
@@ -1487,8 +1642,9 @@ impl<'a> DependencyEnumerator<'a> {
referred_sort_expr: &'a PhysicalSortExpr,
dependency_map: &'a DependencyMap,
) -> Vec<LexOrdering> {
- // We are sure that `referred_sort_expr` is inside `dependency_map`.
- let node = &dependency_map[referred_sort_expr];
+ let node = dependency_map
+ .get(referred_sort_expr)
+ .expect("`referred_sort_expr` should be inside `dependency_map`");
// Since we work on intermediate nodes, we are sure
`val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
@@ -1506,6 +1662,7 @@ impl<'a> DependencyEnumerator<'a> {
} else {
vec![]
};
+
for ordering in orderings.iter_mut() {
ordering.push(target_sort_expr.clone())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]