This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1015d98d9b Lexicographical Ordering Equivalence Support (#6431)
1015d98d9b is described below
commit 1015d98d9b59da67c257f70031abc4a359a7b828
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu May 25 11:44:13 2023 +0300
Lexicographical Ordering Equivalence Support (#6431)
* Convert ordering equivalence to vec (unit test)
* compiles
* Simplifications
* simplifications
* Remove unnecessary codes
* simplifications
* Add test cases
* fix bug
* simplifications
* Resolve linter errors
* remove unnecessary codes
* simplifications
* simplifications
* Remove unnecessary codes
* Add pruning to ordering_equivalence projection
* Remove unnecessary clones
* Convert get range to calculate compatible ranges
* Simplifications
* Update comments
* Update comments
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../core/src/physical_plan/aggregates/mod.rs | 4 +-
datafusion/core/src/physical_plan/windows/mod.rs | 51 +-
datafusion/physical-expr/src/equivalence.rs | 135 +++--
datafusion/physical-expr/src/utils.rs | 600 +++++++++++----------
4 files changed, 419 insertions(+), 371 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 5e6c5656c8..fc722e9ac3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -1735,8 +1735,8 @@ mod tests {
eq_properties.add_equal_conditions((&col_a, &col_b));
let mut ordering_eq_properties =
OrderingEquivalenceProperties::new(test_schema);
ordering_eq_properties.add_equal_conditions((
- &OrderedColumn::new(col_a.clone(), options1),
- &OrderedColumn::new(col_c.clone(), options2),
+ &vec![OrderedColumn::new(col_a.clone(), options1)],
+ &vec![OrderedColumn::new(col_c.clone(), options2)],
));
let order_by_exprs = vec![
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index f6fe3bcaee..d4732cc1f6 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -262,7 +262,22 @@ pub(crate) fn window_ordering_equivalence(
.iter()
.cloned(),
);
- let out_ordering = input.output_ordering().unwrap_or(&[]);
+ let mut normalized_out_ordering = vec![];
+ for item in input.output_ordering().unwrap_or(&[]) {
+ // To account for ordering equivalences, first normalize the
expression:
+ let normalized = normalize_expr_with_equivalence_properties(
+ item.expr.clone(),
+ input.equivalence_properties().classes(),
+ );
+ // Currently we only support ordering equivalences for `Column`
expressions.
+ // TODO: Add support for ordering equivalence for all `PhysicalExpr`s
+ if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
+ normalized_out_ordering
+ .push(OrderedColumn::new(column.clone(), item.options));
+ } else {
+ break;
+ }
+ }
for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
@@ -275,28 +290,18 @@ pub(crate) fn window_ordering_equivalence(
.is::<RowNumber>()
{
// If there is an existing ordering, add new ordering as an
equivalence:
- if let Some(first) = out_ordering.first() {
- // Normalize expression, as we search for ordering
equivalences
- // on normalized versions:
- let normalized =
normalize_expr_with_equivalence_properties(
- first.expr.clone(),
- input.equivalence_properties().classes(),
- );
- if let Some(column) =
normalized.as_any().downcast_ref::<Column>() {
- let column_info =
-
schema.column_with_name(expr.field().unwrap().name());
- if let Some((idx, field)) = column_info {
- let lhs = OrderedColumn::new(column.clone(),
first.options);
- let options = SortOptions {
- descending: false,
- nulls_first: false,
- }; // ASC, NULLS LAST
- let rhs = OrderedColumn::new(
- Column::new(field.name(), idx),
- options,
- );
- result.add_equal_conditions((&lhs, &rhs));
- }
+ if !normalized_out_ordering.is_empty() {
+ if let Some((idx, field)) =
+ schema.column_with_name(expr.field().unwrap().name())
+ {
+ let column = Column::new(field.name(), idx);
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ }; // ASC, NULLS LAST
+ let rhs = OrderedColumn::new(column, options);
+ result
+ .add_equal_conditions((&normalized_out_ordering,
&vec![rhs]));
}
}
}
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index ab3443424b..a1f2df9208 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -16,12 +16,14 @@
// under the License.
use crate::expressions::Column;
+use crate::{PhysicalSortExpr, PhysicalSortRequirement};
use arrow::datatypes::SchemaRef;
use arrow_schema::SortOptions;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
+use std::sync::Arc;
/// Equivalence Properties is a vec of EquivalentClass.
#[derive(Debug, Clone)]
@@ -117,7 +119,7 @@ impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
/// where both `a ASC` and `b DESC` can describe the table ordering. With
/// `OrderingEquivalenceProperties`, we can keep track of these equivalences
/// and treat `a ASC` and `b DESC` as the same ordering requirement.
-pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;
+pub type OrderingEquivalenceProperties =
EquivalenceProperties<Vec<OrderedColumn>>;
/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are
known
/// to have the same value in all tuples in a relation.
`EquivalentClass<Column>`
@@ -198,36 +200,61 @@ impl OrderedColumn {
}
}
-trait ColumnAccessor {
- fn column(&self) -> &Column;
-}
-
-impl ColumnAccessor for Column {
- fn column(&self) -> &Column {
- self
+impl From<OrderedColumn> for PhysicalSortExpr {
+ fn from(value: OrderedColumn) -> Self {
+ PhysicalSortExpr {
+ expr: Arc::new(value.col) as _,
+ options: value.options,
+ }
}
}
-impl ColumnAccessor for OrderedColumn {
- fn column(&self) -> &Column {
- &self.col
+impl From<OrderedColumn> for PhysicalSortRequirement {
+ fn from(value: OrderedColumn) -> Self {
+ PhysicalSortRequirement {
+ expr: Arc::new(value.col) as _,
+ options: Some(value.options),
+ }
}
}
-pub type OrderingEquivalentClass = EquivalentClass<OrderedColumn>;
+/// `Vec<OrderedColumn>` stores the lexicographical ordering for a schema.
+/// OrderingEquivalentClass keeps track of different alternative orderings
that can
+/// describe the schema.
+/// For instance, for the table below
+/// |a|b|c|d|
+/// |1|4|3|1|
+/// |2|3|3|2|
+/// |3|1|2|2|
+/// |3|2|1|3|
+/// both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the ordering
of the table.
+/// For this case, we say that `vec![a ASC, b ASC]`, and `vec![c DESC, d ASC]`
are ordering equivalent.
+pub type OrderingEquivalentClass = EquivalentClass<Vec<OrderedColumn>>;
impl OrderingEquivalentClass {
- /// Finds the matching column inside the `OrderingEquivalentClass`.
- fn get_matching_column(&self, column: &Column) -> Option<OrderedColumn> {
- if self.head.col.eq(column) {
- Some(self.head.clone())
- } else {
- for item in &self.others {
- if item.col.eq(column) {
- return Some(item.clone());
+ /// This function extends ordering equivalences with alias information.
+ /// For instance, assume columns a and b are aliases,
+ /// and orderings (a ASC) and (c DESC) are ordering equivalent. We append (b ASC) to the ordering equivalence class,
+ /// since b is an alias of column a. After this function, (a ASC), (c DESC) and (b ASC) are all ordering equivalent.
+ fn update_with_aliases(&mut self, columns_map: &HashMap<Column,
Vec<Column>>) {
+ for (column, columns) in columns_map {
+ let mut to_insert = vec![];
+ for ordering in
std::iter::once(&self.head).chain(self.others.iter()) {
+ for (idx, item) in ordering.iter().enumerate() {
+ if item.col.eq(column) {
+ for col in columns {
+ let mut normalized = self.head.clone();
+ // Change the corresponding entry in the head with
the alias column:
+ let entry = &mut normalized[idx];
+ (entry.col, entry.options) = (col.clone(),
item.options);
+ to_insert.push(normalized);
+ }
+ }
}
}
- None
+ for items in to_insert {
+ self.insert(items);
+ }
}
}
}
@@ -242,10 +269,10 @@ pub fn project_equivalence_properties(
alias_map: &HashMap<Column, Vec<Column>>,
output_eq: &mut EquivalenceProperties,
) {
- let mut ec_classes = input_eq.classes().to_vec();
+ let mut eq_classes = input_eq.classes().to_vec();
for (column, columns) in alias_map {
let mut find_match = false;
- for class in ec_classes.iter_mut() {
+ for class in eq_classes.iter_mut() {
if class.contains(column) {
for col in columns {
class.insert(col.clone());
@@ -255,12 +282,29 @@ pub fn project_equivalence_properties(
}
}
if !find_match {
- ec_classes.push(EquivalentClass::new(column.clone(),
columns.clone()));
+ eq_classes.push(EquivalentClass::new(column.clone(),
columns.clone()));
+ }
+ }
+
+ // Prune columns that are no longer in the schema from equivalences.
+ let schema = output_eq.schema();
+ let fields = schema.fields();
+ for class in eq_classes.iter_mut() {
+ let columns_to_remove = class
+ .iter()
+ .filter(|column| {
+ let idx = column.index();
+ idx >= fields.len() || fields[idx].name() != column.name()
+ })
+ .cloned()
+ .collect::<Vec<_>>();
+ for column in columns_to_remove {
+ class.remove(&column);
}
}
+ eq_classes.retain(|props| props.len() > 1);
- prune_columns_to_remove(output_eq, &mut ec_classes);
- output_eq.extend(ec_classes);
+ output_eq.extend(eq_classes);
}
/// This function applies the given projection to the given ordering
@@ -275,39 +319,22 @@ pub fn project_ordering_equivalence_properties(
columns_map: &HashMap<Column, Vec<Column>>,
output_eq: &mut OrderingEquivalenceProperties,
) {
- let mut ec_classes = input_eq.classes().to_vec();
- for (column, columns) in columns_map {
- for class in ec_classes.iter_mut() {
- if let Some(OrderedColumn { options, .. }) =
class.get_matching_column(column)
- {
- for col in columns {
- class.insert(OrderedColumn {
- col: col.clone(),
- options,
- });
- }
- break;
- }
- }
+ let mut eq_classes = input_eq.classes().to_vec();
+ for class in eq_classes.iter_mut() {
+ class.update_with_aliases(columns_map);
}
- prune_columns_to_remove(output_eq, &mut ec_classes);
- output_eq.extend(ec_classes);
-}
-
-fn prune_columns_to_remove<T: Eq + Hash + Clone + ColumnAccessor>(
- eq_properties: &EquivalenceProperties<T>,
- eq_classes: &mut Vec<EquivalentClass<T>>,
-) {
- let schema = eq_properties.schema();
+ // Prune orderings that refer to columns no longer in the schema from the OrderingEquivalenceProperties.
+ let schema = output_eq.schema();
let fields = schema.fields();
for class in eq_classes.iter_mut() {
let columns_to_remove = class
.iter()
- .filter(|elem| {
- let column = elem.column();
- let idx = column.index();
- idx >= fields.len() || fields[idx].name() != column.name()
+ .filter(|columns| {
+ columns.iter().any(|column| {
+ let idx = column.col.index();
+ idx >= fields.len() || fields[idx].name() !=
column.col.name()
+ })
})
.cloned()
.collect::<Vec<_>>();
@@ -316,6 +343,8 @@ fn prune_columns_to_remove<T: Eq + Hash + Clone +
ColumnAccessor>(
}
}
eq_classes.retain(|props| props.len() > 1);
+
+ output_eq.extend(eq_classes);
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index a8a0625ca0..54a18a1573 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -16,14 +16,13 @@
// under the License.
use crate::equivalence::{
- EquivalenceProperties, EquivalentClass, OrderedColumn,
OrderingEquivalenceProperties,
+ EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
OrderingEquivalentClass,
};
use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
use arrow::datatypes::SchemaRef;
-use arrow_schema::SortOptions;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
};
@@ -35,6 +34,7 @@ use petgraph::stable_graph::StableGraph;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::collections::HashSet;
+use std::ops::Range;
use std::sync::Arc;
/// Compare the two expr lists are equal no matter the order.
@@ -164,70 +164,6 @@ pub fn normalize_expr_with_equivalence_properties(
.unwrap_or(expr)
}
-fn normalize_expr_with_ordering_equivalence_properties(
- expr: Arc<dyn PhysicalExpr>,
- sort_options: SortOptions,
- eq_properties: &[OrderingEquivalentClass],
-) -> (Arc<dyn PhysicalExpr>, SortOptions) {
- let normalized_expr = expr
- .clone()
- .transform(&|expr| {
- let normalized_form =
- expr.as_any().downcast_ref::<Column>().and_then(|column| {
- for class in eq_properties {
- let ordered_column = OrderedColumn {
- col: column.clone(),
- options: sort_options,
- };
- if class.contains(&ordered_column) {
- return Some(class.head().clone());
- }
- }
- None
- });
- Ok(if let Some(normalized_form) = normalized_form {
- Transformed::Yes(Arc::new(normalized_form.col) as _)
- } else {
- Transformed::No(expr)
- })
- })
- .unwrap_or_else(|_| expr.clone());
- if expr.ne(&normalized_expr) {
- if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
- for eq_class in eq_properties.iter() {
- let head = eq_class.head();
- if head.col.eq(col) {
- // Use options of the normalized version:
- return (normalized_expr, head.options);
- }
- }
- }
- }
- (expr, sort_options)
-}
-
-fn normalize_sort_expr_with_equivalence_properties(
- mut sort_expr: PhysicalSortExpr,
- eq_properties: &[EquivalentClass],
-) -> PhysicalSortExpr {
- sort_expr.expr =
- normalize_expr_with_equivalence_properties(sort_expr.expr,
eq_properties);
- sort_expr
-}
-
-fn normalize_sort_expr_with_ordering_equivalence_properties(
- mut sort_expr: PhysicalSortExpr,
- eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortExpr {
- (sort_expr.expr, sort_expr.options) =
- normalize_expr_with_ordering_equivalence_properties(
- sort_expr.expr.clone(),
- sort_expr.options,
- eq_properties,
- );
- sort_expr
-}
-
fn normalize_sort_requirement_with_equivalence_properties(
mut sort_requirement: PhysicalSortRequirement,
eq_properties: &[EquivalentClass],
@@ -237,47 +173,121 @@ fn
normalize_sort_requirement_with_equivalence_properties(
sort_requirement
}
-fn normalize_sort_requirement_with_ordering_equivalence_properties(
- mut sort_requirement: PhysicalSortRequirement,
- eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortRequirement {
- if let Some(options) = &mut sort_requirement.options {
- (sort_requirement.expr, *options) =
- normalize_expr_with_ordering_equivalence_properties(
- sort_requirement.expr,
- *options,
- eq_properties,
- );
+/// This function searches for the slice `section` inside the slice `given`.
+/// It returns each range where `section` is compatible with the corresponding
+/// slice in `given`.
+fn get_compatible_ranges(
+ given: &[PhysicalSortRequirement],
+ section: &[PhysicalSortRequirement],
+) -> Vec<Range<usize>> {
+ let n_section = section.len();
+ let n_end = if given.len() >= n_section {
+ given.len() - n_section + 1
+ } else {
+ 0
+ };
+ (0..n_end)
+ .filter_map(|idx| {
+ let end = idx + n_section;
+ given[idx..end]
+ .iter()
+ .zip(section)
+ .all(|(req, given)| given.compatible(req))
+ .then_some(Range { start: idx, end })
+ })
+ .collect()
+}
+
+/// This function constructs a duplicate-free vector by filtering out duplicate
+/// entries inside the given vector `input`.
+fn collapse_vec<T: PartialEq>(input: Vec<T>) -> Vec<T> {
+ let mut output = vec![];
+ for item in input {
+ if !output.contains(&item) {
+ output.push(item);
+ }
}
- sort_requirement
+ output
}
-pub fn normalize_sort_expr(
- sort_expr: PhysicalSortExpr,
+/// Transforms the `sort_exprs` vector into a standardized version using `eq_properties` and `ordering_eq_properties`.
+/// Assume `eq_properties` states that `Column a` and `Column b` are aliases.
+/// Also assume `ordering_eq_properties` states that orderings `vec![d ASC]` and `vec![a ASC, c ASC]` are
+/// ordering equivalent (in the sense that both describe the ordering of the table).
+/// If the `sort_exprs` input to this function were `vec![b ASC, c ASC]`,
+/// this function would first convert it to `vec![a ASC, c ASC]` after considering `eq_properties`,
+/// and then convert `vec![a ASC, c ASC]` to `vec![d ASC]` after considering `ordering_eq_properties`.
+/// The standardized version `vec![d ASC]` is used in subsequent operations.
+pub fn normalize_sort_exprs(
+ sort_exprs: &[PhysicalSortExpr],
eq_properties: &[EquivalentClass],
ordering_eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortExpr {
- let normalized =
- normalize_sort_expr_with_equivalence_properties(sort_expr,
eq_properties);
- normalize_sort_expr_with_ordering_equivalence_properties(
- normalized,
+) -> Vec<PhysicalSortExpr> {
+ let sort_requirements =
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+ let normalized_exprs = normalize_sort_requirements(
+ &sort_requirements,
+ eq_properties,
ordering_eq_properties,
- )
+ );
+ let normalized_exprs =
PhysicalSortRequirement::to_sort_exprs(normalized_exprs);
+ collapse_vec(normalized_exprs)
}
-pub fn normalize_sort_requirement(
- sort_requirement: PhysicalSortRequirement,
+/// Transforms the `sort_reqs` vector into a standardized version using `eq_properties` and `ordering_eq_properties`.
+/// Assume `eq_properties` states that `Column a` and `Column b` are aliases.
+/// Also assume `ordering_eq_properties` states that orderings `vec![d ASC]` and `vec![a ASC, c ASC]` are
+/// ordering equivalent (in the sense that both describe the ordering of the table).
+/// If the `sort_reqs` input to this function were `vec![b Some(ASC), c None]`,
+/// this function would first convert it to `vec![a Some(ASC), c None]` after considering `eq_properties`,
+/// and then convert `vec![a Some(ASC), c None]` to `vec![d Some(ASC)]` after considering `ordering_eq_properties`.
+/// The standardized version `vec![d Some(ASC)]` is used in subsequent operations.
+pub fn normalize_sort_requirements(
+ sort_reqs: &[PhysicalSortRequirement],
eq_properties: &[EquivalentClass],
ordering_eq_properties: &[OrderingEquivalentClass],
-) -> PhysicalSortRequirement {
- let normalized = normalize_sort_requirement_with_equivalence_properties(
- sort_requirement,
- eq_properties,
- );
- normalize_sort_requirement_with_ordering_equivalence_properties(
- normalized,
- ordering_eq_properties,
- )
+) -> Vec<PhysicalSortRequirement> {
+ let mut normalized_exprs = sort_reqs
+ .iter()
+ .map(|sort_req| {
+ normalize_sort_requirement_with_equivalence_properties(
+ sort_req.clone(),
+ eq_properties,
+ )
+ })
+ .collect::<Vec<_>>();
+ for ordering_eq_class in ordering_eq_properties {
+ for item in ordering_eq_class.others() {
+ let item = item
+ .clone()
+ .into_iter()
+ .map(|elem| elem.into())
+ .collect::<Vec<_>>();
+ let ranges = get_compatible_ranges(&normalized_exprs, &item);
+ let mut offset: i64 = 0;
+ for Range { start, end } in ranges {
+ let mut head = ordering_eq_class
+ .head()
+ .clone()
+ .into_iter()
+ .map(|elem| elem.into())
+ .collect::<Vec<PhysicalSortRequirement>>();
+ let updated_start = (start as i64 + offset) as usize;
+ let updated_end = (end as i64 + offset) as usize;
+ let range = end - start;
+ offset += head.len() as i64 - range as i64;
+ let all_none = normalized_exprs[updated_start..updated_end]
+ .iter()
+ .all(|req| req.options.is_none());
+ if all_none {
+ for req in head.iter_mut() {
+ req.options = None;
+ }
+ }
+ normalized_exprs.splice(updated_start..updated_end, head);
+ }
+ }
+ }
+ collapse_vec(normalized_exprs)
}
/// Checks whether given ordering requirements are satisfied by provided
[PhysicalSortExpr]s.
@@ -317,17 +327,10 @@ pub fn ordering_satisfy_concrete<
let ordering_eq_classes = oeq_properties.classes();
let eq_properties = equal_properties();
let eq_classes = eq_properties.classes();
- let mut required_normalized = Vec::new();
- for expr in required {
- let item = normalize_sort_expr(expr.clone(), eq_classes,
ordering_eq_classes);
- if !required_normalized.contains(&item) {
- required_normalized.push(item);
- }
- }
- let provided_normalized = provided
- .iter()
- .map(|e| normalize_sort_expr(e.clone(), eq_classes,
ordering_eq_classes))
- .collect::<Vec<_>>();
+ let required_normalized =
+ normalize_sort_exprs(required, eq_classes, ordering_eq_classes);
+ let provided_normalized =
+ normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
if required_normalized.len() > provided_normalized.len() {
return false;
}
@@ -375,18 +378,10 @@ pub fn ordering_satisfy_requirement_concrete<
let ordering_eq_classes = oeq_properties.classes();
let eq_properties = equal_properties();
let eq_classes = eq_properties.classes();
- let mut required_normalized = Vec::new();
- for req in required {
- let item =
- normalize_sort_requirement(req.clone(), eq_classes,
ordering_eq_classes);
- if !required_normalized.contains(&item) {
- required_normalized.push(item);
- }
- }
- let provided_normalized = provided
- .iter()
- .map(|e| normalize_sort_expr(e.clone(), eq_classes,
ordering_eq_classes))
- .collect::<Vec<_>>();
+ let required_normalized =
+ normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+ let provided_normalized =
+ normalize_sort_exprs(provided, eq_classes, ordering_eq_classes);
if required_normalized.len() > provided_normalized.len() {
return false;
}
@@ -434,18 +429,11 @@ fn requirements_compatible_concrete<
let ordering_eq_classes = oeq_properties.classes();
let eq_properties = equal_properties();
let eq_classes = eq_properties.classes();
- let mut required_normalized = Vec::new();
- for req in required {
- let item =
- normalize_sort_requirement(req.clone(), eq_classes,
ordering_eq_classes);
- if !required_normalized.contains(&item) {
- required_normalized.push(item);
- }
- }
- let provided_normalized = provided
- .iter()
- .map(|e| normalize_sort_requirement(e.clone(), eq_classes,
ordering_eq_classes))
- .collect::<Vec<_>>();
+
+ let required_normalized =
+ normalize_sort_requirements(required, eq_classes, ordering_eq_classes);
+ let provided_normalized =
+ normalize_sort_requirements(provided, eq_classes, ordering_eq_classes);
if required_normalized.len() > provided_normalized.len() {
return false;
}
@@ -708,11 +696,12 @@ pub fn reassign_predicate_columns(
mod tests {
use super::*;
use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
- use crate::PhysicalSortExpr;
+ use crate::{OrderedColumn, PhysicalSortExpr};
use arrow::compute::SortOptions;
use datafusion_common::{Result, ScalarValue};
use std::fmt::{Display, Formatter};
+ use crate::equivalence::OrderingEquivalenceProperties;
use arrow_schema::{DataType, Field, Schema};
use petgraph::visit::Bfs;
use std::sync::Arc;
@@ -774,10 +763,10 @@ mod tests {
OrderingEquivalenceProperties,
)> {
// Assume schema satisfies ordering a ASC NULLS LAST
- // and d ASC NULLS LAST and e DESC NULLS FIRST
+ // and [d ASC NULLS LAST, b ASC NULLS LAST] and [e DESC NULLS FIRST, b ASC NULLS LAST]
// Assume that column a and c are aliases.
let col_a = &Column::new("a", 0);
- let _col_b = &Column::new("b", 1);
+ let col_b = &Column::new("b", 1);
let col_c = &Column::new("c", 2);
let col_d = &Column::new("d", 3);
let col_e = &Column::new("e", 4);
@@ -795,12 +784,18 @@ mod tests {
let mut ordering_eq_properties =
OrderingEquivalenceProperties::new(test_schema.clone());
ordering_eq_properties.add_equal_conditions((
- &OrderedColumn::new(col_a.clone(), option1),
- &OrderedColumn::new(col_d.clone(), option1),
+ &vec![OrderedColumn::new(col_a.clone(), option1)],
+ &vec![
+ OrderedColumn::new(col_d.clone(), option1),
+ OrderedColumn::new(col_b.clone(), option1),
+ ],
));
ordering_eq_properties.add_equal_conditions((
- &OrderedColumn::new(col_a.clone(), option1),
- &OrderedColumn::new(col_e.clone(), option2),
+ &vec![OrderedColumn::new(col_a.clone(), option1)],
+ &vec![
+ OrderedColumn::new(col_e.clone(), option2),
+ OrderedColumn::new(col_b.clone(), option1),
+ ],
));
Ok((test_schema, eq_properties, ordering_eq_properties))
}
@@ -1125,10 +1120,47 @@ mod tests {
(vec![(col_c, option1)], true),
(vec![(col_c, option2)], false),
// Test whether ordering equivalence works as expected
- (vec![(col_d, option1)], true),
- (vec![(col_d, option2)], false),
- (vec![(col_e, option2)], true),
- (vec![(col_e, option1)], false),
+ (vec![(col_d, option1)], false),
+ (vec![(col_d, option1), (col_b, option1)], true),
+ (vec![(col_d, option2), (col_b, option1)], false),
+ (vec![(col_e, option2), (col_b, option1)], true),
+ (vec![(col_e, option1), (col_b, option1)], false),
+ (
+ vec![
+ (col_d, option1),
+ (col_b, option1),
+ (col_d, option1),
+ (col_b, option1),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option1),
+ (col_b, option1),
+ (col_e, option2),
+ (col_b, option1),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option1),
+ (col_b, option1),
+ (col_d, option2),
+ (col_b, option1),
+ ],
+ false,
+ ),
+ (
+ vec![
+ (col_d, option1),
+ (col_b, option1),
+ (col_e, option1),
+ (col_b, option1),
+ ],
+ false,
+ ),
];
for (cols, expected) in requirements {
let err_msg = format!("Error in test case:{cols:?}");
@@ -1155,6 +1187,71 @@ mod tests {
Ok(())
}
+ fn convert_to_requirement(
+ in_data: &[(&Column, Option<SortOptions>)],
+ ) -> Vec<PhysicalSortRequirement> {
+ in_data
+ .iter()
+ .map(|(col, options)| {
+ PhysicalSortRequirement::new(Arc::new((*col).clone()) as _,
*options)
+ })
+ .collect::<Vec<_>>()
+ }
+
+ #[test]
+ fn test_normalize_sort_reqs() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ // First element in the tuple stores the vector of requirements, second element is the expected result of normalize_sort_requirements
+ let requirements = vec![
+ (vec![(col_a, Some(option1))], vec![(col_a, Some(option1))]),
+ (vec![(col_a, None)], vec![(col_a, None)]),
+ // Test whether equivalence works as expected
+ (vec![(col_c, Some(option1))], vec![(col_a, Some(option1))]),
+ (vec![(col_c, None)], vec![(col_a, None)]),
+ // Test whether ordering equivalence works as expected
+ (
+ vec![(col_d, Some(option1)), (col_b, Some(option1))],
+ vec![(col_a, Some(option1))],
+ ),
+ (vec![(col_d, None), (col_b, None)], vec![(col_a, None)]),
+ (
+ vec![(col_e, Some(option2)), (col_b, Some(option1))],
+ vec![(col_a, Some(option1))],
+ ),
+ // We should also be able to normalize compatible (not exactly equal) requirements
+ (
+ vec![(col_e, Some(option2)), (col_b, None)],
+ vec![(col_a, Some(option1))],
+ ),
+ (vec![(col_e, None), (col_b, None)], vec![(col_a, None)]),
+ ];
+ let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+ let eq_classes = eq_properties.classes();
+ let ordering_eq_classes = ordering_eq_properties.classes();
+ for (reqs, expected_normalized) in requirements.into_iter() {
+ let req = convert_to_requirement(&reqs);
+ let expected_normalized =
convert_to_requirement(&expected_normalized);
+
+ assert_eq!(
+ normalize_sort_requirements(&req, eq_classes,
ordering_eq_classes),
+ expected_normalized
+ );
+ }
+ Ok(())
+ }
+
#[test]
fn test_reassign_predicate_columns_in_list() {
let int_field = Field::new("should_not_matter", DataType::Int64, true);
@@ -1195,30 +1292,26 @@ mod tests {
#[test]
fn test_normalize_expr_with_equivalence() -> Result<()> {
let col_a = &Column::new("a", 0);
- let _col_b = &Column::new("b", 1);
+ let col_b = &Column::new("b", 1);
let col_c = &Column::new("c", 2);
- let col_d = &Column::new("d", 3);
- let col_e = &Column::new("e", 4);
- let option1 = SortOptions {
- descending: false,
- nulls_first: false,
- };
- let option2 = SortOptions {
- descending: true,
- nulls_first: true,
- };
- // Assume schema satisfies ordering a ASC NULLS LAST
- // and d ASC NULLS LAST and e DESC NULLS FIRST
+ let _col_d = &Column::new("d", 3);
+ let _col_e = &Column::new("e", 4);
// Assume that column a and c are aliases.
- let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+ let (_test_schema, eq_properties, _ordering_eq_properties) =
+ create_test_params()?;
let col_a_expr = Arc::new(col_a.clone()) as Arc<dyn PhysicalExpr>;
+ let col_b_expr = Arc::new(col_b.clone()) as Arc<dyn PhysicalExpr>;
let col_c_expr = Arc::new(col_c.clone()) as Arc<dyn PhysicalExpr>;
- let col_d_expr = Arc::new(col_d.clone()) as Arc<dyn PhysicalExpr>;
- let col_e_expr = Arc::new(col_e.clone()) as Arc<dyn PhysicalExpr>;
// Test cases for equivalence normalization,
// First entry in the tuple is argument, second entry is expected
result after normalization.
- let expressions = vec![(&col_a_expr, &col_a_expr), (&col_c_expr,
&col_a_expr)];
+ let expressions = vec![
+ // Columns a and c should both normalize to a (since a is the head of the equivalence class)
+ (&col_a_expr, &col_a_expr),
+ (&col_c_expr, &col_a_expr),
+ // Cannot normalize column b
+ (&col_b_expr, &col_b_expr),
+ ];
for (expr, expected_eq) in expressions {
assert!(
expected_eq.eq(&normalize_expr_with_equivalence_properties(
@@ -1229,102 +1322,6 @@ mod tests {
);
}
- // Test cases for ordering equivalence normalization
- // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
- let expressions = vec![
- (&col_d_expr, option1, option1, &col_a_expr),
- (&col_e_expr, option2, option1, &col_a_expr),
- // Cannot normalize, hence should return itself.
- (&col_e_expr, option1, option1, &col_e_expr),
- ];
- for (expr, sort_options, expected_options, expected_ordering_eq) in
expressions {
- let (normalized_expr, options) =
- normalize_expr_with_ordering_equivalence_properties(
- expr.clone(),
- sort_options,
- ordering_eq_properties.classes(),
- );
- assert!(
- normalized_expr.eq(expected_ordering_eq) && (expected_options
== options),
- "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
- );
- }
- Ok(())
- }
-
- #[test]
- fn test_normalize_sort_expr_with_equivalence() -> Result<()> {
- let col_a = &Column::new("a", 0);
- let _col_b = &Column::new("b", 1);
- let col_c = &Column::new("c", 2);
- let col_d = &Column::new("d", 3);
- let col_e = &Column::new("e", 4);
- let option1 = SortOptions {
- descending: false,
- nulls_first: false,
- };
- let option2 = SortOptions {
- descending: true,
- nulls_first: true,
- };
- // Assume schema satisfies ordering a ASC NULLS LAST
- // and d ASC NULLS LAST and e DESC NULLS FIRST
- // Assume that column a and c are aliases.
- let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
-
- // Test cases for equivalence normalization
- // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
- let expressions = vec![
- (&col_a, option1, &col_a, option1),
- (&col_c, option1, &col_a, option1),
- // Cannot normalize column d, since it is not in equivalence
properties.
- (&col_d, option1, &col_d, option1),
- ];
- for (expr, sort_options, expected_col, expected_options) in
- expressions.into_iter()
- {
- let expected = PhysicalSortExpr {
- expr: Arc::new((*expected_col).clone()) as _,
- options: expected_options,
- };
- let arg = PhysicalSortExpr {
- expr: Arc::new((*expr).clone()) as _,
- options: sort_options,
- };
- assert!(
- expected.eq(&normalize_sort_expr_with_equivalence_properties(
- arg.clone(),
- eq_properties.classes()
- )),
- "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
- );
- }
-
- // Test cases for ordering equivalence normalization
- // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
- let expressions = vec![
- (&col_d, option1, &col_a, option1),
- (&col_e, option2, &col_a, option1),
- ];
- for (expr, sort_options, expected_col, expected_options) in
- expressions.into_iter()
- {
- let expected = PhysicalSortExpr {
- expr: Arc::new((*expected_col).clone()) as _,
- options: expected_options,
- };
- let arg = PhysicalSortExpr {
- expr: Arc::new((*expr).clone()) as _,
- options: sort_options,
- };
- assert!(
-
expected.eq(&normalize_sort_expr_with_ordering_equivalence_properties(
- arg.clone(),
- ordering_eq_properties.classes()
- )),
- "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
- );
- }
Ok(())
}
@@ -1334,19 +1331,14 @@ mod tests {
let _col_b = &Column::new("b", 1);
let col_c = &Column::new("c", 2);
let col_d = &Column::new("d", 3);
- let col_e = &Column::new("e", 4);
+ let _col_e = &Column::new("e", 4);
let option1 = SortOptions {
descending: false,
nulls_first: false,
};
- let option2 = SortOptions {
- descending: true,
- nulls_first: true,
- };
- // Assume schema satisfies ordering a ASC NULLS LAST
- // and d ASC NULLS LAST and e DESC NULLS FIRST
// Assume that column a and c are aliases.
- let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+ let (_test_schema, eq_properties, _ordering_eq_properties) =
+ create_test_params()?;
// Test cases for equivalence normalization
// First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
@@ -1377,33 +1369,6 @@ mod tests {
);
}
- // Test cases for ordering equivalence normalization
- // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
- let expressions = vec![
- (&col_d, Some(option1), &col_a, Some(option1)),
- (&col_e, Some(option2), &col_a, Some(option1)),
- ];
- for (expr, sort_options, expected_col, expected_options) in
- expressions.into_iter()
- {
- let expected = PhysicalSortRequirement::new(
- Arc::new((*expected_col).clone()) as _,
- expected_options,
- );
- let arg = PhysicalSortRequirement::new(
- Arc::new((*expr).clone()) as _,
- sort_options,
- );
- assert!(
- expected.eq(
-
&normalize_sort_requirement_with_ordering_equivalence_properties(
- arg.clone(),
- ordering_eq_properties.classes()
- )
- ),
- "error in test: expr: {expr:?}, sort_options: {sort_options:?}"
- );
- }
Ok(())
}
@@ -1426,8 +1391,8 @@ mod tests {
// Column a and e are ordering equivalent (e.g global ordering of the
table can be described both as a ASC and e ASC.)
let mut ordering_eq_properties =
OrderingEquivalenceProperties::new(test_schema);
ordering_eq_properties.add_equal_conditions((
- &OrderedColumn::new(col_a.clone(), option1),
- &OrderedColumn::new(col_e.clone(), option1),
+ &vec![OrderedColumn::new(col_a.clone(), option1)],
+ &vec![OrderedColumn::new(col_e.clone(), option1)],
));
let sort_req_a = PhysicalSortExpr {
expr: Arc::new((col_a).clone()) as _,
@@ -1491,4 +1456,53 @@ mod tests {
Ok(())
}
+
+    /// Checks that `get_compatible_ranges` reports each range of the searched
+    /// requirement that matches the requirement being looked for; an entry whose
+    /// sort options are unspecified (`None`) still counts as a match.
+    #[test]
+    fn test_get_compatible_ranges() -> Result<()> {
+        let col_a = &Column::new("a", 0);
+        let col_b = &Column::new("b", 1);
+        let asc_nulls_last = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        // Reusable (column, sort options) entries for building requirements.
+        let a_asc = (col_a, Some(asc_nulls_last));
+        let b_asc = (col_b, Some(asc_nulls_last));
+        let a_any = (col_a, None);
+        // Each case: (searched requirement, requirement to find, expected `(start, end)` ranges).
+        let cases = [
+            (vec![a_asc, b_asc], vec![a_asc], vec![(0, 1)]),
+            (vec![a_any, b_asc], vec![a_asc], vec![(0, 1)]),
+            (vec![a_any, b_asc, a_asc], vec![a_asc], vec![(0, 1), (2, 3)]),
+        ];
+        for (searched, to_search, bounds) in cases {
+            let expected: Vec<_> = bounds
+                .into_iter()
+                .map(|(start, end)| Range { start, end })
+                .collect();
+            let actual = get_compatible_ranges(
+                &convert_to_requirement(&searched),
+                &convert_to_requirement(&to_search),
+            );
+            assert_eq!(actual, expected);
+        }
+        Ok(())
+    }
+
+    /// Checks that `collapse_vec` removes repeated values, including
+    /// non-adjacent repeats, keeping each value's first occurrence in order.
+    #[test]
+    fn test_collapse_vec() -> Result<()> {
+        // Each case: (input, expected output after collapsing).
+        let cases = [
+            (vec![1, 2, 3], vec![1, 2, 3]),
+            (vec![1, 2, 3, 2, 3], vec![1, 2, 3]),
+            (vec![3, 1, 2, 3, 2, 3], vec![3, 1, 2]),
+        ];
+        for (input, expected) in cases {
+            assert_eq!(collapse_vec(input), expected);
+        }
+        Ok(())
+    }
}