This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c72b98e414 Enhance/Refactor Ordering Equivalence Properties (#7566)
c72b98e414 is described below
commit c72b98e41489c09d02cdb5e335c547cfdc5319c4
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Sep 18 13:23:41 2023 +0300
Enhance/Refactor Ordering Equivalence Properties (#7566)
* separate implementation of oeq properties
* Simplifications
* Move utils to methods
* Remove unnecessary code
* Address todo
* Buggy; is_aggressive mode to be added
* start implementing aggressive mode
* all tests pass
* minor changes
* All tests pass
* Minor changes
* All tests pass
* minor changes
* all tests pass
* Simplifications
* minor changes
* Resolve linter error
* Minor changes
* minor changes
* Update plan
* Simplifications, update comments
* Update comments, Use existing stats to find constants
* Simplifications
* Unknown input stats are handled
* Address reviews
* Simplifications
* Simplifications
* Address reviews
* Fix subdirectories
---------
Co-authored-by: berkaysynnada <[email protected]>
---
datafusion/common/src/stats.rs | 24 +
.../src/physical_optimizer/enforce_distribution.rs | 24 +-
datafusion/physical-expr/src/analysis.rs | 5 +-
datafusion/physical-expr/src/equivalence.rs | 604 +++++++++++++++++++--
datafusion/physical-expr/src/lib.rs | 10 +-
datafusion/physical-expr/src/partitioning.rs | 19 +-
datafusion/physical-expr/src/utils.rs | 446 +++++----------
datafusion/physical-plan/src/filter.rs | 26 +-
datafusion/physical-plan/src/joins/utils.rs | 228 +++-----
datafusion/physical-plan/src/memory.rs | 6 +-
datafusion/physical-plan/src/projection.rs | 6 +-
datafusion/sqllogictest/test_files/select.slt | 106 ++++
datafusion/sqllogictest/test_files/subquery.slt | 25 +-
datafusion/sqllogictest/test_files/window.slt | 9 +-
14 files changed, 952 insertions(+), 586 deletions(-)
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index db788efef7..ca76e14cb8 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -19,6 +19,8 @@
use std::fmt::Display;
+use arrow::datatypes::DataType;
+
use crate::ScalarValue;
/// Statistics for a relation
@@ -70,3 +72,25 @@ pub struct ColumnStatistics {
/// Number of distinct values
pub distinct_count: Option<usize>,
}
+
+impl ColumnStatistics {
+    /// Returns `true` if the column holds a single non-null value (i.e. a constant).
+    pub fn is_singleton(&self) -> bool {
+        match (&self.min_value, &self.max_value) {
+            // Bounds are known (non-null, i.e. not infinite) and equal to each other.
+            (Some(min), Some(max)) => !min.is_null() && !max.is_null() && (min == max),
+            (_, _) => false,
+        }
+    }
+
+    /// Returns [`ColumnStatistics`] for type `dt` with infinite (null) min/max bounds.
+    pub fn new_with_unbounded_column(dt: &DataType) -> ColumnStatistics {
+        let null = ScalarValue::try_from(dt.clone()).ok();
+        ColumnStatistics {
+            null_count: None,
+            max_value: null.clone(),
+            min_value: null,
+            distinct_count: None,
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 77d6e7d712..565f76affa 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -54,8 +54,7 @@ use datafusion_physical_expr::utils::{
map_columns_before_projection, ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{
- expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
PhysicalExpr,
- PhysicalSortRequirement,
+ expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
};
use datafusion_common::internal_err;
@@ -807,36 +806,21 @@ fn try_reorder(
} else if !equivalence_properties.classes().is_empty() {
normalized_expected = expected
.iter()
- .map(|e| {
- normalize_expr_with_equivalence_properties(
- e.clone(),
- equivalence_properties.classes(),
- )
- })
+ .map(|e| equivalence_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(normalized_expected.len(), expected.len());
normalized_left_keys = join_keys
.left_keys
.iter()
- .map(|e| {
- normalize_expr_with_equivalence_properties(
- e.clone(),
- equivalence_properties.classes(),
- )
- })
+ .map(|e| equivalence_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len());
normalized_right_keys = join_keys
.right_keys
.iter()
- .map(|e| {
- normalize_expr_with_equivalence_properties(
- e.clone(),
- equivalence_properties.classes(),
- )
- })
+ .map(|e| equivalence_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len());
diff --git a/datafusion/physical-expr/src/analysis.rs
b/datafusion/physical-expr/src/analysis.rs
index d3fcdc11ad..990c643c6b 100644
--- a/datafusion/physical-expr/src/analysis.rs
+++ b/datafusion/physical-expr/src/analysis.rs
@@ -189,12 +189,15 @@ fn shrink_boundaries(
})?;
let final_result = graph.get_interval(*root_index);
+ // If during selectivity calculation we encounter an error, use 1.0 as
cardinality estimate
+ // safest estimate(e.q largest possible value).
let selectivity = calculate_selectivity(
&final_result.lower.value,
&final_result.upper.value,
&target_boundaries,
&initial_boundaries,
- )?;
+ )
+ .unwrap_or(1.0);
if !(0.0..=1.0).contains(&selectivity) {
return internal_err!("Selectivity is out of limit: {}", selectivity);
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index b8ca1acc1c..369c139aa3 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,34 +15,37 @@
// specific language governing permissions and limitations
// under the License.
-use crate::expressions::Column;
-use crate::utils::collect_columns;
+use crate::expressions::{CastExpr, Column};
+use crate::utils::{collect_columns, merge_vectors};
use crate::{
- normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
- PhysicalSortExpr,
+ LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalExpr,
PhysicalSortExpr,
+ PhysicalSortRequirement,
};
use arrow::datatypes::SchemaRef;
use arrow_schema::Fields;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::Result;
+use itertools::izip;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
+use std::ops::Range;
use std::sync::Arc;
/// Represents a collection of [`EquivalentClass`] (equivalences
/// between columns in relations)
///
-/// This is used to represent both:
+/// This is used to represent:
///
/// 1. Equality conditions (like `A=B`), when `T` = [`Column`]
-/// 2. Ordering (like `A ASC = B ASC`), when `T` = [`PhysicalSortExpr`]
#[derive(Debug, Clone)]
-pub struct EquivalenceProperties<T = Column> {
- classes: Vec<EquivalentClass<T>>,
+pub struct EquivalenceProperties {
+ classes: Vec<EquivalentClass<Column>>,
schema: SchemaRef,
}
-impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
+impl EquivalenceProperties {
pub fn new(schema: SchemaRef) -> Self {
EquivalenceProperties {
classes: vec![],
@@ -51,7 +54,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
}
/// return the set of equivalences
- pub fn classes(&self) -> &[EquivalentClass<T>] {
+ pub fn classes(&self) -> &[EquivalentClass<Column>] {
&self.classes
}
@@ -60,7 +63,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
}
/// Add the [`EquivalentClass`] from `iter` to this list
- pub fn extend<I: IntoIterator<Item = EquivalentClass<T>>>(&mut self, iter:
I) {
+ pub fn extend<I: IntoIterator<Item = EquivalentClass<Column>>>(&mut self,
iter: I) {
for ec in iter {
self.classes.push(ec)
}
@@ -68,7 +71,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
/// Adds new equal conditions into the EquivalenceProperties. New equal
/// conditions usually come from equality predicates in a join/filter.
- pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) {
+ pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column))
{
let mut idx1: Option<usize> = None;
let mut idx2: Option<usize> = None;
for (idx, class) in self.classes.iter_mut().enumerate() {
@@ -106,7 +109,7 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
}
(None, None) => {
// adding new pairs
- self.classes.push(EquivalentClass::<T>::new(
+ self.classes.push(EquivalentClass::<Column>::new(
new_conditions.0.clone(),
vec![new_conditions.1.clone()],
));
@@ -114,6 +117,81 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
_ => {}
}
}
+
+ /// Normalizes physical expression according to `EquivalentClass`es inside
`self.classes`.
+ /// expression is replaced with `EquivalentClass::head` expression if it
is among `EquivalentClass::others`.
+ pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn
PhysicalExpr> {
+ expr.clone()
+ .transform(&|expr| {
+ let normalized_form =
+ expr.as_any().downcast_ref::<Column>().and_then(|column| {
+ for class in &self.classes {
+ if class.contains(column) {
+ return Some(Arc::new(class.head().clone()) as
_);
+ }
+ }
+ None
+ });
+ Ok(if let Some(normalized_form) = normalized_form {
+ Transformed::Yes(normalized_form)
+ } else {
+ Transformed::No(expr)
+ })
+ })
+ .unwrap_or(expr)
+ }
+
+ /// This function applies the \[`normalize_expr`]
+ /// function for all expression in `exprs` and returns a vector of
+ /// normalized physical expressions.
+ pub fn normalize_exprs(
+ &self,
+ exprs: &[Arc<dyn PhysicalExpr>],
+ ) -> Vec<Arc<dyn PhysicalExpr>> {
+ exprs
+ .iter()
+ .map(|expr| self.normalize_expr(expr.clone()))
+ .collect::<Vec<_>>()
+ }
+
+ /// This function normalizes `sort_requirement` according to
`EquivalenceClasses` in the `self`.
+ /// If the given sort requirement doesn't belong to equivalence set inside
+ /// `self`, it returns `sort_requirement` as is.
+ pub fn normalize_sort_requirement(
+ &self,
+ mut sort_requirement: PhysicalSortRequirement,
+ ) -> PhysicalSortRequirement {
+ sort_requirement.expr = self.normalize_expr(sort_requirement.expr);
+ sort_requirement
+ }
+
+ /// This function applies the \[`normalize_sort_requirement`]
+ /// function for all sort requirements in `sort_reqs` and returns a vector
of
+ /// normalized sort expressions.
+ pub fn normalize_sort_requirements(
+ &self,
+ sort_reqs: &[PhysicalSortRequirement],
+ ) -> Vec<PhysicalSortRequirement> {
+ let normalized_sort_reqs = sort_reqs
+ .iter()
+ .map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
+ .collect::<Vec<_>>();
+ collapse_vec(normalized_sort_reqs)
+ }
+
+ /// Similar to the \[`normalize_sort_requirements`] this function
normalizes
+ /// sort expressions in `sort_exprs` and returns a vector of
+ /// normalized sort expressions.
+ pub fn normalize_sort_exprs(
+ &self,
+ sort_exprs: &[PhysicalSortExpr],
+ ) -> Vec<PhysicalSortExpr> {
+ let sort_requirements =
+ PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+ let normalized_sort_requirement =
+ self.normalize_sort_requirements(&sort_requirements);
+ PhysicalSortRequirement::to_sort_exprs(normalized_sort_requirement)
+ }
}
/// `OrderingEquivalenceProperties` keeps track of columns that describe the
@@ -131,17 +209,120 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
/// where both `a ASC` and `b DESC` can describe the table ordering. With
/// `OrderingEquivalenceProperties`, we can keep track of these equivalences
/// and treat `a ASC` and `b DESC` as the same ordering requirement.
-pub type OrderingEquivalenceProperties = EquivalenceProperties<LexOrdering>;
+#[derive(Debug, Clone)]
+pub struct OrderingEquivalenceProperties {
+ oeq_class: Option<OrderingEquivalentClass>,
+ /// Keeps track of expressions that have constant value.
+ constants: Vec<Arc<dyn PhysicalExpr>>,
+ schema: SchemaRef,
+}
impl OrderingEquivalenceProperties {
+ /// Create an empty `OrderingEquivalenceProperties`
+ pub fn new(schema: SchemaRef) -> Self {
+ Self {
+ oeq_class: None,
+ constants: vec![],
+ schema,
+ }
+ }
+
+ /// Extends `OrderingEquivalenceProperties` by adding ordering inside the
`other`
+ /// to the `self.oeq_class`.
+ pub fn extend(&mut self, other: Option<OrderingEquivalentClass>) {
+ if let Some(other) = other {
+ if let Some(class) = &mut self.oeq_class {
+ class.others.insert(other.head);
+ class.others.extend(other.others);
+ } else {
+ self.oeq_class = Some(other);
+ }
+ }
+ }
+
+ pub fn oeq_class(&self) -> Option<&OrderingEquivalentClass> {
+ self.oeq_class.as_ref()
+ }
+
+ /// Adds new equal conditions into the EquivalenceProperties. New equal
+ /// conditions usually come from equality predicates in a join/filter.
+ pub fn add_equal_conditions(&mut self, new_conditions: (&LexOrdering,
&LexOrdering)) {
+ if let Some(class) = &mut self.oeq_class {
+ class.insert(new_conditions.0.clone());
+ class.insert(new_conditions.1.clone());
+ } else {
+ let head = new_conditions.0.clone();
+ let others = vec![new_conditions.1.clone()];
+ self.oeq_class = Some(OrderingEquivalentClass::new(head, others))
+ }
+ }
+
+ /// Add physical expression that have constant value to the
`self.constants`
+ pub fn with_constants(mut self, constants: Vec<Arc<dyn PhysicalExpr>>) ->
Self {
+ constants.into_iter().for_each(|constant| {
+ if !physical_exprs_contains(&self.constants, &constant) {
+ self.constants.push(constant);
+ }
+ });
+ self
+ }
+
+ pub fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// This function normalizes `sort_reqs` by
+ /// - removing expressions that have constant value from requirement
+ /// - replacing sections that are in the `self.oeq_class.others` with
`self.oeq_class.head`
+ /// - removing sections that satisfies global ordering that are in the
post fix of requirement
+ pub fn normalize_sort_requirements(
+ &self,
+ sort_reqs: &[PhysicalSortRequirement],
+ ) -> Vec<PhysicalSortRequirement> {
+ let normalized_sort_reqs =
+ prune_sort_reqs_with_constants(sort_reqs, &self.constants);
+ let mut normalized_sort_reqs = collapse_lex_req(normalized_sort_reqs);
+ if let Some(oeq_class) = &self.oeq_class {
+ for item in oeq_class.others() {
+ let item = PhysicalSortRequirement::from_sort_exprs(item);
+ let item = prune_sort_reqs_with_constants(&item,
&self.constants);
+ let ranges = get_compatible_ranges(&normalized_sort_reqs,
&item);
+ let mut offset: i64 = 0;
+ for Range { start, end } in ranges {
+ let head =
PhysicalSortRequirement::from_sort_exprs(oeq_class.head());
+ let mut head = prune_sort_reqs_with_constants(&head,
&self.constants);
+ let updated_start = (start as i64 + offset) as usize;
+ let updated_end = (end as i64 + offset) as usize;
+ let range = end - start;
+ offset += head.len() as i64 - range as i64;
+ let all_none =
normalized_sort_reqs[updated_start..updated_end]
+ .iter()
+ .all(|req| req.options.is_none());
+ if all_none {
+ for req in head.iter_mut() {
+ req.options = None;
+ }
+ }
+ normalized_sort_reqs.splice(updated_start..updated_end,
head);
+ }
+ }
+ normalized_sort_reqs = simplify_lex_req(normalized_sort_reqs,
oeq_class);
+ }
+ collapse_lex_req(normalized_sort_reqs)
+ }
+
/// Checks whether `leading_ordering` is contained in any of the ordering
/// equivalence classes.
pub fn satisfies_leading_ordering(
&self,
leading_ordering: &PhysicalSortExpr,
) -> bool {
- for cls in &self.classes {
- for ordering in
cls.others.iter().chain(std::iter::once(&cls.head)) {
+ if let Some(oeq_class) = &self.oeq_class {
+ for ordering in oeq_class
+ .others
+ .iter()
+ .chain(std::iter::once(&oeq_class.head))
+ {
if ordering[0].eq(leading_ordering) {
return true;
}
@@ -280,6 +461,55 @@ impl OrderingEquivalentClass {
self.insert(update_with_alias(ordering, oeq_alias_map));
}
}
+
+ /// Adds `offset` value to the index of each expression inside `self.head`
and `self.others`.
+ pub fn add_offset(&self, offset: usize) -> Result<OrderingEquivalentClass>
{
+ let head = add_offset_to_lex_ordering(self.head(), offset)?;
+ let others = self
+ .others()
+ .iter()
+ .map(|ordering| add_offset_to_lex_ordering(ordering, offset))
+ .collect::<Result<Vec<_>>>()?;
+ Ok(OrderingEquivalentClass::new(head, others))
+ }
+
+ /// This function normalizes `OrderingEquivalenceProperties` according to
`eq_properties`.
+ /// More explicitly, it makes sure that expressions in `oeq_class` are
head entries
+ /// in `eq_properties`, replacing any non-head entries with head entries
if necessary.
+ pub fn normalize_with_equivalence_properties(
+ &self,
+ eq_properties: &EquivalenceProperties,
+ ) -> OrderingEquivalentClass {
+ let head = eq_properties.normalize_sort_exprs(self.head());
+
+ let others = self
+ .others()
+ .iter()
+ .map(|other| eq_properties.normalize_sort_exprs(other))
+ .collect();
+
+ EquivalentClass::new(head, others)
+ }
+
+ /// Prefix with existing ordering.
+ pub fn prefix_ordering_equivalent_class_with_existing_ordering(
+ &self,
+ existing_ordering: &[PhysicalSortExpr],
+ eq_properties: &EquivalenceProperties,
+ ) -> OrderingEquivalentClass {
+ let existing_ordering =
eq_properties.normalize_sort_exprs(existing_ordering);
+ let normalized_head = eq_properties.normalize_sort_exprs(self.head());
+ let updated_head = merge_vectors(&existing_ordering, &normalized_head);
+ let updated_others = self
+ .others()
+ .iter()
+ .map(|ordering| {
+ let normalized_ordering =
eq_properties.normalize_sort_exprs(ordering);
+ merge_vectors(&existing_ordering, &normalized_ordering)
+ })
+ .collect();
+ OrderingEquivalentClass::new(updated_head, updated_others)
+ }
}
/// This is a builder object facilitating incremental construction
@@ -308,7 +538,7 @@ impl OrderingEquivalenceBuilder {
new_ordering_eq_properties: OrderingEquivalenceProperties,
) -> Self {
self.ordering_eq_properties
- .extend(new_ordering_eq_properties.classes().iter().cloned());
+ .extend(new_ordering_eq_properties.oeq_class().cloned());
self
}
@@ -334,10 +564,7 @@ impl OrderingEquivalenceBuilder {
let mut normalized_out_ordering = vec![];
for item in &self.existing_ordering {
// To account for ordering equivalences, first normalize the
expression:
- let normalized = normalize_expr_with_equivalence_properties(
- item.expr.clone(),
- self.eq_properties.classes(),
- );
+ let normalized =
self.eq_properties.normalize_expr(item.expr.clone());
normalized_out_ordering.push(PhysicalSortExpr {
expr: normalized,
options: item.options,
@@ -459,40 +686,77 @@ pub fn project_ordering_equivalence_properties(
let schema = output_eq.schema();
let fields = schema.fields();
- let mut eq_classes = input_eq.classes().to_vec();
+ let oeq_class = input_eq.oeq_class();
+ let mut oeq_class = if let Some(oeq_class) = oeq_class {
+ oeq_class.clone()
+ } else {
+ return;
+ };
let mut oeq_alias_map = vec![];
for (column, columns) in columns_map {
if is_column_invalid_in_new_schema(column, fields) {
oeq_alias_map.push((column.clone(), columns[0].clone()));
}
}
- for class in eq_classes.iter_mut() {
- class.update_with_aliases(&oeq_alias_map, fields);
- }
+ oeq_class.update_with_aliases(&oeq_alias_map, fields);
- // Prune columns that are no longer in the schema from the
OrderingEquivalenceProperties.
- for class in eq_classes.iter_mut() {
- let sort_exprs_to_remove = class
- .iter()
- .filter(|sort_exprs| {
- sort_exprs.iter().any(|sort_expr| {
- let cols_in_expr = collect_columns(&sort_expr.expr);
- // If any one of the columns, used in Expression is
invalid, remove expression
- // from ordering equivalences
- cols_in_expr
- .iter()
- .any(|col| is_column_invalid_in_new_schema(col,
fields))
- })
+ // Prune columns that no longer is in the schema from from the
OrderingEquivalenceProperties.
+ let sort_exprs_to_remove = oeq_class
+ .iter()
+ .filter(|sort_exprs| {
+ sort_exprs.iter().any(|sort_expr| {
+ let cols_in_expr = collect_columns(&sort_expr.expr);
+ // If any one of the columns, used in Expression is invalid,
remove expression
+ // from ordering equivalences
+ cols_in_expr
+ .iter()
+ .any(|col| is_column_invalid_in_new_schema(col, fields))
})
- .cloned()
- .collect::<Vec<_>>();
- for sort_exprs in sort_exprs_to_remove {
- class.remove(&sort_exprs);
+ })
+ .cloned()
+ .collect::<Vec<_>>();
+ for sort_exprs in sort_exprs_to_remove {
+ oeq_class.remove(&sort_exprs);
+ }
+ if oeq_class.len() > 1 {
+ output_eq.extend(Some(oeq_class));
+ }
+}
+
+/// Update `ordering` if it contains cast expression with target column
+/// after projection, if there is no cast expression among `ordering`
expressions,
+/// returns `None`.
+fn update_with_cast_exprs(
+ cast_exprs: &[(CastExpr, Column)],
+ mut ordering: LexOrdering,
+) -> Option<LexOrdering> {
+ let mut is_changed = false;
+ for sort_expr in ordering.iter_mut() {
+ for (cast_expr, target_col) in cast_exprs.iter() {
+ if sort_expr.expr.eq(cast_expr.expr()) {
+ sort_expr.expr = Arc::new(target_col.clone()) as _;
+ is_changed = true;
+ }
}
}
- eq_classes.retain(|props| props.len() > 1);
+ is_changed.then_some(ordering)
+}
- output_eq.extend(eq_classes);
+/// Update cast expressions inside ordering equivalence
+/// properties with its target column after projection
+pub fn update_ordering_equivalence_with_cast(
+ cast_exprs: &[(CastExpr, Column)],
+ input_oeq: &mut OrderingEquivalenceProperties,
+) {
+ if let Some(cls) = &mut input_oeq.oeq_class {
+ for ordering in
+
std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter())
+ {
+ if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs,
ordering) {
+ cls.insert(updated_ordering);
+ }
+ }
+ }
}
/// Retrieves the ordering equivalence properties for a given schema and
output ordering.
@@ -516,6 +780,197 @@ pub fn ordering_equivalence_properties_helper(
oep
}
+/// This function constructs a duplicate-free vector by filtering out duplicate
+/// entries inside the given vector `input`.
+fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
+ let mut output = vec![];
+ for item in input {
+ if !output.contains(&item) {
+ output.push(item);
+ }
+ }
+ output
+}
+
+/// This function constructs a duplicate-free `LexOrderingReq` by filtering
out duplicate
+/// entries that have same physical expression inside the given vector `input`.
+/// `vec![a Some(Asc), a Some(Desc)]` is collapsed to the `vec![a Some(Asc)]`.
Since
+/// when same expression is already seen before, following expressions are
redundant.
+fn collapse_lex_req(input: LexOrderingReq) -> LexOrderingReq {
+ let mut output = vec![];
+ for item in input {
+ if !lex_req_contains(&output, &item) {
+ output.push(item);
+ }
+ }
+ output
+}
+
+/// Check whether `sort_req.expr` is among the expressions of `lex_req`.
+fn lex_req_contains(
+ lex_req: &[PhysicalSortRequirement],
+ sort_req: &PhysicalSortRequirement,
+) -> bool {
+ for constant in lex_req {
+ if constant.expr.eq(&sort_req.expr) {
+ return true;
+ }
+ }
+ false
+}
+
+/// Simplifies the lexicographical requirement `input` by repeatedly removing
+/// suffixes that are already satisfied by an ordering in `oeq_class`. If the
+/// whole requirement gets pruned, the requirement for `oeq_class.head()` is returned.
+fn simplify_lex_req(
+    input: LexOrderingReq,
+    oeq_class: &OrderingEquivalentClass,
+) -> LexOrderingReq {
+    let mut section = &input[..];
+    loop {
+        let n_prune = prune_last_n_that_is_in_oeq(section, oeq_class);
+        // No suffix of the remaining requirement can be pruned any more.
+        if n_prune == 0 {
+            break;
+        }
+        section = &section[..section.len() - n_prune];
+    }
+    if section.is_empty() {
+        PhysicalSortRequirement::from_sort_exprs(oeq_class.head())
+    } else {
+        section.to_vec()
+    }
+}
+
+/// Determines how many entries from the end of `input` can be deleted.
+/// The last n entries satisfy a global ordering, hence keeping them
+/// as a suffix of the lexicographical requirement is unnecessary.
+/// Assume the requirement is [a ASC, b ASC, c ASC] and the
+/// existing ordering is [c ASC, d ASC]. In this case, since [c ASC]
+/// is satisfied by the existing ordering (i.e. that section is a global ordering),
+/// [c ASC] can be pruned from the requirement [a ASC, b ASC, c ASC]. In this case,
+/// this function returns 1, to indicate that the last element can be removed.
+fn prune_last_n_that_is_in_oeq(
+    input: &[PhysicalSortRequirement],
+    oeq_class: &OrderingEquivalentClass,
+) -> usize {
+    let input_len = input.len();
+    for ordering in std::iter::once(oeq_class.head()).chain(oeq_class.others().iter()) {
+        let mut search_range = std::cmp::min(ordering.len(), input_len);
+        while search_range > 0 {
+            let req_section = &input[input_len - search_range..];
+            // The suffix is redundant if it matches a prefix of `ordering`.
+            if req_satisfied(ordering, req_section) {
+                return search_range;
+            } else {
+                search_range -= 1;
+            }
+        }
+    }
+    0
+}
+
+/// Checks whether given section satisfies req.
+fn req_satisfied(given: LexOrderingRef, req: &[PhysicalSortRequirement]) ->
bool {
+ for (given, req) in izip!(given.iter(), req.iter()) {
+ let PhysicalSortRequirement { expr, options } = req;
+ if let Some(options) = options {
+ if options != &given.options || !expr.eq(&given.expr) {
+ return false;
+ }
+ } else if !expr.eq(&given.expr) {
+ return false;
+ }
+ }
+ true
+}
+
+/// This function searches for the slice `section` inside the slice `given`.
+/// It returns each range where `section` is compatible with the corresponding
+/// slice in `given`. An empty `section` yields no ranges (it would match everywhere).
+fn get_compatible_ranges(
+    given: &[PhysicalSortRequirement],
+    section: &[PhysicalSortRequirement],
+) -> Vec<Range<usize>> {
+    let n_section = section.len();
+    let n_end = if n_section > 0 && given.len() >= n_section {
+        given.len() - n_section + 1
+    } else {
+        0
+    };
+    (0..n_end)
+        .filter_map(|idx| {
+            let end = idx + n_section;
+            given[idx..end]
+                .iter()
+                .zip(section)
+                .all(|(given_req, section_req)| section_req.compatible(given_req))
+                .then_some(Range { start: idx, end })
+        })
+        .collect()
+}
+
+/// It is similar to contains method of vector.
+/// Finds whether `expr` is among `physical_exprs`.
+pub fn physical_exprs_contains(
+ physical_exprs: &[Arc<dyn PhysicalExpr>],
+ expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+ physical_exprs
+ .iter()
+ .any(|physical_expr| physical_expr.eq(expr))
+}
+
+/// Remove ordering requirements that have constant value
+fn prune_sort_reqs_with_constants(
+ ordering: &[PhysicalSortRequirement],
+ constants: &[Arc<dyn PhysicalExpr>],
+) -> Vec<PhysicalSortRequirement> {
+ ordering
+ .iter()
+ .filter(|&order| !physical_exprs_contains(constants, &order.expr))
+ .cloned()
+ .collect()
+}
+
+/// Adds the `offset` value to `Column` indices inside `expr`. This function is
+/// generally used during the update of the right table schema in join
operations.
+pub(crate) fn add_offset_to_expr(
+ expr: Arc<dyn PhysicalExpr>,
+ offset: usize,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+ Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
+ col.name(),
+ offset + col.index(),
+ )))),
+ None => Ok(Transformed::No(e)),
+ })
+}
+
+/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`.
+pub(crate) fn add_offset_to_sort_expr(
+ sort_expr: &PhysicalSortExpr,
+ offset: usize,
+) -> Result<PhysicalSortExpr> {
+ Ok(PhysicalSortExpr {
+ expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?,
+ options: sort_expr.options,
+ })
+}
+
+/// Adds the `offset` value to `Column` indices for each `sort_expr.expr`
+/// inside `sort_exprs`.
+pub fn add_offset_to_lex_ordering(
+ sort_exprs: LexOrderingRef,
+ offset: usize,
+) -> Result<LexOrdering> {
+ sort_exprs
+ .iter()
+ .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset))
+ .collect()
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -523,8 +978,20 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
+ use arrow_schema::SortOptions;
use std::sync::Arc;
+ fn convert_to_requirement(
+ in_data: &[(&Column, Option<SortOptions>)],
+ ) -> Vec<PhysicalSortRequirement> {
+ in_data
+ .iter()
+ .map(|(col, options)| {
+ PhysicalSortRequirement::new(Arc::new((*col).clone()) as _,
*options)
+ })
+ .collect::<Vec<_>>()
+ }
+
#[test]
fn add_equal_conditions_test() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
@@ -615,4 +1082,53 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_collapse_vec() -> Result<()> {
+ assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]);
+ assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]);
+ assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_compatible_ranges() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let col_b = &Column::new("b", 1);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let test_data = vec![
+ (
+ vec![(col_a, Some(option1)), (col_b, Some(option1))],
+ vec![(col_a, Some(option1))],
+ vec![(0, 1)],
+ ),
+ (
+ vec![(col_a, None), (col_b, Some(option1))],
+ vec![(col_a, Some(option1))],
+ vec![(0, 1)],
+ ),
+ (
+ vec![
+ (col_a, None),
+ (col_b, Some(option1)),
+ (col_a, Some(option1)),
+ ],
+ vec![(col_a, Some(option1))],
+ vec![(0, 1), (2, 3)],
+ ),
+ ];
+ for (searched, to_search, expected) in test_data {
+ let searched = convert_to_requirement(&searched);
+ let to_search = convert_to_requirement(&to_search);
+ let expected = expected
+ .into_iter()
+ .map(|(start, end)| Range { start, end })
+ .collect::<Vec<_>>();
+ assert_eq!(get_compatible_ranges(&searched, &to_search), expected);
+ }
+ Ok(())
+ }
}
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index 85081c24c3..e83dee2e6c 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -55,9 +55,10 @@ pub use aggregate::groups_accumulator::{
pub use aggregate::AggregateExpr;
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
pub use equivalence::{
- ordering_equivalence_properties_helper, project_equivalence_properties,
- project_ordering_equivalence_properties, EquivalenceProperties,
EquivalentClass,
- OrderingEquivalenceProperties, OrderingEquivalentClass,
+ add_offset_to_lex_ordering, ordering_equivalence_properties_helper,
+ project_equivalence_properties, project_ordering_equivalence_properties,
+ EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
+ OrderingEquivalentClass,
};
pub use partitioning::{Distribution, Partitioning};
@@ -70,7 +71,6 @@ pub use sort_expr::{
};
pub use sort_properties::update_ordering;
pub use utils::{
- expr_list_eq_any_order, expr_list_eq_strict_order, find_orderings_of_exprs,
- normalize_expr_with_equivalence_properties,
normalize_ordering_equivalence_classes,
+ expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction,
};
diff --git a/datafusion/physical-expr/src/partitioning.rs
b/datafusion/physical-expr/src/partitioning.rs
index 76567c8050..773eac40dc 100644
--- a/datafusion/physical-expr/src/partitioning.rs
+++ b/datafusion/physical-expr/src/partitioning.rs
@@ -20,10 +20,7 @@
use std::fmt;
use std::sync::Arc;
-use crate::{
- expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
- EquivalenceProperties, PhysicalExpr,
-};
+use crate::{expr_list_eq_strict_order, EquivalenceProperties, PhysicalExpr};
/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
@@ -90,21 +87,11 @@ impl Partitioning {
if !eq_classes.is_empty() {
let normalized_required_exprs = required_exprs
.iter()
- .map(|e| {
-
normalize_expr_with_equivalence_properties(
- e.clone(),
- eq_classes,
- )
- })
+ .map(|e|
eq_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
let normalized_partition_exprs =
partition_exprs
.iter()
- .map(|e| {
-
normalize_expr_with_equivalence_properties(
- e.clone(),
- eq_classes,
- )
- })
+ .map(|e|
eq_properties.normalize_expr(e.clone()))
.collect::<Vec<_>>();
expr_list_eq_strict_order(
&normalized_required_exprs,
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index 2d3a395728..b2a6bb5ca6 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,21 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::borrow::Borrow;
-use std::collections::{HashMap, HashSet};
-use std::ops::Range;
-use std::sync::Arc;
-
-use crate::equivalence::{
- EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
- OrderingEquivalentClass,
-};
+use crate::equivalence::{EquivalenceProperties, OrderingEquivalenceProperties};
use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
use crate::sort_properties::{ExprOrdering, SortProperties};
use crate::update_ordering;
-use crate::{
- LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
-};
+use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
use arrow::array::{make_array, Array, ArrayRef, BooleanArray,
MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
@@ -42,8 +32,13 @@ use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_common::Result;
use datafusion_expr::Operator;
+use itertools::Itertools;
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
+use std::borrow::Borrow;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::sync::Arc;
/// Compare the two expr lists are equal no matter the order.
/// For example two InListExpr can be considered to be equals no matter the
order:
@@ -135,109 +130,6 @@ pub fn normalize_out_expr_with_columns_map(
.unwrap_or(expr)
}
-pub fn normalize_expr_with_equivalence_properties(
- expr: Arc<dyn PhysicalExpr>,
- eq_properties: &[EquivalentClass],
-) -> Arc<dyn PhysicalExpr> {
- expr.clone()
- .transform(&|expr| {
- let normalized_form =
- expr.as_any().downcast_ref::<Column>().and_then(|column| {
- for class in eq_properties {
- if class.contains(column) {
- return Some(Arc::new(class.head().clone()) as _);
- }
- }
- None
- });
- Ok(if let Some(normalized_form) = normalized_form {
- Transformed::Yes(normalized_form)
- } else {
- Transformed::No(expr)
- })
- })
- .unwrap_or(expr)
-}
-
-/// This function normalizes `sort_expr` according to `eq_properties`. If the
-/// given sort expression doesn't belong to equivalence set `eq_properties`,
-/// it returns `sort_expr` as is.
-fn normalize_sort_expr_with_equivalence_properties(
- mut sort_expr: PhysicalSortExpr,
- eq_properties: &[EquivalentClass],
-) -> PhysicalSortExpr {
- sort_expr.expr =
- normalize_expr_with_equivalence_properties(sort_expr.expr,
eq_properties);
- sort_expr
-}
-
-/// This function applies the
[`normalize_sort_expr_with_equivalence_properties`]
-/// function for all sort expressions in `sort_exprs` and returns a vector of
-/// normalized sort expressions.
-pub fn normalize_sort_exprs_with_equivalence_properties(
- sort_exprs: LexOrderingRef,
- eq_properties: &EquivalenceProperties,
-) -> LexOrdering {
- sort_exprs
- .iter()
- .map(|expr| {
- normalize_sort_expr_with_equivalence_properties(
- expr.clone(),
- eq_properties.classes(),
- )
- })
- .collect()
-}
-
-/// This function normalizes `sort_requirement` according to `eq_properties`.
-/// If the given sort requirement doesn't belong to equivalence set
-/// `eq_properties`, it returns `sort_requirement` as is.
-fn normalize_sort_requirement_with_equivalence_properties(
- mut sort_requirement: PhysicalSortRequirement,
- eq_properties: &[EquivalentClass],
-) -> PhysicalSortRequirement {
- sort_requirement.expr =
- normalize_expr_with_equivalence_properties(sort_requirement.expr,
eq_properties);
- sort_requirement
-}
-
-/// This function searches for the slice `section` inside the slice `given`.
-/// It returns each range where `section` is compatible with the corresponding
-/// slice in `given`.
-fn get_compatible_ranges(
- given: &[PhysicalSortRequirement],
- section: &[PhysicalSortRequirement],
-) -> Vec<Range<usize>> {
- let n_section = section.len();
- let n_end = if given.len() >= n_section {
- given.len() - n_section + 1
- } else {
- 0
- };
- (0..n_end)
- .filter_map(|idx| {
- let end = idx + n_section;
- given[idx..end]
- .iter()
- .zip(section)
- .all(|(req, given)| given.compatible(req))
- .then_some(Range { start: idx, end })
- })
- .collect()
-}
-
-/// This function constructs a duplicate-free vector by filtering out duplicate
-/// entries inside the given vector `input`.
-fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
- let mut output = vec![];
- for item in input {
- if !output.contains(&item) {
- output.push(item);
- }
- }
- output
-}
-
/// Transform `sort_exprs` vector, to standardized version using
`eq_properties` and `ordering_eq_properties`
/// Assume `eq_properties` states that `Column a` and `Column b` are aliases.
/// Also assume `ordering_eq_properties` states that ordering `vec![d ASC]`
and `vec![a ASC, c ASC]` are
@@ -246,10 +138,10 @@ fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
/// This function converts `sort_exprs` `vec![b ASC, c ASC]` to first `vec![a
ASC, c ASC]` after considering `eq_properties`
/// Then converts `vec![a ASC, c ASC]` to `vec![d ASC]` after considering
`ordering_eq_properties`.
/// Standardized version `vec![d ASC]` is used in subsequent operations.
-pub fn normalize_sort_exprs(
+fn normalize_sort_exprs(
sort_exprs: &[PhysicalSortExpr],
- eq_properties: &[EquivalentClass],
- ordering_eq_properties: &[OrderingEquivalentClass],
+ eq_properties: &EquivalenceProperties,
+ ordering_eq_properties: &OrderingEquivalenceProperties,
) -> Vec<PhysicalSortExpr> {
let sort_requirements =
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
let normalized_exprs = normalize_sort_requirements(
@@ -257,35 +149,7 @@ pub fn normalize_sort_exprs(
eq_properties,
ordering_eq_properties,
);
- let normalized_exprs =
PhysicalSortRequirement::to_sort_exprs(normalized_exprs);
- collapse_vec(normalized_exprs)
-}
-/// This function normalizes `oeq_classes` expressions according to
`eq_properties`.
-/// More explicitly, it makes sure that expressions in `oeq_classes` are head
entries
-/// in `eq_properties`, replacing any non-head entries with head entries if
necessary.
-pub fn normalize_ordering_equivalence_classes(
- oeq_classes: &[OrderingEquivalentClass],
- eq_properties: &EquivalenceProperties,
-) -> Vec<OrderingEquivalentClass> {
- oeq_classes
- .iter()
- .map(|class| {
- let head = normalize_sort_exprs_with_equivalence_properties(
- class.head(),
- eq_properties,
- );
-
- let others = class
- .others()
- .iter()
- .map(|other| {
- normalize_sort_exprs_with_equivalence_properties(other,
eq_properties)
- })
- .collect();
-
- EquivalentClass::new(head, others)
- })
- .collect()
+ PhysicalSortRequirement::to_sort_exprs(normalized_exprs)
}
/// Transform `sort_reqs` vector, to standardized version using
`eq_properties` and `ordering_eq_properties`
@@ -296,53 +160,13 @@ pub fn normalize_ordering_equivalence_classes(
/// This function converts `sort_exprs` `vec![b Some(ASC), c None]` to first
`vec![a Some(ASC), c None]` after considering `eq_properties`
/// Then converts `vec![a Some(ASC), c None]` to `vec![d Some(ASC)]` after
considering `ordering_eq_properties`.
/// Standardized version `vec![d Some(ASC)]` is used in subsequent operations.
-pub fn normalize_sort_requirements(
+fn normalize_sort_requirements(
sort_reqs: &[PhysicalSortRequirement],
- eq_properties: &[EquivalentClass],
- ordering_eq_properties: &[OrderingEquivalentClass],
+ eq_properties: &EquivalenceProperties,
+ ordering_eq_properties: &OrderingEquivalenceProperties,
) -> Vec<PhysicalSortRequirement> {
- let mut normalized_exprs = sort_reqs
- .iter()
- .map(|sort_req| {
- normalize_sort_requirement_with_equivalence_properties(
- sort_req.clone(),
- eq_properties,
- )
- })
- .collect::<Vec<_>>();
- for ordering_eq_class in ordering_eq_properties {
- for item in ordering_eq_class.others() {
- let item = item
- .clone()
- .into_iter()
- .map(|elem| elem.into())
- .collect::<Vec<_>>();
- let ranges = get_compatible_ranges(&normalized_exprs, &item);
- let mut offset: i64 = 0;
- for Range { start, end } in ranges {
- let mut head = ordering_eq_class
- .head()
- .clone()
- .into_iter()
- .map(|elem| elem.into())
- .collect::<Vec<PhysicalSortRequirement>>();
- let updated_start = (start as i64 + offset) as usize;
- let updated_end = (end as i64 + offset) as usize;
- let range = end - start;
- offset += head.len() as i64 - range as i64;
- let all_none = normalized_exprs[updated_start..updated_end]
- .iter()
- .all(|req| req.options.is_none());
- if all_none {
- for req in head.iter_mut() {
- req.options = None;
- }
- }
- normalized_exprs.splice(updated_start..updated_end, head);
- }
- }
- }
- collapse_vec(normalized_exprs)
+ let normalized_sort_reqs =
eq_properties.normalize_sort_requirements(sort_reqs);
+ ordering_eq_properties.normalize_sort_requirements(&normalized_sort_reqs)
}
/// Checks whether given ordering requirements are satisfied by provided
[PhysicalSortExpr]s.
@@ -379,13 +203,11 @@ pub fn ordering_satisfy_concrete<
ordering_equal_properties: F2,
) -> bool {
let oeq_properties = ordering_equal_properties();
- let ordering_eq_classes = oeq_properties.classes();
let eq_properties = equal_properties();
- let eq_classes = eq_properties.classes();
let required_normalized =
- normalize_sort_exprs(required, eq_classes, ordering_eq_classes);
+ normalize_sort_exprs(required, &eq_properties, &oeq_properties);
let provided_normalized =
- normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
+ normalize_sort_exprs(provided, &eq_properties, &oeq_properties);
if required_normalized.len() > provided_normalized.len() {
return false;
}
@@ -430,13 +252,11 @@ pub fn ordering_satisfy_requirement_concrete<
ordering_equal_properties: F2,
) -> bool {
let oeq_properties = ordering_equal_properties();
- let ordering_eq_classes = oeq_properties.classes();
let eq_properties = equal_properties();
- let eq_classes = eq_properties.classes();
let required_normalized =
- normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+ normalize_sort_requirements(required, &eq_properties, &oeq_properties);
let provided_normalized =
- normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
+ normalize_sort_exprs(provided, &eq_properties, &oeq_properties);
if required_normalized.len() > provided_normalized.len() {
return false;
}
@@ -481,14 +301,12 @@ fn requirements_compatible_concrete<
equal_properties: F2,
) -> bool {
let oeq_properties = ordering_equal_properties();
- let ordering_eq_classes = oeq_properties.classes();
let eq_properties = equal_properties();
- let eq_classes = eq_properties.classes();
let required_normalized =
- normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+ normalize_sort_requirements(required, &eq_properties, &oeq_properties);
let provided_normalized =
- normalize_sort_requirements(provided, eq_classes, ordering_eq_classes);
+ normalize_sort_requirements(provided, &eq_properties, &oeq_properties);
if required_normalized.len() > provided_normalized.len() {
return false;
}
@@ -542,26 +360,15 @@ pub fn convert_to_expr<T: Borrow<PhysicalSortExpr>>(
/// This function finds the indices of `targets` within `items`, taking into
/// account equivalences according to `equal_properties`.
-pub fn get_indices_of_matching_exprs<
- T: Borrow<Arc<dyn PhysicalExpr>>,
- F: FnOnce() -> EquivalenceProperties,
->(
- targets: impl IntoIterator<Item = T>,
+pub fn get_indices_of_matching_exprs<F: FnOnce() -> EquivalenceProperties>(
+ targets: &[Arc<dyn PhysicalExpr>],
items: &[Arc<dyn PhysicalExpr>],
equal_properties: F,
) -> Vec<usize> {
- if let eq_classes @ [_, ..] = equal_properties().classes() {
- let normalized_targets = targets.into_iter().map(|e| {
- normalize_expr_with_equivalence_properties(e.borrow().clone(),
eq_classes)
- });
- let normalized_items = items
- .iter()
- .map(|e| normalize_expr_with_equivalence_properties(e.clone(),
eq_classes))
- .collect::<Vec<_>>();
- get_indices_of_exprs_strict(normalized_targets, &normalized_items)
- } else {
- get_indices_of_exprs_strict(targets, items)
- }
+ let eq_properties = equal_properties();
+ let normalized_items = eq_properties.normalize_exprs(items);
+ let normalized_targets = eq_properties.normalize_exprs(targets);
+ get_indices_of_exprs_strict(normalized_targets, &normalized_items)
}
/// This function finds the indices of `targets` within `items` using strict
@@ -870,13 +677,13 @@ pub fn get_indices_of_matching_sort_exprs_with_order_eq(
let normalized_required = normalize_sort_requirements(
&sort_requirement_on_requirements,
- eq_properties.classes(),
- &[],
+ eq_properties,
+ &OrderingEquivalenceProperties::new(order_eq_properties.schema()),
);
let normalized_provided = normalize_sort_requirements(
&PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()),
- eq_properties.classes(),
- &[],
+ eq_properties,
+ &OrderingEquivalenceProperties::new(order_eq_properties.schema()),
);
let provided_sorts = normalized_provided
@@ -902,9 +709,9 @@ pub fn get_indices_of_matching_sort_exprs_with_order_eq(
}
// We did not find all the expressions, consult ordering equivalence
properties:
- for class in order_eq_properties.classes() {
- let head = class.head();
- for ordering in class.others().iter().chain(std::iter::once(head)) {
+ if let Some(oeq_class) = order_eq_properties.oeq_class() {
+ let head = oeq_class.head();
+ for ordering in oeq_class.others().iter().chain(std::iter::once(head))
{
let order_eq_class_exprs = convert_to_expr(ordering);
if let Some(indices_of_equality) =
get_lexicographical_match_indices(
&normalized_required_expr,
@@ -981,6 +788,18 @@ pub fn find_orderings_of_exprs(
Ok(orderings)
}
+/// Merge left and right sort expressions, checking for duplicates.
+pub fn merge_vectors(
+ left: &[PhysicalSortExpr],
+ right: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortExpr> {
+ left.iter()
+ .cloned()
+ .chain(right.iter().cloned())
+ .unique()
+ .collect()
+}
+
#[cfg(test)]
mod tests {
use std::fmt::{Display, Formatter};
@@ -990,7 +809,7 @@ mod tests {
use super::*;
use crate::equivalence::OrderingEquivalenceProperties;
use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
- use crate::PhysicalSortExpr;
+ use crate::{OrderingEquivalentClass, PhysicalSortExpr};
use arrow::compute::SortOptions;
use arrow_array::Int32Array;
@@ -1046,7 +865,8 @@ mod tests {
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
let e = Field::new("e", DataType::Int32, true);
- let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+ let f = Field::new("f", DataType::Int32, true);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
Ok(schema)
}
@@ -1057,13 +877,15 @@ mod tests {
OrderingEquivalenceProperties,
)> {
// Assume schema satisfies ordering a ASC NULLS LAST
- // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, b
ASC NULLS LAST
+ // and d ASC NULLS LAST, b ASC NULLS LAST and e DESC NULLS FIRST, f
ASC NULLS LAST, g ASC NULLS LAST
// Assume that column a and c are aliases.
let col_a = &Column::new("a", 0);
let col_b = &Column::new("b", 1);
let col_c = &Column::new("c", 2);
let col_d = &Column::new("d", 3);
let col_e = &Column::new("e", 4);
+ let col_f = &Column::new("f", 5);
+ let col_g = &Column::new("g", 6);
let option1 = SortOptions {
descending: false,
nulls_first: false,
@@ -1104,7 +926,11 @@ mod tests {
options: option2,
},
PhysicalSortExpr {
- expr: Arc::new(col_b.clone()),
+ expr: Arc::new(col_f.clone()),
+ options: option1,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(col_g.clone()),
options: option1,
},
],
@@ -1312,6 +1138,8 @@ mod tests {
let col_c = &Column::new("c", 2);
let col_d = &Column::new("d", 3);
let col_e = &Column::new("e", 4);
+ let col_f = &Column::new("f", 5);
+ let col_g = &Column::new("g", 6);
let option1 = SortOptions {
descending: false,
nulls_first: false,
@@ -1342,10 +1170,16 @@ mod tests {
(vec![(col_c, option1)], true),
(vec![(col_c, option2)], false),
// Test whether ordering equivalence works as expected
- (vec![(col_d, option1)], false),
+ (vec![(col_d, option1)], true),
(vec![(col_d, option1), (col_b, option1)], true),
(vec![(col_d, option2), (col_b, option1)], false),
- (vec![(col_e, option2), (col_b, option1)], true),
+ (
+ vec![(col_e, option2), (col_f, option1), (col_g, option1)],
+ true,
+ ),
+ (vec![(col_e, option2), (col_f, option1)], true),
+ (vec![(col_e, option1), (col_f, option1)], false),
+ (vec![(col_e, option2), (col_b, option1)], false),
(vec![(col_e, option1), (col_b, option1)], false),
(
vec![
@@ -1356,6 +1190,15 @@ mod tests {
],
true,
),
+ (
+ vec![
+ (col_d, option1),
+ (col_b, option1),
+ (col_e, option2),
+ (col_f, option1),
+ ],
+ true,
+ ),
(
vec![
(col_d, option1),
@@ -1372,6 +1215,15 @@ mod tests {
(col_d, option2),
(col_b, option1),
],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option1),
+ (col_b, option1),
+ (col_e, option1),
+ (col_f, option1),
+ ],
false,
),
(
@@ -1383,7 +1235,9 @@ mod tests {
],
false,
),
+ (vec![(col_d, option1), (col_e, option2)], true),
];
+
for (cols, expected) in requirements {
let err_msg = format!("Error in test case:{cols:?}");
let required = cols
@@ -1427,6 +1281,7 @@ mod tests {
let col_c = &Column::new("c", 2);
let col_d = &Column::new("d", 3);
let col_e = &Column::new("e", 4);
+ let col_f = &Column::new("f", 5);
let option1 = SortOptions {
descending: false,
nulls_first: false,
@@ -1438,36 +1293,46 @@ mod tests {
// First element in the tuple stores vector of requirement, second
element is the expected return value for ordering_satisfy function
let requirements = vec![
(vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
- (vec![(col_a, None)], vec![(col_a, None)]),
+ (vec![(col_a, Some(option2))], vec![(col_a, Some(option2))]),
+ (vec![(col_a, None)], vec![(col_a, Some(option1))]),
// Test whether equivalence works as expected
(vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
- (vec![(col_c, None)], vec![(col_a, None)]),
+ (vec![(col_c, None)], vec![(col_a, Some(option1))]),
// Test whether ordering equivalence works as expected
(
vec![(col_d, Some(option1)), (col_b, Some(option1))],
vec![(col_a, Some(option1))],
),
- (vec![(col_d, None), (col_b, None)], vec![(col_a, None)]),
(
- vec![(col_e, Some(option2)), (col_b, Some(option1))],
+ vec![(col_d, None), (col_b, None)],
+ vec![(col_a, Some(option1))],
+ ),
+ (
+ vec![(col_e, Some(option2)), (col_f, Some(option1))],
vec![(col_a, Some(option1))],
),
// We should be able to normalize in compatible requirements also
(not exactly equal)
(
- vec![(col_e, Some(option2)), (col_b, None)],
+ vec![(col_e, Some(option2)), (col_f, None)],
+ vec![(col_a, Some(option1))],
+ ),
+ (
+ vec![(col_e, None), (col_f, None)],
vec![(col_a, Some(option1))],
),
- (vec![(col_e, None), (col_b, None)], vec![(col_a, None)]),
];
+
let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
- let eq_classes = eq_properties.classes();
- let ordering_eq_classes = ordering_eq_properties.classes();
for (reqs, expected_normalized) in requirements.into_iter() {
let req = convert_to_requirement(&reqs);
let expected_normalized =
convert_to_requirement(&expected_normalized);
assert_eq!(
- normalize_sort_requirements(&req, eq_classes,
ordering_eq_classes),
+ normalize_sort_requirements(
+ &req,
+ &eq_properties,
+ &ordering_eq_properties,
+ ),
expected_normalized
);
}
@@ -1536,10 +1401,7 @@ mod tests {
];
for (expr, expected_eq) in expressions {
assert!(
- expected_eq.eq(&normalize_expr_with_equivalence_properties(
- expr.clone(),
- eq_properties.classes()
- )),
+ expected_eq.eq(&eq_properties.normalize_expr(expr.clone())),
"error in test: expr: {expr:?}"
);
}
@@ -1583,10 +1445,7 @@ mod tests {
sort_options,
);
assert!(
-
expected.eq(&normalize_sort_requirement_with_equivalence_properties(
- arg.clone(),
- eq_properties.classes()
- )),
+
expected.eq(&eq_properties.normalize_sort_requirement(arg.clone())),
"error in test: expr: {expr:?}, sort_options: {sort_options:?}"
);
}
@@ -1685,55 +1544,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_get_compatible_ranges() -> Result<()> {
- let col_a = &Column::new("a", 0);
- let col_b = &Column::new("b", 1);
- let option1 = SortOptions {
- descending: false,
- nulls_first: false,
- };
- let test_data = vec![
- (
- vec![(col_a, Some(option1)), (col_b, Some(option1))],
- vec![(col_a, Some(option1))],
- vec![(0, 1)],
- ),
- (
- vec![(col_a, None), (col_b, Some(option1))],
- vec![(col_a, Some(option1))],
- vec![(0, 1)],
- ),
- (
- vec![
- (col_a, None),
- (col_b, Some(option1)),
- (col_a, Some(option1)),
- ],
- vec![(col_a, Some(option1))],
- vec![(0, 1), (2, 3)],
- ),
- ];
- for (searched, to_search, expected) in test_data {
- let searched = convert_to_requirement(&searched);
- let to_search = convert_to_requirement(&to_search);
- let expected = expected
- .into_iter()
- .map(|(start, end)| Range { start, end })
- .collect::<Vec<_>>();
- assert_eq!(get_compatible_ranges(&searched, &to_search), expected);
- }
- Ok(())
- }
-
- #[test]
- fn test_collapse_vec() -> Result<()> {
- assert_eq!(collapse_vec(vec![1, 2, 3]), vec![1, 2, 3]);
- assert_eq!(collapse_vec(vec![1, 2, 3, 2, 3]), vec![1, 2, 3]);
- assert_eq!(collapse_vec(vec![3, 1, 2, 3, 2, 3]), vec![3, 1, 2]);
- Ok(())
- }
-
#[test]
fn test_collect_columns() -> Result<()> {
let expr1 = Arc::new(Column::new("col1", 2)) as _;
@@ -1940,22 +1750,20 @@ mod tests {
Field::new("c", DataType::Int32, true),
]);
let mut equal_properties =
EquivalenceProperties::new(Arc::new(schema.clone()));
- let mut ordering_equal_properties =
- OrderingEquivalenceProperties::new(Arc::new(schema.clone()));
let mut expected_oeq =
OrderingEquivalenceProperties::new(Arc::new(schema));
equal_properties
.add_equal_conditions((&Column::new("a", 0), &Column::new("c",
2)));
- ordering_equal_properties.add_equal_conditions((
- &vec![PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
- options: sort_options,
- }],
- &vec![PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 2)),
- options: sort_options,
- }],
- ));
+ let head = vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: sort_options,
+ }];
+ let others = vec![vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: sort_options,
+ }]];
+ let oeq_class = OrderingEquivalentClass::new(head, others);
+
expected_oeq.add_equal_conditions((
&vec![PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
@@ -1967,13 +1775,13 @@ mod tests {
}],
));
- assert!(!normalize_ordering_equivalence_classes(
- ordering_equal_properties.classes(),
- &equal_properties,
- )
- .iter()
- .zip(expected_oeq.classes())
- .any(|(a, b)| a.head().ne(b.head()) || a.others().ne(b.others())));
+ let normalized_oeq_class =
+ oeq_class.normalize_with_equivalence_properties(&equal_properties);
+ let expected = expected_oeq.oeq_class().unwrap();
+ assert!(
+ normalized_oeq_class.head().eq(expected.head())
+ && normalized_oeq_class.others().eq(expected.others())
+ );
Ok(())
}
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index 15208fd082..4a8b189144 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -41,12 +41,13 @@ use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ExprBoundaries,
OrderingEquivalenceProperties, PhysicalExpr,
};
+use datafusion_physical_expr::intervals::utils::check_support;
+use datafusion_physical_expr::utils::collect_columns;
use futures::stream::{Stream, StreamExt};
use log::trace;
@@ -153,7 +154,19 @@ impl ExecutionPlan for FilterExec {
}
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties
{
- self.input.ordering_equivalence_properties()
+ let stats = self.statistics();
+ // Add the columns that have only one value (singleton) after
filtering to constants.
+ if let Some(col_stats) = stats.column_statistics {
+ let constants = collect_columns(self.predicate())
+ .into_iter()
+ .filter(|column| col_stats[column.index()].is_singleton())
+ .map(|column| Arc::new(column) as Arc<dyn PhysicalExpr>)
+ .collect::<Vec<_>>();
+ let filter_oeq = self.input.ordering_equivalence_properties();
+ filter_oeq.with_constants(constants)
+ } else {
+ self.input.ordering_equivalence_properties()
+ }
}
fn with_new_children(
@@ -197,7 +210,14 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics();
let input_column_stats = match input_stats.column_statistics {
Some(stats) => stats,
- None => return Statistics::default(),
+ None => self
+ .schema()
+ .fields
+ .iter()
+ .map(|field| {
+
ColumnStatistics::new_with_unbounded_column(field.data_type())
+ })
+ .collect::<Vec<_>>(),
};
let starter_ctx =
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index e33de001df..67f60e57d7 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -44,17 +44,15 @@ use datafusion_common::{
exec_err, plan_err, DataFusionError, JoinType, Result, ScalarValue,
SharedResult,
};
use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::utils::{
- normalize_ordering_equivalence_classes, normalize_sort_exprs,
-};
use datafusion_physical_expr::{
- EquivalentClass, LexOrdering, LexOrderingRef,
OrderingEquivalenceProperties,
- OrderingEquivalentClass, PhysicalExpr, PhysicalSortExpr,
+ add_offset_to_lex_ordering, EquivalentClass, LexOrdering, LexOrderingRef,
+ OrderingEquivalenceProperties, OrderingEquivalentClass, PhysicalExpr,
+ PhysicalSortExpr,
};
+use datafusion_physical_expr::utils::merge_vectors;
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
-use itertools::Itertools;
use parking_lot::Mutex;
/// The on clause of the join, as vector of (left, right) columns.
@@ -324,66 +322,24 @@ pub fn cross_join_equivalence_properties(
///
/// This way; once we normalize an expression according to equivalence
properties,
/// it can thereafter safely be used for ordering equivalence normalization.
-fn get_updated_right_ordering_equivalence_properties(
+fn get_updated_right_ordering_equivalent_class(
join_type: &JoinType,
- right_oeq_classes: &[OrderingEquivalentClass],
+ right_oeq_class: &OrderingEquivalentClass,
left_columns_len: usize,
join_eq_properties: &EquivalenceProperties,
-) -> Result<Vec<OrderingEquivalentClass>> {
- let updated_oeqs = match join_type {
+) -> Result<OrderingEquivalentClass> {
+ match join_type {
// In these modes, indices of the right schema should be offset by
// the left table size.
JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right =>
{
- add_offset_to_ordering_equivalence_classes(
- right_oeq_classes,
- left_columns_len,
- )?
+ let right_oeq_class =
right_oeq_class.add_offset(left_columns_len)?;
+ return Ok(
+
right_oeq_class.normalize_with_equivalence_properties(join_eq_properties)
+ );
}
- _ => right_oeq_classes.to_vec(),
+ _ => {}
};
-
- Ok(normalize_ordering_equivalence_classes(
- &updated_oeqs,
- join_eq_properties,
- ))
-}
-
-/// Merge left and right sort expressions, checking for duplicates.
-fn merge_vectors(
- left: &[PhysicalSortExpr],
- right: &[PhysicalSortExpr],
-) -> Vec<PhysicalSortExpr> {
- left.iter()
- .cloned()
- .chain(right.iter().cloned())
- .unique()
- .collect()
-}
-
-/// Prefix with existing ordering.
-fn prefix_ordering_equivalence_with_existing_ordering(
- existing_ordering: &[PhysicalSortExpr],
- oeq_classes: &[OrderingEquivalentClass],
- eq_classes: &[EquivalentClass],
-) -> Vec<OrderingEquivalentClass> {
- let existing_ordering = normalize_sort_exprs(existing_ordering,
eq_classes, &[]);
- oeq_classes
- .iter()
- .map(|oeq_class| {
- let normalized_head = normalize_sort_exprs(oeq_class.head(),
eq_classes, &[]);
- let updated_head = merge_vectors(&existing_ordering,
&normalized_head);
- let updated_others = oeq_class
- .others()
- .iter()
- .map(|ordering| {
- let normalized_ordering =
- normalize_sort_exprs(ordering, eq_classes, &[]);
- merge_vectors(&existing_ordering, &normalized_ordering)
- })
- .collect();
- OrderingEquivalentClass::new(updated_head, updated_others)
- })
- .collect()
+
Ok(right_oeq_class.normalize_with_equivalence_properties(join_eq_properties))
}
/// Calculate ordering equivalence properties for the given join operation.
@@ -411,20 +367,29 @@ pub fn combine_join_ordering_equivalence_properties(
))
}
(true, false) => {
-
new_properties.extend(left_oeq_properties.classes().iter().cloned());
+ new_properties.extend(left_oeq_properties.oeq_class().cloned());
// In this special case, right side ordering can be prefixed with
left side ordering.
- if probe_side == Some(JoinSide::Left)
- && right.output_ordering().is_some()
- && *join_type == JoinType::Inner
- {
- let right_oeq_classes =
- get_updated_right_ordering_equivalence_properties(
- join_type,
- right_oeq_properties.classes(),
- left_columns_len,
- &join_eq_properties,
- )?;
+ if let (
+ Some(JoinSide::Left),
+ // right side have an ordering
+ Some(_),
+ JoinType::Inner,
+ Some(oeq_class),
+ ) = (
+ probe_side,
+ right.output_ordering(),
+ join_type,
+ right_oeq_properties.oeq_class(),
+ ) {
let left_output_ordering =
left.output_ordering().unwrap_or(&[]);
+
+ let updated_right_oeq =
get_updated_right_ordering_equivalent_class(
+ join_type,
+ oeq_class,
+ left_columns_len,
+ &join_eq_properties,
+ )?;
+
// Right side ordering equivalence properties should be
prepended with
// those of the left side while constructing output ordering
equivalence
// properties since stream side is the left side.
@@ -433,32 +398,44 @@ pub fn combine_join_ordering_equivalence_properties(
// ordering of the left table is `a ASC`, then the ordering
equivalence `b ASC`
// for the right table should be converted to `a ASC, b ASC`
before it is added
// to the ordering equivalences of the join.
- let updated_right_oeq_classes =
- prefix_ordering_equivalence_with_existing_ordering(
+ let updated_right_oeq_class = updated_right_oeq
+ .prefix_ordering_equivalent_class_with_existing_ordering(
left_output_ordering,
- &right_oeq_classes,
- join_eq_properties.classes(),
+ &join_eq_properties,
);
- new_properties.extend(updated_right_oeq_classes);
+ new_properties.extend(Some(updated_right_oeq_class));
}
}
(false, true) => {
- let right_oeq_classes =
get_updated_right_ordering_equivalence_properties(
- join_type,
- right_oeq_properties.classes(),
- left_columns_len,
- &join_eq_properties,
- )?;
- new_properties.extend(right_oeq_classes);
+ let updated_right_oeq = right_oeq_properties
+ .oeq_class()
+ .map(|right_oeq_class| {
+ get_updated_right_ordering_equivalent_class(
+ join_type,
+ right_oeq_class,
+ left_columns_len,
+ &join_eq_properties,
+ )
+ })
+ .transpose()?;
+ new_properties.extend(updated_right_oeq);
// In this special case, left side ordering can be prefixed with
right side ordering.
- if probe_side == Some(JoinSide::Right)
- && left.output_ordering().is_some()
- && *join_type == JoinType::Inner
- {
- let left_oeq_classes = left_oeq_properties.classes();
+ if let (
+ Some(JoinSide::Right),
+ // left side have an ordering
+ Some(_),
+ JoinType::Inner,
+ Some(left_oeq_class),
+ ) = (
+ probe_side,
+ left.output_ordering(),
+ join_type,
+ left_oeq_properties.oeq_class(),
+ ) {
let right_output_ordering =
right.output_ordering().unwrap_or(&[]);
let right_output_ordering =
add_offset_to_lex_ordering(right_output_ordering,
left_columns_len)?;
+
// Left side ordering equivalence properties should be
prepended with
// those of the right side while constructing output ordering
equivalence
// properties since stream side is the right side.
@@ -467,13 +444,12 @@ pub fn combine_join_ordering_equivalence_properties(
// ordering of the left table is `a ASC`, then the ordering
equivalence `b ASC`
// for the right table should be converted to `a ASC, b ASC`
before it is added
// to the ordering equivalences of the join.
- let updated_left_oeq_classes =
- prefix_ordering_equivalence_with_existing_ordering(
+ let updated_left_oeq_class = left_oeq_class
+ .prefix_ordering_equivalent_class_with_existing_ordering(
&right_output_ordering,
- left_oeq_classes,
- join_eq_properties.classes(),
+ &join_eq_properties,
);
- new_properties.extend(updated_left_oeq_classes);
+ new_properties.extend(Some(updated_left_oeq_class));
}
}
(false, false) => {}
@@ -481,64 +457,6 @@ pub fn combine_join_ordering_equivalence_properties(
Ok(new_properties)
}
-/// Adds the `offset` value to `Column` indices inside `expr`. This function is
-/// generally used during the update of the right table schema in join
operations.
-pub(crate) fn add_offset_to_expr(
- expr: Arc<dyn PhysicalExpr>,
- offset: usize,
-) -> Result<Arc<dyn PhysicalExpr>> {
- expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
- Some(col) => Ok(Transformed::Yes(Arc::new(Column::new(
- col.name(),
- offset + col.index(),
- )))),
- None => Ok(Transformed::No(e)),
- })
-}
-
-/// Adds the `offset` value to `Column` indices inside `sort_expr.expr`.
-pub(crate) fn add_offset_to_sort_expr(
- sort_expr: &PhysicalSortExpr,
- offset: usize,
-) -> Result<PhysicalSortExpr> {
- Ok(PhysicalSortExpr {
- expr: add_offset_to_expr(sort_expr.expr.clone(), offset)?,
- options: sort_expr.options,
- })
-}
-
-/// Adds the `offset` value to `Column` indices for each `sort_expr.expr`
-/// inside `sort_exprs`.
-pub(crate) fn add_offset_to_lex_ordering(
- sort_exprs: LexOrderingRef,
- offset: usize,
-) -> Result<LexOrdering> {
- sort_exprs
- .iter()
- .map(|sort_expr| add_offset_to_sort_expr(sort_expr, offset))
- .collect()
-}
-
-/// Adds the `offset` value to `Column` indices for all expressions inside the
-/// given `OrderingEquivalentClass`es.
-pub(crate) fn add_offset_to_ordering_equivalence_classes(
- oeq_classes: &[OrderingEquivalentClass],
- offset: usize,
-) -> Result<Vec<OrderingEquivalentClass>> {
- oeq_classes
- .iter()
- .map(|prop| {
- let new_head = add_offset_to_lex_ordering(prop.head(), offset)?;
- let new_others = prop
- .others()
- .iter()
- .map(|ordering| add_offset_to_lex_ordering(ordering, offset))
- .collect::<Result<Vec<_>>>()?;
- Ok(OrderingEquivalentClass::new(new_head, new_others))
- })
- .collect()
-}
-
impl Display for JoinSide {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -1920,7 +1838,7 @@ mod tests {
let join_type = JoinType::Inner;
let options = SortOptions::default();
- let right_oeq_classes = OrderingEquivalentClass::new(
+ let right_oeq_class = OrderingEquivalentClass::new(
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("x", 0)),
@@ -1957,9 +1875,9 @@ mod tests {
join_eq_properties
.add_equal_conditions((&Column::new("d", 3), &Column::new("w",
7)));
- let result = get_updated_right_ordering_equivalence_properties(
+ let result = get_updated_right_ordering_equivalent_class(
&join_type,
- &[right_oeq_classes],
+ &right_oeq_class,
left_columns_len,
&join_eq_properties,
)?;
@@ -1987,8 +1905,8 @@ mod tests {
]],
);
- assert_eq!(result[0].head(), expected.head());
- assert_eq!(result[0].others(), expected.others());
+ assert_eq!(result.head(), expected.head());
+ assert_eq!(result.others(), expected.others());
Ok(())
}
diff --git a/datafusion/physical-plan/src/memory.rs
b/datafusion/physical-plan/src/memory.rs
index d36d93d29e..b29c8e9c7b 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -296,9 +296,9 @@ mod tests {
assert_eq!(mem_exec.output_ordering().unwrap(), expected_output_order);
let order_eq = mem_exec.ordering_equivalence_properties();
assert!(order_eq
- .classes()
- .iter()
- .any(|class| class.contains(&expected_order_eq)));
+ .oeq_class()
+ .map(|class| class.contains(&expected_order_eq))
+ .unwrap_or(false));
Ok(())
}
}
diff --git a/datafusion/physical-plan/src/projection.rs
b/datafusion/physical-plan/src/projection.rs
index f1ec0a68a6..4fc48e971c 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -40,11 +40,11 @@ use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::{Literal, UnKnownColumn};
use datafusion_physical_expr::{
- find_orderings_of_exprs, normalize_out_expr_with_columns_map,
- project_equivalence_properties, project_ordering_equivalence_properties,
- OrderingEquivalenceProperties,
+ normalize_out_expr_with_columns_map, project_equivalence_properties,
+ project_ordering_equivalence_properties, OrderingEquivalenceProperties,
};
+use datafusion_physical_expr::utils::find_orderings_of_exprs;
use futures::stream::{Stream, StreamExt};
use log::trace;
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index a7ed2bf5c7..b099107358 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -888,6 +888,112 @@ physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, 2 as Int64(2)]
--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b],
output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true
+# source is ordered by a,b,c
+# when filter result is constant for column a
+# ordering b, c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0
+ORDER BY b, c;
+----
+logical_plan
+Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC
NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d],
partial_filters=[annotated_data_finite2.a = Int32(0)]
+physical_plan
+SortPreservingMergeExec: [b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when filter result is constant for column a and b
+# ordering c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 and b=0
+ORDER BY c;
+----
+logical_plan
+Sort: annotated_data_finite2.c ASC NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b =
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d],
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b
= Int32(0)]
+physical_plan
+SortPreservingMergeExec: [c@3 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0 AND b@2 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when filter result is constant for column a and b
+# ordering b, c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 and b=0
+ORDER BY b, c;
+----
+logical_plan
+Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC
NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b =
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d],
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b
= Int32(0)]
+physical_plan
+SortPreservingMergeExec: [b@2 ASC NULLS LAST,c@3 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0 AND b@2 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when filter result is constant for column a and b
+# ordering a, b, c is still satisfied. Final plan shouldn't have
+# SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 and b=0
+ORDER BY a, b, c;
+----
+logical_plan
+Sort: annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC
NULLS LAST, annotated_data_finite2.c ASC NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b =
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d],
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b
= Int32(0)]
+physical_plan
+SortPreservingMergeExec: [a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS
LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: a@1 = 0 AND b@2 = 0
+------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC
NULLS LAST], has_header=true
+
+# source is ordered by a,b,c
+# when the filter contains OR, neither
+# column a nor column b is guaranteed constant. Hence the final plan
+# should contain SortExec.
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_finite2
+WHERE a=0 or b=0
+ORDER BY c;
+----
+logical_plan
+Sort: annotated_data_finite2.c ASC NULLS LAST
+--Filter: annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b =
Int32(0)
+----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d],
partial_filters=[annotated_data_finite2.a = Int32(0) OR
annotated_data_finite2.b = Int32(0)]
+physical_plan
+SortPreservingMergeExec: [c@3 ASC NULLS LAST]
+--SortExec: expr=[c@3 ASC NULLS LAST]
+----CoalesceBatchesExec: target_batch_size=8192
+------FilterExec: a@1 = 0 OR b@2 = 0
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC
NULLS LAST], has_header=true
+
statement ok
drop table annotated_data_finite2;
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index fe074da1bb..2eccb60aad 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -284,19 +284,20 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS
t2_sum
------------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as
t2_id]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int),
t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as
t2_id]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------FilterExec: SUM(t2.t2_int)@1 < 3
+--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
+----------------CoalesceBatchesExec: target_batch_size=8192
+------------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
+----------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--------CoalesceBatchesExec: target_batch_size=8192
-----------FilterExec: SUM(t2.t2_int)@1 < 3
-------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
---------------CoalesceBatchesExec: target_batch_size=8192
-----------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
-------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
---------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having
sum(t2_int) < 3) as t2_sum from t1
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index f8f8f30ade..3d9f7511be 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2342,11 +2342,10 @@ Limit: skip=0, fetch=5
----------TableScan: aggregate_test_100 projection=[c9]
physical_plan
GlobalLimitExec: skip=0, fetch=5
---SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
-----ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as rn1]
-------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field {
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }],
mode=[Sorted]
---------SortExec: expr=[c9@0 DESC]
-----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9],
has_header=true
+--ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as rn1]
+----BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field {
name: "ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame
{ units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }],
mode=[Sorted]
+------SortExec: expr=[c9@0 DESC]
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9],
has_header=true
query II
SELECT c9, rn1 FROM (SELECT c9,