This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1cc01a4131 Ordering Equivalence Builder (#6452)
1cc01a4131 is described below
commit 1cc01a4131e587f00da1783e7ad2e5dc699c1847
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat May 27 16:49:02 2023 +0300
Ordering Equivalence Builder (#6452)
* Convert ordering equivalence to vec (unit test)
* compiles
* Simplifications
* simplifications
* Remove unnecessary codes
* simplifications
* Add test cases
* fix bug
* simplifications
* Resolve linter errors
* remove unnecessary codes
* simplifications
* simplifications
* Remove unnecessary codes
* Add pruning to ordering_equivalence projection
* Remove unnecessary clones
* Convert get range to calculate compatible ranges
* Simplifications
* Update comments
* Update comments
* Use builder style for ordering equivalence creation
* Minor comment changes
* Address reviews
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/core/src/physical_plan/windows/mod.rs | 58 ++++++------------
datafusion/physical-expr/src/equivalence.rs | 78 +++++++++++++++++++++++-
2 files changed, 94 insertions(+), 42 deletions(-)
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index d4732cc1f6..d7eedf7f18 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -47,14 +47,14 @@ mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
use datafusion_common::utils::longest_consecutive_prefix;
+use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{convert_to_expr,
get_indices_of_matching_exprs};
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
use datafusion_physical_expr::{
- normalize_expr_with_equivalence_properties, OrderedColumn,
- OrderingEquivalenceProperties, PhysicalSortRequirement,
+ OrderedColumn, OrderingEquivalenceProperties, PhysicalSortRequirement,
};
pub use window_agg_exec::WindowAggExec;
@@ -254,30 +254,10 @@ pub(crate) fn window_ordering_equivalence(
) -> OrderingEquivalenceProperties {
// We need to update the schema, so we can not directly use
// `input.ordering_equivalence_properties()`.
- let mut result = OrderingEquivalenceProperties::new(schema.clone());
- result.extend(
- input
- .ordering_equivalence_properties()
- .classes()
- .iter()
- .cloned(),
- );
- let mut normalized_out_ordering = vec![];
- for item in input.output_ordering().unwrap_or(&[]) {
- // To account for ordering equivalences, first normalize the
expression:
- let normalized = normalize_expr_with_equivalence_properties(
- item.expr.clone(),
- input.equivalence_properties().classes(),
- );
- // Currently we only support, ordering equivalences for `Column`
expressions.
- // TODO: Add support for ordering equivalence for all `PhysicalExpr`s
- if let Some(column) = normalized.as_any().downcast_ref::<Column>() {
- normalized_out_ordering
- .push(OrderedColumn::new(column.clone(), item.options));
- } else {
- break;
- }
- }
+ let mut builder = OrderingEquivalenceBuilder::new(schema.clone())
+ .with_equivalences(input.equivalence_properties())
+ .with_existing_ordering(input.output_ordering().map(|elem|
elem.to_vec()))
+ .extend(input.ordering_equivalence_properties());
for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
@@ -289,25 +269,21 @@ pub(crate) fn window_ordering_equivalence(
.as_any()
.is::<RowNumber>()
{
- // If there is an existing ordering, add new ordering as an
equivalence:
- if !normalized_out_ordering.is_empty() {
- if let Some((idx, field)) =
- schema.column_with_name(expr.field().unwrap().name())
- {
- let column = Column::new(field.name(), idx);
- let options = SortOptions {
- descending: false,
- nulls_first: false,
- }; // ASC, NULLS LAST
- let rhs = OrderedColumn::new(column, options);
- result
- .add_equal_conditions((&normalized_out_ordering,
&vec![rhs]));
- }
+ if let Some((idx, field)) =
+ schema.column_with_name(expr.field().unwrap().name())
+ {
+ let column = Column::new(field.name(), idx);
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ }; // ASC, NULLS LAST
+ let rhs = OrderedColumn::new(column, options);
+ builder.add_equal_conditions(vec![rhs]);
}
}
}
}
- result
+ builder.build()
}
#[cfg(test)]
mod tests {
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index c3a55437dd..6af72efc5a 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -16,7 +16,9 @@
// under the License.
use crate::expressions::Column;
-use crate::{PhysicalSortExpr, PhysicalSortRequirement};
+use crate::{
+ normalize_expr_with_equivalence_properties, PhysicalSortExpr,
PhysicalSortRequirement,
+};
use arrow::datatypes::SchemaRef;
use arrow_schema::SortOptions;
@@ -269,6 +271,80 @@ impl OrderingEquivalentClass {
}
}
+/// This is a builder object facilitating incremental construction
+/// for ordering equivalences.
+pub struct OrderingEquivalenceBuilder {
+ eq_properties: EquivalenceProperties,
+ ordering_eq_properties: OrderingEquivalenceProperties,
+ existing_ordering: Vec<PhysicalSortExpr>,
+}
+
+impl OrderingEquivalenceBuilder {
+ pub fn new(schema: SchemaRef) -> Self {
+ let eq_properties = EquivalenceProperties::new(schema.clone());
+ let ordering_eq_properties =
OrderingEquivalenceProperties::new(schema);
+ Self {
+ eq_properties,
+ ordering_eq_properties,
+ existing_ordering: vec![],
+ }
+ }
+
+ pub fn extend(
+ mut self,
+ new_ordering_eq_properties: OrderingEquivalenceProperties,
+ ) -> Self {
+ self.ordering_eq_properties
+ .extend(new_ordering_eq_properties.classes().iter().cloned());
+ self
+ }
+
+ pub fn with_existing_ordering(
+ mut self,
+ existing_ordering: Option<Vec<PhysicalSortExpr>>,
+ ) -> Self {
+ if let Some(existing_ordering) = existing_ordering {
+ self.existing_ordering = existing_ordering;
+ }
+ self
+ }
+
+ pub fn with_equivalences(mut self, new_eq_properties:
EquivalenceProperties) -> Self {
+ self.eq_properties = new_eq_properties;
+ self
+ }
+
+ pub fn add_equal_conditions(&mut self, new_equivalent_ordering:
Vec<OrderedColumn>) {
+ let mut normalized_out_ordering = vec![];
+ for item in &self.existing_ordering {
+ // To account for ordering equivalences, first normalize the
expression:
+ let normalized = normalize_expr_with_equivalence_properties(
+ item.expr.clone(),
+ self.eq_properties.classes(),
+ );
+ // Currently we only support ordering equivalences for `Column`
expressions.
+ // TODO: Add support for ordering equivalence for all
`PhysicalExpr`s.
+ if let Some(column) = normalized.as_any().downcast_ref::<Column>()
{
+ normalized_out_ordering
+ .push(OrderedColumn::new(column.clone(), item.options));
+ } else {
+ break;
+ }
+ }
+ // If there is an existing ordering, add new ordering as an
equivalence:
+ if !normalized_out_ordering.is_empty() {
+ self.ordering_eq_properties.add_equal_conditions((
+ &normalized_out_ordering,
+ &new_equivalent_ordering,
+ ));
+ }
+ }
+
+ pub fn build(self) -> OrderingEquivalenceProperties {
+ self.ordering_eq_properties
+ }
+}
+
/// This function applies the given projection to the given equivalence
/// properties to compute the resulting (projected) equivalence properties;
e.g.
/// 1) Adding an alias, which can introduce additional equivalence properties,