This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2fd704ca74 Projection Order Propagation (#7364)
2fd704ca74 is described below
commit 2fd704ca74be0504f4a051890af5cc1b7189de76
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Aug 24 01:01:07 2023 +0300
Projection Order Propagation (#7364)
* projection exec is updated, get_ordering method is added to physical
expr's
* fix after merge
* simplifications
* Refactor, normalization code
* Simplifications
* mustafa's simplifications
* test source update
* Comment edited
* Simplifications
* Code improvements, comment reviews
* Comments are enriched
* tests added, ExtendedSortOptions renamed
* fix after merge
* Minor change.
* Address reviews
* structural changes
* Finalizing changes
---------
Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: metesynnada <[email protected]>
---
datafusion/core/src/physical_plan/joins/utils.rs | 25 +-
datafusion/core/src/physical_plan/projection.rs | 102 ++++--
datafusion/physical-expr/src/equivalence.rs | 68 ++--
datafusion/physical-expr/src/expressions/binary.rs | 50 +--
datafusion/physical-expr/src/expressions/cast.rs | 7 +
.../physical-expr/src/expressions/literal.rs | 5 +
.../physical-expr/src/expressions/negative.rs | 11 +-
datafusion/physical-expr/src/lib.rs | 6 +-
datafusion/physical-expr/src/physical_expr.rs | 14 +
datafusion/physical-expr/src/sort_properties.rs | 277 ++++++++++++++++
datafusion/physical-expr/src/utils.rs | 360 +++++++++++++++++++--
datafusion/sqllogictest/test_files/order.slt | 35 ++
12 files changed, 837 insertions(+), 123 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs
b/datafusion/core/src/physical_plan/joins/utils.rs
index 4f7b0023f4..3842d7d7ea 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -222,7 +222,7 @@ pub fn calculate_join_output_ordering(
);
merge_vectors(&right_ordering, left_ordering)
} else {
- right_ordering
+ right_ordering.to_vec()
}
}
// Doesn't maintain ordering, output ordering is None.
@@ -310,17 +310,20 @@ pub fn cross_join_equivalence_properties(
new_properties
}
-/// Update right table ordering equivalences so that: (1) They point to valid
-/// indices at the output of the join schema, and (2) they are normalized
w.r.t.
-/// given equivalence properties. To do so, we first increment column indices
by
-/// the left table size when join schema consists of a combination of left and
-/// right schemas (Inner, Left, Full, Right joins).
-/// Then, we normalize the sort expressions of ordering equivalences one by
one.
-/// We make sure that each expression in the ordering equivalence is either:
-/// - Is the head of an equivalent classes, or
+/// Update right table ordering equivalences so that:
+/// - They point to valid indices at the output of the join schema, and
+/// - They are normalized with respect to equivalence columns.
+///
+/// To do so, we increment column indices by the size of the left table when
+/// join schema consists of a combination of left and right schema (Inner,
+/// Left, Full, Right joins). Then, we normalize the sort expressions of
+/// ordering equivalences one by one. We make sure that each expression in the
+/// ordering equivalence is either:
+/// - The head of one of the equivalence classes, or
/// - Doesn't have an equivalent column.
-/// This way, once we normalize an expression according to equivalence
properties,
-/// then it can be safely used for ordering equivalence normalization.
+///
+/// This way, once we normalize an expression according to equivalence properties,
+/// it can thereafter safely be used for ordering equivalence normalization.
fn get_updated_right_ordering_equivalence_properties(
join_type: &JoinType,
right_oeq_classes: &[OrderingEquivalentClass],
diff --git a/datafusion/core/src/physical_plan/projection.rs
b/datafusion/core/src/physical_plan/projection.rs
index 86449e8ea4..f7f8b0f452 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -26,28 +26,28 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
+use super::expressions::{Column, PhysicalSortExpr};
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream,
Statistics};
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
Partitioning, PhysicalExpr,
};
+
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::Result;
use datafusion_execution::TaskContext;
-use futures::stream::{Stream, StreamExt};
-use log::trace;
-
-use super::expressions::{Column, PhysicalSortExpr};
-use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream,
Statistics};
-
-use
datafusion_physical_expr::equivalence::update_ordering_equivalence_with_cast;
-use datafusion_physical_expr::expressions::{CastExpr, Literal};
+use datafusion_physical_expr::expressions::{Literal, UnKnownColumn};
use datafusion_physical_expr::{
- normalize_out_expr_with_columns_map, project_equivalence_properties,
- project_ordering_equivalence_properties, OrderingEquivalenceProperties,
+ find_orderings_of_exprs, normalize_out_expr_with_columns_map,
+ project_equivalence_properties, project_ordering_equivalence_properties,
+ OrderingEquivalenceProperties,
};
+use futures::stream::{Stream, StreamExt};
+use log::trace;
+
/// Execution plan for a projection
#[derive(Debug)]
pub struct ProjectionExec {
@@ -64,6 +64,10 @@ pub struct ProjectionExec {
columns_map: HashMap<Column, Vec<Column>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ /// Expressions' normalized orderings (as given by the output ordering API
+ /// and normalized with respect to equivalence classes of input plan). The
+ /// projected expressions are mapped by their indices to this vector.
+ orderings: Vec<Option<PhysicalSortExpr>>,
}
impl ProjectionExec {
@@ -136,13 +140,24 @@ impl ProjectionExec {
None => None,
};
+ let orderings = find_orderings_of_exprs(
+ &expr,
+ input.output_ordering(),
+ input.equivalence_properties(),
+ input.ordering_equivalence_properties(),
+ )?;
+
+ let output_ordering =
+ validate_output_ordering(output_ordering, &orderings, &expr);
+
Ok(Self {
expr,
schema,
- input: input.clone(),
+ input,
output_ordering,
columns_map,
metrics: ExecutionPlanMetricsSet::new(),
+ orderings,
})
}
@@ -251,17 +266,7 @@ impl ExecutionPlan for ProjectionExec {
return new_properties;
}
- let mut input_oeq = self.input().ordering_equivalence_properties();
- // Stores cast expression and its `Column` version in the output:
- let mut cast_exprs: Vec<(CastExpr, Column)> = vec![];
- for (idx, (expr, name)) in self.expr.iter().enumerate() {
- if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
- let target_col = Column::new(name, idx);
- cast_exprs.push((cast_expr.clone(), target_col));
- }
- }
-
- update_ordering_equivalence_with_cast(&cast_exprs, &mut input_oeq);
+ let input_oeq = self.input().ordering_equivalence_properties();
project_ordering_equivalence_properties(
input_oeq,
@@ -269,6 +274,23 @@ impl ExecutionPlan for ProjectionExec {
&mut new_properties,
);
+ if let Some(leading_ordering) = self
+ .output_ordering
+ .as_ref()
+ .map(|output_ordering| &output_ordering[0])
+ {
+ for order in self.orderings.iter().flatten() {
+ if !order.eq(leading_ordering)
+ && !new_properties.satisfies_leading_ordering(order)
+ {
+ new_properties.add_equal_conditions((
+ &vec![leading_ordering.clone()],
+ &vec![order.clone()],
+ ));
+ }
+ }
+ }
+
new_properties
}
@@ -318,6 +340,40 @@ impl ExecutionPlan for ProjectionExec {
}
}
+/// This function takes the current `output_ordering`, the `orderings` based on projected expressions,
+/// and the `expr` representing the projected expressions themselves. It aims to ensure that the output
+/// ordering is valid and correctly corresponds to the projected columns.
+///
+/// If the leading expression in the `output_ordering` is an [`UnKnownColumn`], it indicates that the column
+/// referenced in the ordering is not found among the projected expressions. In such cases, this function
+/// attempts to create a new output ordering by referring to valid columns from the leftmost side of the
+/// expressions that have an ordering specified. If no projected expression has an ordering, the
+/// output ordering becomes `None`.
+fn validate_output_ordering(
+ output_ordering: Option<Vec<PhysicalSortExpr>>,
+ orderings: &[Option<PhysicalSortExpr>],
+ expr: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Option<Vec<PhysicalSortExpr>> {
+ output_ordering.and_then(|ordering| {
+ // If the leading expression is an invalid column, change the output
+ // ordering of the projection so that it refers to valid columns, if
+ // possible.
+ if ordering[0].expr.as_any().is::<UnKnownColumn>() {
+ for (idx, order) in orderings.iter().enumerate() {
+ if let Some(sort_expr) = order {
+ let (_, col_name) = &expr[idx];
+ return Some(vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new(col_name, idx)),
+ options: sort_expr.options,
+ }]);
+ }
+ }
+ None
+ } else {
+ Some(ordering)
+ }
+ })
+}
+
/// If e is a direct column reference, returns the field level
/// metadata for that field, if any. Otherwise returns None
fn get_field_metadata(
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index a2477b0546..b8ca1acc1c 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use crate::expressions::{CastExpr, Column};
+use crate::expressions::Column;
+use crate::utils::collect_columns;
use crate::{
normalize_expr_with_equivalence_properties, LexOrdering, PhysicalExpr,
PhysicalSortExpr,
@@ -24,7 +25,6 @@ use crate::{
use arrow::datatypes::SchemaRef;
use arrow_schema::Fields;
-use crate::utils::collect_columns;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;
@@ -133,6 +133,24 @@ impl<T: Eq + Clone + Hash> EquivalenceProperties<T> {
/// and treat `a ASC` and `b DESC` as the same ordering requirement.
pub type OrderingEquivalenceProperties = EquivalenceProperties<LexOrdering>;
+impl OrderingEquivalenceProperties {
+ /// Checks whether `leading_ordering` is the leading (first) sort expression
+ /// of any ordering in any of the ordering equivalence classes.
+ pub fn satisfies_leading_ordering(
+ &self,
+ leading_ordering: &PhysicalSortExpr,
+ ) -> bool {
+ for cls in &self.classes {
+ for ordering in
cls.others.iter().chain(std::iter::once(&cls.head)) {
+ if ordering[0].eq(leading_ordering) {
+ return true;
+ }
+ }
+ }
+ false
+ }
+}
+
/// EquivalentClass is a set of [`Column`]s or [`PhysicalSortExpr`]s that are
known
/// to have the same value in all tuples in a relation.
`EquivalentClass<Column>`
/// is generated by equality predicates, typically equijoin conditions and
equality
@@ -414,7 +432,13 @@ pub fn project_equivalence_properties(
class.remove(&column);
}
}
- eq_classes.retain(|props| props.len() > 1);
+
+ eq_classes.retain(|props| {
+ props.len() > 1
+ &&
+ // A column should not give an equivalence with itself.
+ !(props.len() == 2 &&
props.head.eq(props.others().iter().next().unwrap()))
+ });
output_eq.extend(eq_classes);
}
@@ -446,7 +470,7 @@ pub fn project_ordering_equivalence_properties(
class.update_with_aliases(&oeq_alias_map, fields);
}
- // Prune columns that no longer is in the schema from from the
OrderingEquivalenceProperties.
+ // Prune columns that are no longer in the schema from the
OrderingEquivalenceProperties.
for class in eq_classes.iter_mut() {
let sort_exprs_to_remove = class
.iter()
@@ -471,42 +495,6 @@ pub fn project_ordering_equivalence_properties(
output_eq.extend(eq_classes);
}
-/// Update `ordering` if it contains cast expression with target column
-/// after projection, if there is no cast expression among `ordering`
expressions,
-/// returns `None`.
-fn update_with_cast_exprs(
- cast_exprs: &[(CastExpr, Column)],
- mut ordering: LexOrdering,
-) -> Option<LexOrdering> {
- let mut is_changed = false;
- for sort_expr in ordering.iter_mut() {
- for (cast_expr, target_col) in cast_exprs.iter() {
- if sort_expr.expr.eq(cast_expr.expr()) {
- sort_expr.expr = Arc::new(target_col.clone()) as _;
- is_changed = true;
- }
- }
- }
- is_changed.then_some(ordering)
-}
-
-/// Update cast expressions inside ordering equivalence
-/// properties with its target column after projection
-pub fn update_ordering_equivalence_with_cast(
- cast_exprs: &[(CastExpr, Column)],
- input_oeq: &mut OrderingEquivalenceProperties,
-) {
- for cls in input_oeq.classes.iter_mut() {
- for ordering in
-
std::iter::once(cls.head().clone()).chain(cls.others().clone().into_iter())
- {
- if let Some(updated_ordering) = update_with_cast_exprs(cast_exprs,
ordering) {
- cls.insert(updated_ordering);
- }
- }
- }
-}
-
/// Retrieves the ordering equivalence properties for a given schema and
output ordering.
pub fn ordering_equivalence_properties_helper(
schema: SchemaRef,
diff --git a/datafusion/physical-expr/src/expressions/binary.rs
b/datafusion/physical-expr/src/expressions/binary.rs
index ece5089ba3..2729d5e56d 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -22,6 +22,16 @@ mod kernels_arrow;
use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
+use crate::array_expressions::{
+ array_append, array_concat, array_has_all, array_prepend,
+};
+use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
+use crate::intervals::{apply_operator, Interval};
+use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
+use crate::PhysicalExpr;
+
+use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
use arrow::array::*;
use arrow::compute::cast;
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
@@ -43,12 +53,14 @@ use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar,
lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
+use arrow::compute::kernels::concat_elements::concat_elements_utf8;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
-
-use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
-use arrow::compute::kernels::concat_elements::concat_elements_utf8;
-
+use arrow_array::{Datum, Scalar};
+use datafusion_common::cast::as_boolean_array;
+use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::type_coercion::binary::get_result_type;
+use datafusion_expr::{ColumnarValue, Operator};
use kernels::{
bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn,
bitwise_or_dyn_scalar,
bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar,
bitwise_shift_right_dyn,
@@ -63,22 +75,6 @@ use kernels_arrow::{
is_not_distinct_from_utf8,
};
-use crate::array_expressions::{
- array_append, array_concat, array_has_all, array_prepend,
-};
-use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
-use crate::intervals::{apply_operator, Interval};
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
-use arrow_array::{Datum, Scalar};
-
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::internal_err;
-use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::type_coercion::binary::get_result_type;
-use datafusion_expr::{ColumnarValue, Operator};
-
/// Binary expression
#[derive(Debug, Hash, Clone)]
pub struct BinaryExpr {
@@ -694,6 +690,20 @@ impl PhysicalExpr for BinaryExpr {
let mut s = state;
self.hash(&mut s);
}
+
+ /// For each operator, [`BinaryExpr`] has distinct ordering rules.
+ /// TODO: There may be rules specific to some data types (such as division
and multiplication on unsigned integers)
+ fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
+ let (left_child, right_child) = (&children[0], &children[1]);
+ match self.op() {
+ Operator::Plus => left_child.add(right_child),
+ Operator::Minus => left_child.sub(right_child),
+ Operator::Gt | Operator::GtEq =>
left_child.gt_or_gteq(right_child),
+ Operator::Lt | Operator::LtEq =>
right_child.gt_or_gteq(left_child),
+ Operator::And => left_child.and(right_child),
+ _ => SortProperties::Unordered,
+ }
+ }
}
impl PartialEq<dyn Any> for BinaryExpr {
diff --git a/datafusion/physical-expr/src/expressions/cast.rs
b/datafusion/physical-expr/src/expressions/cast.rs
index b1046f88e9..cd70c6e274 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
use crate::intervals::Interval;
use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
+
use arrow::compute;
use arrow::compute::{kernels, CastOptions};
use arrow::datatypes::{DataType, Schema};
@@ -138,6 +140,11 @@ impl PhysicalExpr for CastExpr {
// Add `self.cast_options` when hash is available
// https://github.com/apache/arrow-rs/pull/4395
}
+
+ /// A [`CastExpr`] preserves the ordering of its child.
+ fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
+ children[0]
+ }
}
impl PartialEq<dyn Any> for CastExpr {
diff --git a/datafusion/physical-expr/src/expressions/literal.rs
b/datafusion/physical-expr/src/expressions/literal.rs
index 8e86716123..517a0fe411 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -22,6 +22,7 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;
use arrow::{
@@ -88,6 +89,10 @@ impl PhysicalExpr for Literal {
let mut s = state;
self.hash(&mut s);
}
+
+ fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+ SortProperties::Singleton
+ }
}
impl PartialEq<dyn Any> for Literal {
diff --git a/datafusion/physical-expr/src/expressions/negative.rs
b/datafusion/physical-expr/src/expressions/negative.rs
index 897f3b0d52..90430cb2bb 100644
--- a/datafusion/physical-expr/src/expressions/negative.rs
+++ b/datafusion/physical-expr/src/expressions/negative.rs
@@ -21,14 +21,16 @@ use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
+use crate::physical_expr::down_cast_any_ref;
+use crate::sort_properties::SortProperties;
+use crate::PhysicalExpr;
+
use arrow::{
compute::kernels::numeric::neg_wrapping,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::{
type_coercion::{is_interval, is_null, is_signed_numeric},
@@ -102,6 +104,11 @@ impl PhysicalExpr for NegativeExpr {
let mut s = state;
self.hash(&mut s);
}
+
+ /// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
+ fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
+ -children[0]
+ }
}
impl PartialEq<dyn Any> for NegativeExpr {
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index c61077f1a0..a8e49bfbd6 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -38,6 +38,7 @@ pub mod planner;
pub mod regex_expressions;
mod scalar_function;
mod sort_expr;
+pub mod sort_properties;
pub mod string_expressions;
pub mod struct_expressions;
pub mod tree_node;
@@ -53,12 +54,12 @@ pub use aggregate::groups_accumulator::{
};
pub use aggregate::AggregateExpr;
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
-
pub use equivalence::{
ordering_equivalence_properties_helper, project_equivalence_properties,
project_ordering_equivalence_properties, EquivalenceProperties,
EquivalentClass,
OrderingEquivalenceProperties, OrderingEquivalentClass,
};
+
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
@@ -67,8 +68,9 @@ pub use sort_expr::{
LexOrdering, LexOrderingRef, LexOrderingReq, PhysicalSortExpr,
PhysicalSortRequirement,
};
+pub use sort_properties::update_ordering;
pub use utils::{
- expr_list_eq_any_order, expr_list_eq_strict_order,
+ expr_list_eq_any_order, expr_list_eq_strict_order, find_orderings_of_exprs,
normalize_expr_with_equivalence_properties,
normalize_ordering_equivalence_classes,
normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction,
};
diff --git a/datafusion/physical-expr/src/physical_expr.rs
b/datafusion/physical-expr/src/physical_expr.rs
index 29d7a6320a..ce3b7b6cf4 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::intervals::Interval;
+use crate::sort_properties::SortProperties;
use crate::utils::scatter;
use arrow::array::BooleanArray;
@@ -122,6 +123,19 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug +
PartialEq<dyn Any> {
/// Note: [`PhysicalExpr`] is not constrained by [`Hash`]
/// directly because it must remain object safe.
fn dyn_hash(&self, _state: &mut dyn Hasher);
+
+ /// The order information of a PhysicalExpr can be estimated from its children.
+ /// This is especially helpful for projection expressions. If we can ensure that the
+ /// order of a PhysicalExpr to project matches with the order of a SortExec, we can
+ /// eliminate that SortExec.
+ ///
+ /// By recursively calling this function, we can obtain the overall order
+ /// information of the PhysicalExpr. Since `SortOptions` cannot fully handle
+ /// the propagation of unordered columns and literals, the `SortProperties`
+ /// enum is used.
+ fn get_ordering(&self, _children: &[SortProperties]) -> SortProperties {
+ SortProperties::Unordered
+ }
}
impl Hash for dyn PhysicalExpr {
diff --git a/datafusion/physical-expr/src/sort_properties.rs
b/datafusion/physical-expr/src/sort_properties.rs
new file mode 100644
index 0000000000..001b86e60a
--- /dev/null
+++ b/datafusion/physical-expr/src/sort_properties.rs
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ops::Neg, sync::Arc};
+
+use crate::expressions::Column;
+use crate::utils::get_indices_of_matching_sort_exprs_with_order_eq;
+use crate::{
+ EquivalenceProperties, OrderingEquivalenceProperties, PhysicalExpr,
PhysicalSortExpr,
+};
+
+use arrow_schema::SortOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::Result;
+
+use itertools::Itertools;
+
+/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is
insufficient
+/// to simply use `Option<SortOptions>`: There must be a differentiation
between
+/// unordered columns and literal values, since literals may not break the
ordering
+/// when they are used as a child of some binary expression when the other
child has
+/// some ordering. On the other hand, unordered columns cannot maintain
ordering when
+/// they take part in such operations.
+///
+/// Example: ((a_ordered + b_unordered) + c_ordered) expression cannot end up
with
+/// sorted data; however the ((a_ordered + 999) + c_ordered) expression can.
Therefore,
+/// we need two different variants for literals and unordered columns as
literals are
+/// often more ordering-friendly under most mathematical operations.
+#[derive(PartialEq, Debug, Clone, Copy)]
+pub enum SortProperties {
+ /// Use the ordinary [`SortOptions`] struct to represent ordered data:
+ Ordered(SortOptions),
+ /// This variant represents unordered data:
+ Unordered,
+ /// Singleton is used for single-valued (constant) expressions such as literals:
+ Singleton,
+}
+
+impl SortProperties {
+ pub fn add(&self, rhs: &Self) -> Self {
+ match (self, rhs) {
+ (Self::Singleton, _) => *rhs,
+ (_, Self::Singleton) => *self,
+ (Self::Ordered(lhs), Self::Ordered(rhs))
+ if lhs.descending == rhs.descending =>
+ {
+ Self::Ordered(SortOptions {
+ descending: lhs.descending,
+ nulls_first: lhs.nulls_first || rhs.nulls_first,
+ })
+ }
+ _ => Self::Unordered,
+ }
+ }
+
+ pub fn sub(&self, rhs: &Self) -> Self {
+ match (self, rhs) {
+ (Self::Singleton, Self::Singleton) => Self::Singleton,
+ (Self::Singleton, Self::Ordered(rhs)) => Self::Ordered(SortOptions
{
+ descending: !rhs.descending,
+ nulls_first: rhs.nulls_first,
+ }),
+ (_, Self::Singleton) => *self,
+ (Self::Ordered(lhs), Self::Ordered(rhs))
+ if lhs.descending != rhs.descending =>
+ {
+ Self::Ordered(SortOptions {
+ descending: lhs.descending,
+ nulls_first: lhs.nulls_first || rhs.nulls_first,
+ })
+ }
+ _ => Self::Unordered,
+ }
+ }
+
+ pub fn gt_or_gteq(&self, rhs: &Self) -> Self {
+ match (self, rhs) {
+ (Self::Singleton, Self::Ordered(rhs)) => Self::Ordered(SortOptions
{
+ descending: !rhs.descending,
+ nulls_first: rhs.nulls_first,
+ }),
+ (_, Self::Singleton) => *self,
+ (Self::Ordered(lhs), Self::Ordered(rhs))
+ if lhs.descending != rhs.descending =>
+ {
+ *self
+ }
+ _ => Self::Unordered,
+ }
+ }
+
+ pub fn and(&self, rhs: &Self) -> Self {
+ match (self, rhs) {
+ (Self::Ordered(lhs), Self::Ordered(rhs))
+ if lhs.descending == rhs.descending =>
+ {
+ Self::Ordered(SortOptions {
+ descending: lhs.descending,
+ nulls_first: lhs.nulls_first || rhs.nulls_first,
+ })
+ }
+ (Self::Ordered(opt), Self::Singleton)
+ | (Self::Singleton, Self::Ordered(opt)) =>
Self::Ordered(SortOptions {
+ descending: opt.descending,
+ nulls_first: opt.nulls_first,
+ }),
+ (Self::Singleton, Self::Singleton) => Self::Singleton,
+ _ => Self::Unordered,
+ }
+ }
+}
+
+impl Neg for SortProperties {
+ type Output = Self;
+
+ fn neg(self) -> Self::Output {
+ match self {
+ SortProperties::Ordered(SortOptions {
+ descending,
+ nulls_first,
+ }) => SortProperties::Ordered(SortOptions {
+ descending: !descending,
+ nulls_first,
+ }),
+ SortProperties::Singleton => SortProperties::Singleton,
+ SortProperties::Unordered => SortProperties::Unordered,
+ }
+ }
+}
+
+/// The `ExprOrdering` struct is designed to aid in the determination of
ordering (represented
+/// by [`SortProperties`]) for a given [`PhysicalExpr`]. When analyzing the
orderings
+/// of a [`PhysicalExpr`], the process begins by assigning the ordering of its
leaf nodes.
+/// By propagating these leaf node orderings upwards in the expression tree,
the overall
+/// ordering of the entire [`PhysicalExpr`] can be derived.
+///
+/// This struct holds the necessary state information for each expression in
the [`PhysicalExpr`].
+/// It encapsulates the orderings (`state`) associated with the expression
(`expr`), and
+/// orderings of the children expressions (`children_states`). The
[`ExprOrdering`] of a parent
+/// expression is determined based on the [`ExprOrdering`] states of its
children expressions.
+#[derive(Debug)]
+pub struct ExprOrdering {
+ pub expr: Arc<dyn PhysicalExpr>,
+ pub state: Option<SortProperties>,
+ pub children_states: Option<Vec<SortProperties>>,
+}
+
+impl ExprOrdering {
+ pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
+ Self {
+ expr,
+ state: None,
+ children_states: None,
+ }
+ }
+
+ pub fn children(&self) -> Vec<ExprOrdering> {
+ self.expr
+ .children()
+ .into_iter()
+ .map(|e| ExprOrdering::new(e))
+ .collect()
+ }
+
+ pub fn new_with_children(
+ children_states: Vec<SortProperties>,
+ parent_expr: Arc<dyn PhysicalExpr>,
+ ) -> Self {
+ Self {
+ expr: parent_expr,
+ state: None,
+ children_states: Some(children_states),
+ }
+ }
+}
+
+impl TreeNode for ExprOrdering {
+ fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
+ where
+ F: FnMut(&Self) -> Result<VisitRecursion>,
+ {
+ for child in self.children() {
+ match op(&child)? {
+ VisitRecursion::Continue => {}
+ VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+ VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
+ }
+ }
+ Ok(VisitRecursion::Continue)
+ }
+
+ fn map_children<F>(self, transform: F) -> Result<Self>
+ where
+ F: FnMut(Self) -> Result<Self>,
+ {
+ let children = self.children();
+ if children.is_empty() {
+ Ok(self)
+ } else {
+ Ok(ExprOrdering::new_with_children(
+ children
+ .into_iter()
+ .map(transform)
+ .map_ok(|c| c.state.unwrap_or(SortProperties::Unordered))
+ .collect::<Result<Vec<_>>>()?,
+ self.expr,
+ ))
+ }
+ }
+}
+
+/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node.
+/// The node is either a leaf node, or an intermediate node:
+/// - If it is a leaf node, the children states are `None`. We directly find
+/// the order of the node by looking at the given sort expression and equivalence
+/// properties if it is a `Column` leaf, or we mark it as unordered. In the case
+/// of a `Literal` leaf, we mark it as singleton so that it can cooperate with
+/// some ordered columns at the upper steps.
+/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
+/// and operator has its own rules about how to propagate the children orderings.
+/// However, before the children order propagation, it is checked whether
+/// the intermediate node can be directly matched with the sort expression. If there
+/// is a match, the sort expression emerges at that node immediately, discarding
+/// the order coming from the children.
+pub fn update_ordering(
+ mut node: ExprOrdering,
+ sort_expr: &PhysicalSortExpr,
+ equal_properties: &EquivalenceProperties,
+ ordering_equal_properties: &OrderingEquivalenceProperties,
+) -> Result<Transformed<ExprOrdering>> {
+ // If we can directly match a sort expr with the current node, we can set
+ // its state and return early.
+ // TODO: If there is a PhysicalExpr other than a Column at this node (e.g.
+ // a BinaryExpr like a + b), and there is an ordering equivalence of
+ // it (let's say like c + d), we actually can find it at this step.
+ if sort_expr.expr.eq(&node.expr) {
+ node.state = Some(SortProperties::Ordered(sort_expr.options));
+ return Ok(Transformed::Yes(node));
+ }
+
+ if let Some(children_sort_options) = &node.children_states {
+ // We have an intermediate (non-leaf) node, account for its children:
+ node.state = Some(node.expr.get_ordering(children_sort_options));
+ } else if let Some(column) = node.expr.as_any().downcast_ref::<Column>() {
+ // We have a Column leaf; derive its ordering from the sort expression and equivalences:
+ node.state = get_indices_of_matching_sort_exprs_with_order_eq(
+ &[sort_expr.clone()],
+ &[column.clone()],
+ equal_properties,
+ ordering_equal_properties,
+ )
+ .map(|(sort_options, _)| {
+ SortProperties::Ordered(SortOptions {
+ descending: sort_options[0].descending,
+ nulls_first: sort_options[0].nulls_first,
+ })
+ });
+ } else {
+ // We have a non-Column leaf (e.g. a Literal); let the expression report its own ordering:
+ node.state = Some(node.expr.get_ordering(&[]));
+ }
+ Ok(Transformed::Yes(node))
+}
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index 1492d6008e..6d3a68d40c 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,11 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use std::borrow::Borrow;
+use std::collections::{HashMap, HashSet};
+use std::ops::Range;
+use std::sync::Arc;
+
use crate::equivalence::{
EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,
OrderingEquivalentClass,
};
use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
+use crate::sort_properties::{ExprOrdering, SortProperties};
+use crate::update_ordering;
use crate::{
LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
};
@@ -27,19 +34,16 @@ use crate::{
use arrow::array::{make_array, Array, ArrayRef, BooleanArray,
MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
use arrow::datatypes::SchemaRef;
+use arrow_schema::SortOptions;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
};
+use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_common::Result;
use datafusion_expr::Operator;
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
-use std::borrow::Borrow;
-use std::collections::HashMap;
-use std::collections::HashSet;
-use std::ops::Range;
-use std::sync::Arc;
/// Compare the two expr lists are equal no matter the order.
/// For example two InListExpr can be considered to be equals no matter the
order:
@@ -155,10 +159,9 @@ pub fn normalize_expr_with_equivalence_properties(
.unwrap_or(expr)
}
-/// This function returns the normalized version of `sort_expr` with respect to
-/// `eq_properties`, if possible. Otherwise, it returns its first argument as
is.
-/// Note that this simply means returning the head [`PhysicalSortExpr`] in the
-/// given equivalence set.
+/// This function normalizes `sort_expr` according to `eq_properties`. If the
+/// given sort expression doesn't belong to equivalence set `eq_properties`,
+/// it returns `sort_expr` as is.
fn normalize_sort_expr_with_equivalence_properties(
mut sort_expr: PhysicalSortExpr,
eq_properties: &[EquivalentClass],
@@ -168,11 +171,9 @@ fn normalize_sort_expr_with_equivalence_properties(
sort_expr
}
-/// This function returns the normalized version of every [`PhysicalSortExpr`]
-/// in `sort_exprs` w.r.t. `eq_properties`, if possible. The
[`PhysicalSortExpr`]s
-/// for which this is impossible are returned as is. Basically, this function
-/// applies [`normalize_sort_expr_with_equivalence_properties`] to multiple
-/// [`PhysicalSortExpr`]s at once.
+/// This function applies [`normalize_sort_expr_with_equivalence_properties`]
+/// to every sort expression in `sort_exprs` and returns a vector of the
+/// normalized sort expressions.
pub fn normalize_sort_exprs_with_equivalence_properties(
sort_exprs: LexOrderingRef,
eq_properties: &EquivalenceProperties,
@@ -188,8 +189,9 @@ pub fn normalize_sort_exprs_with_equivalence_properties(
.collect()
}
-/// This function returns the head [`PhysicalSortRequirement`] of equivalence
set of a [`PhysicalSortRequirement`],
-/// if there is any, otherwise; returns the same [`PhysicalSortRequirement`].
+/// This function normalizes `sort_requirement` according to `eq_properties`.
+/// If the given sort requirement doesn't belong to equivalence set
+/// `eq_properties`, it returns `sort_requirement` as is.
fn normalize_sort_requirement_with_equivalence_properties(
mut sort_requirement: PhysicalSortRequirement,
eq_properties: &[EquivalentClass],
@@ -258,10 +260,9 @@ pub fn normalize_sort_exprs(
let normalized_exprs =
PhysicalSortRequirement::to_sort_exprs(normalized_exprs);
collapse_vec(normalized_exprs)
}
-
-/// This function "normalizes" its argument `oeq_classes` by making sure that
-/// it only refers to representative (i.e. head) entries in the given
equivlance
-/// properties (`eq_properties`).
+/// This function normalizes the expressions in `oeq_classes` according to
+/// `eq_properties`. More explicitly, it makes sure that expressions in
+/// `oeq_classes` are head entries in `eq_properties`, replacing any non-head
+/// entries with head entries if necessary.
pub fn normalize_ordering_equivalence_classes(
oeq_classes: &[OrderingEquivalentClass],
eq_properties: &EquivalenceProperties,
@@ -565,7 +566,7 @@ pub fn get_indices_of_matching_exprs<
/// This function finds the indices of `targets` within `items` using strict
/// equality.
-fn get_indices_of_exprs_strict<T: Borrow<Arc<dyn PhysicalExpr>>>(
+pub fn get_indices_of_exprs_strict<T: Borrow<Arc<dyn PhysicalExpr>>>(
targets: impl IntoIterator<Item = T>,
items: &[Arc<dyn PhysicalExpr>],
) -> Vec<usize> {
@@ -827,21 +828,172 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array)
-> Result<ArrayRef> {
Ok(make_array(data))
}
+/// Locates every expression of `required_exprs` inside `provided_exprs` and
+/// returns the positions found, in the order of `required_exprs`.
+///
+/// A result is only produced when all required expressions are found, there
+/// is at least one of them, and the found indices form a permutation of the
+/// range 0, 1, ..., N. For example, \[2,1,0\] is accepted (\[0,1,2\] is
+/// consecutive) whereas \[3,1,0\] is rejected (\[0,1,3\] is not consecutive).
+/// In every other case `None` is returned.
+fn get_lexicographical_match_indices(
+    required_exprs: &[Arc<dyn PhysicalExpr>],
+    provided_exprs: &[Arc<dyn PhysicalExpr>],
+) -> Option<Vec<usize>> {
+    let matched = get_indices_of_exprs_strict(required_exprs, provided_exprs);
+    // Nothing was requested/found, or some required expression is missing:
+    if matched.is_empty() || matched.len() != required_exprs.len() {
+        return None;
+    }
+    // Sort a copy so that the original (requirement) order can be returned:
+    let mut sorted = matched.clone();
+    sorted.sort();
+    (longest_consecutive_prefix(sorted) == matched.len()).then_some(matched)
+}
+
+/// Tries to match all of `required_columns` against an ordering, taking
+/// equivalence properties into account, and reports how they are sorted.
+///
+/// The required columns and `provided_sorts` are first normalized with the
+/// classes of `eq_properties`. If every required column is found in the
+/// normalized provided ordering (see [`get_lexicographical_match_indices`]),
+/// the sort options of the matched entries are returned together with their
+/// indices. Otherwise, each ordering in every class of `order_eq_properties`
+/// is tried in the same way; on a match, the returned indices and options
+/// refer to that equivalent ordering. If nothing matches, `None` is returned.
+pub fn get_indices_of_matching_sort_exprs_with_order_eq(
+    provided_sorts: &[PhysicalSortExpr],
+    required_columns: &[Column],
+    eq_properties: &EquivalenceProperties,
+    order_eq_properties: &OrderingEquivalenceProperties,
+) -> Option<(Vec<SortOptions>, Vec<usize>)> {
+    let eq_classes = eq_properties.classes();
+
+    // Express the required columns as option-less sort requirements so that
+    // they can go through the same normalization as the provided ordering:
+    let column_requirements: Vec<_> = required_columns
+        .iter()
+        .map(|column| PhysicalSortRequirement {
+            expr: Arc::new(column.clone()) as _,
+            options: None,
+        })
+        .collect();
+    let required_exprs: Vec<_> =
+        normalize_sort_requirements(&column_requirements, eq_classes, &[])
+            .iter()
+            .map(|requirement| requirement.expr.clone())
+            .collect();
+
+    let normalized_provided = normalize_sort_requirements(
+        &PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()),
+        eq_classes,
+        &[],
+    );
+    let provided_exprs: Vec<_> = normalized_provided
+        .iter()
+        .map(|requirement| requirement.expr.clone())
+        .collect();
+
+    // First attempt: match directly against the normalized provided ordering.
+    if let Some(indices) =
+        get_lexicographical_match_indices(&required_exprs, &provided_exprs)
+    {
+        // Entries without sort options are skipped here.
+        let options: Vec<SortOptions> = indices
+            .iter()
+            .filter_map(|&idx| normalized_provided[idx].options)
+            .collect();
+        return Some((options, indices));
+    }
+
+    // Second attempt: consult the ordering equivalence classes, visiting the
+    // non-head orderings of each class first and its head last.
+    for class in order_eq_properties.classes() {
+        let candidates = class.others().iter().chain(std::iter::once(class.head()));
+        for ordering in candidates {
+            let class_exprs = convert_to_expr(ordering);
+            let matched =
+                get_lexicographical_match_indices(&required_exprs, &class_exprs);
+            if let Some(indices) = matched {
+                let options: Vec<SortOptions> = indices
+                    .iter()
+                    .map(|&idx| ordering[idx].options)
+                    .collect();
+                return Some((options, indices));
+            }
+        }
+    }
+    // Neither the provided ordering nor any equivalent ordering matched:
+    None
+}
+
+/// Calculates the output orderings for a set of expressions within the
context of a given
+/// execution plan. The resulting orderings are all in the type of [`Column`],
since these
+/// expressions become [`Column`] after the projection step. The expressions
having an alias
+/// are renamed with those aliases in the returned [`PhysicalSortExpr`]'s. If
an expression
+/// is found to be unordered, the corresponding entry in the output vector is
`None`.
+///
+/// # Arguments
+///
+/// * `expr` - A slice of tuples containing expressions and their
corresponding aliases.
+///
+/// * `input` - A reference to an execution plan that provides output ordering
and equivalence
+/// properties.
+///
+/// # Returns
+///
+/// A `Result` containing a vector of optional [`PhysicalSortExpr`]'s. Each
element of the
+/// vector corresponds to an expression from the input slice. If an expression
can be ordered,
+/// the corresponding entry is `Some(PhysicalSortExpr)`. If an expression
cannot be ordered,
+/// the entry is `None`.
+pub fn find_orderings_of_exprs(
+ expr: &[(Arc<dyn PhysicalExpr>, String)],
+ input_output_ordering: Option<&[PhysicalSortExpr]>,
+ input_equal_properties: EquivalenceProperties,
+ input_ordering_equal_properties: OrderingEquivalenceProperties,
+) -> Result<Vec<Option<PhysicalSortExpr>>> {
+ let mut orderings: Vec<Option<PhysicalSortExpr>> = vec![];
+ if let Some(leading_ordering) =
+ input_output_ordering.map(|output_ordering| &output_ordering[0])
+ {
+ for (index, (expression, name)) in expr.iter().enumerate() {
+ let initial_expr = ExprOrdering::new(expression.clone());
+ let transformed = initial_expr.transform_up(&|expr| {
+ update_ordering(
+ expr,
+ leading_ordering,
+ &input_equal_properties,
+ &input_ordering_equal_properties,
+ )
+ })?;
+ if let Some(SortProperties::Ordered(sort_options)) =
transformed.state {
+ orderings.push(Some(PhysicalSortExpr {
+ expr: Arc::new(Column::new(name, index)),
+ options: sort_options,
+ }));
+ } else {
+ orderings.push(None);
+ }
+ }
+ }
+ Ok(orderings)
+}
+
#[cfg(test)]
mod tests {
+ use std::fmt::{Display, Formatter};
+ use std::ops::Not;
+ use std::sync::Arc;
+
use super::*;
+ use crate::equivalence::OrderingEquivalenceProperties;
use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
use crate::PhysicalSortExpr;
+
use arrow::compute::SortOptions;
use arrow_array::Int32Array;
+ use arrow_schema::{DataType, Field, Schema};
use datafusion_common::cast::{as_boolean_array, as_int32_array};
use datafusion_common::{Result, ScalarValue};
- use std::fmt::{Display, Formatter};
- use crate::equivalence::OrderingEquivalenceProperties;
- use arrow_schema::{DataType, Field, Schema};
use petgraph::visit::Bfs;
- use std::sync::Arc;
#[derive(Clone)]
struct DummyProperty {
@@ -1662,4 +1814,162 @@ mod tests {
assert_eq!(&expected, result);
Ok(())
}
+
+ // Exercises `get_indices_of_matching_sort_exprs_with_order_eq` in three
+ // scenarios: a direct match, a match through an ordering equivalence, and
+ // a provided ordering for which `None` is expected.
+ #[test]
+ fn test_get_indices_of_matching_sort_exprs_with_order_eq() -> Result<()> {
+ let sort_options = SortOptions::default();
+ let sort_options_not = SortOptions::default().not();
+
+ // Scenario 1: the provided ordering lists exactly the required columns
+ // (`b` with the negated options, then `a`) and there are no equivalences.
+ let provided_sorts = [
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: sort_options_not,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: sort_options,
+ },
+ ];
+ let required_columns = [Column::new("b", 1), Column::new("a", 0)];
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ ]);
+ let equal_properties =
EquivalenceProperties::new(Arc::new(schema.clone()));
+ let ordering_equal_properties =
+ OrderingEquivalenceProperties::new(Arc::new(schema));
+ assert_eq!(
+ get_indices_of_matching_sort_exprs_with_order_eq(
+ &provided_sorts,
+ &required_columns,
+ &equal_properties,
+ &ordering_equal_properties,
+ ),
+ Some((vec![sort_options_not, sort_options], vec![0, 1]))
+ );
+
+ // Scenario 2: only `c` is provided, but the ordering `[c]` is registered
+ // as ordering equivalent to `[b, a]`, so the required columns are expected
+ // to be found through the ordering equivalence class.
+ let provided_sorts = [PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: sort_options,
+ }];
+ let required_columns = [Column::new("b", 1), Column::new("a", 0)];
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ ]);
+ let equal_properties =
EquivalenceProperties::new(Arc::new(schema.clone()));
+ let mut ordering_equal_properties =
+ OrderingEquivalenceProperties::new(Arc::new(schema));
+ ordering_equal_properties.add_equal_conditions((
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: sort_options,
+ }],
+ &vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: sort_options_not,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: sort_options,
+ },
+ ],
+ ));
+ assert_eq!(
+ get_indices_of_matching_sort_exprs_with_order_eq(
+ &provided_sorts,
+ &required_columns,
+ &equal_properties,
+ &ordering_equal_properties,
+ ),
+ Some((vec![sort_options_not, sort_options], vec![0, 1]))
+ );
+
+ // Scenario 3: `c` sits between `b` and `a` in the provided ordering and
+ // there are no equivalences; no match is expected.
+ let provided_sorts = [
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: sort_options_not,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: sort_options,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: sort_options,
+ },
+ ];
+ let required_columns = [Column::new("b", 1), Column::new("a", 0)];
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ ]);
+ let equal_properties =
EquivalenceProperties::new(Arc::new(schema.clone()));
+ let ordering_equal_properties =
+ OrderingEquivalenceProperties::new(Arc::new(schema));
+ assert_eq!(
+ get_indices_of_matching_sort_exprs_with_order_eq(
+ &provided_sorts,
+ &required_columns,
+ &equal_properties,
+ &ordering_equal_properties,
+ ),
+ None
+ );
+
+ Ok(())
+ }
+
+ // Checks that `normalize_ordering_equivalence_classes` rewrites an ordering
+ // equivalence on `c` into one on `a` when `a` and `c` are registered as
+ // equal columns.
+ #[test]
+ fn test_normalize_ordering_equivalence_classes() -> Result<()> {
+ let sort_options = SortOptions::default();
+
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ ]);
+ let mut equal_properties =
EquivalenceProperties::new(Arc::new(schema.clone()));
+ let mut ordering_equal_properties =
+ OrderingEquivalenceProperties::new(Arc::new(schema.clone()));
+ let mut expected_oeq =
OrderingEquivalenceProperties::new(Arc::new(schema));
+
+ // Columns `a` and `c` are equal:
+ equal_properties
+ .add_equal_conditions((&Column::new("a", 0), &Column::new("c",
2)));
+ // Input: the orderings `[b]` and `[c]` are equivalent.
+ ordering_equal_properties.add_equal_conditions((
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: sort_options,
+ }],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: sort_options,
+ }],
+ ));
+ // Expected after normalization: `[b]` and `[a]` are equivalent.
+ expected_oeq.add_equal_conditions((
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: sort_options,
+ }],
+ &vec![PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: sort_options,
+ }],
+ ));
+
+ // No pair of (normalized, expected) classes may differ in head or others.
+ // NOTE(review): `zip` stops at the shorter side, so a difference in the
+ // number of classes would not be caught by this assertion.
+ assert!(!normalize_ordering_equivalence_classes(
+ ordering_equal_properties.classes(),
+ &equal_properties,
+ )
+ .iter()
+ .zip(expected_oeq.classes())
+ .any(|(a, b)| a.head().ne(b.head()) || a.others().ne(b.others())));
+
+ Ok(())
+ }
}
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 47c7acde32..dffc06eaf4 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -410,3 +410,38 @@ SELECT DISTINCT time as "first_seen" FROM t ORDER BY 1;
## Cleanup
statement ok
drop table t;
+
+# Create a table with three columns that the source declares as ordering
+# equivalent. The next query expects the SortExec to be removed, because the
+# ordering is propagated across the projection.
+statement ok
+CREATE EXTERNAL TABLE multiple_ordered_table (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC)
+WITH ORDER (b ASC)
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv';
+
+query TT
+EXPLAIN SELECT (b+a+c) AS result
+FROM multiple_ordered_table
+ORDER BY result;
+----
+logical_plan
+Sort: result ASC NULLS LAST
+--Projection: multiple_ordered_table.b + multiple_ordered_table.a +
multiple_ordered_table.c AS result
+----TableScan: multiple_ordered_table projection=[a, b, c]
+physical_plan
+SortPreservingMergeExec: [result@0 ASC NULLS LAST]
+--ProjectionExec: expr=[b@1 + a@0 + c@2 as result]
+----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table multiple_ordered_table;
\ No newline at end of file