This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 93a4941b05 Add Support for Ordering Equivalence (#6160)
93a4941b05 is described below
commit 93a4941b05c2b468f886dc50e799b2d6a80a46c6
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed May 3 13:38:24 2023 +0300
Add Support for Ordering Equivalence (#6160)
* Initial commit
* All functionality
* minor changes
* simplifications
* simplifications
* simplifications
* tmp
* simplifications
* simplifications
* fix errors
* simplifications
* simplifications
* simplifications
* resolve merge error
* fix index of matching input column
* simplifications, update comments. move tests to the window.slt
* separate equivalent and ordering equivalent expressions
* Change order of the arguments
* simplifications
* Minor changes, update comments
* Simplifications and improved comments
* Address reviews
* Add unit tests
* simplifications
* Code simplifications, comments improvements
* Update comment, change test to make intent clearer
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../src/physical_optimizer/sort_enforcement.rs | 2 +
.../core/src/physical_optimizer/sort_pushdown.rs | 36 +-
datafusion/core/src/physical_optimizer/utils.rs | 9 +-
datafusion/core/src/physical_plan/mod.rs | 6 +
datafusion/core/src/physical_plan/projection.rs | 34 +-
.../windows/bounded_window_agg_exec.rs | 10 +-
datafusion/core/src/physical_plan/windows/mod.rs | 64 ++-
.../src/physical_plan/windows/window_agg_exec.rs | 9 +-
.../core/tests/sqllogictests/test_files/window.slt | 165 +++++-
datafusion/physical-expr/src/equivalence.rs | 191 +++++--
datafusion/physical-expr/src/lib.rs | 10 +-
datafusion/physical-expr/src/utils.rs | 609 ++++++++++++++++++---
12 files changed, 1004 insertions(+), 141 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 8bb71445a4..1032a16c7f 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -473,6 +473,7 @@ fn ensure_sorting(
physical_ordering,
&required_ordering,
|| child.equivalence_properties(),
+ || child.ordering_equivalence_properties(),
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
@@ -539,6 +540,7 @@ fn analyze_immediate_sort_removal(
sort_input.output_ordering(),
sort_exec.output_ordering(),
|| sort_input.equivalence_properties(),
+ || sort_input.ordering_equivalence_properties(),
) {
// Since we know that a `SortExec` has exactly one child,
// we can use the zero index safely:
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index f9ca976299..a0490fa64e 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -125,9 +125,12 @@ pub(crate) fn pushdown_sorts(
let err = || DataFusionError::Plan(ERR_MSG.to_string());
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let mut new_plan = plan.clone();
- if !ordering_satisfy_requirement(plan.output_ordering(),
parent_required, || {
- plan.equivalence_properties()
- }) {
+ if !ordering_satisfy_requirement(
+ plan.output_ordering(),
+ parent_required,
+ || plan.equivalence_properties(),
+ || plan.ordering_equivalence_properties(),
+ ) {
// If the current plan is a SortExec, modify it to satisfy parent
requirements:
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
@@ -155,9 +158,12 @@ pub(crate) fn pushdown_sorts(
}
} else {
// Executors other than SortExec
- if ordering_satisfy_requirement(plan.output_ordering(),
parent_required, || {
- plan.equivalence_properties()
- }) {
+ if ordering_satisfy_requirement(
+ plan.output_ordering(),
+ parent_required,
+ || plan.equivalence_properties(),
+ || plan.ordering_equivalence_properties(),
+ ) {
// Satisfies parent requirements, immediately return.
return Ok(Transformed::Yes(SortPushDown {
required_ordering: None,
@@ -280,14 +286,20 @@ fn determine_children_requirement(
request_child: Option<&[PhysicalSortRequirement]>,
child_plan: Arc<dyn ExecutionPlan>,
) -> RequirementsCompatibility {
- if requirements_compatible(request_child, parent_required, || {
- child_plan.equivalence_properties()
- }) {
+ if requirements_compatible(
+ request_child,
+ parent_required,
+ || child_plan.ordering_equivalence_properties(),
+ || child_plan.equivalence_properties(),
+ ) {
// request child requirements are more specific, no need to push down
the parent requirements
RequirementsCompatibility::Satisfy
- } else if requirements_compatible(parent_required, request_child, || {
- child_plan.equivalence_properties()
- }) {
+ } else if requirements_compatible(
+ parent_required,
+ request_child,
+ || child_plan.ordering_equivalence_properties(),
+ || child_plan.equivalence_properties(),
+ ) {
// parent requirements are more specific, adjust the request child
requirements and push down the new requirements
let adjusted = parent_required.map(|r| r.to_vec());
RequirementsCompatibility::Compatible(adjusted)
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index b323e13679..68efa06c3f 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -41,9 +41,12 @@ pub fn add_sort_above(
sort_expr: Vec<PhysicalSortExpr>,
) -> Result<()> {
// If the ordering requirement is already satisfied, do not add a sort.
- if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
- node.equivalence_properties()
- }) {
+ if !ordering_satisfy(
+ node.output_ordering(),
+ Some(&sort_expr),
+ || node.equivalence_properties(),
+ || node.ordering_equivalence_properties(),
+ ) {
let new_sort = SortExec::new(sort_expr, node.clone());
*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index 01ece80aca..00850f3237 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -32,6 +32,7 @@ use arrow::record_batch::RecordBatch;
pub use datafusion_expr::Accumulator;
pub use datafusion_expr::ColumnarValue;
pub use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
+use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
pub use display::DisplayFormatType;
use futures::stream::{Stream, TryStreamExt};
use std::fmt;
@@ -187,6 +188,11 @@ pub trait ExecutionPlan: Debug + Send + Sync {
EquivalenceProperties::new(self.schema())
}
+ /// Get the OrderingEquivalenceProperties within the plan
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties
{
+ OrderingEquivalenceProperties::new(self.schema())
+ }
+
/// Get a list of child execution plans that provide the input for this
plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary
nodes, or two
/// values for binary nodes (such as joins).
diff --git a/datafusion/core/src/physical_plan/projection.rs
b/datafusion/core/src/physical_plan/projection.rs
index 49c429f94d..e70e4faebd 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -27,22 +27,24 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::Result;
+use crate::execution::context::TaskContext;
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
Partitioning, PhysicalExpr,
};
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use futures::stream::{Stream, StreamExt};
use log::debug;
use super::expressions::{Column, PhysicalSortExpr};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
-use crate::execution::context::TaskContext;
-use datafusion_physical_expr::equivalence::project_equivalence_properties;
-use datafusion_physical_expr::normalize_out_expr_with_columns_map;
-use futures::stream::Stream;
-use futures::stream::StreamExt;
+
+use datafusion_physical_expr::{
+ normalize_out_expr_with_columns_map, project_equivalence_properties,
+ project_ordering_equivalence_properties, OrderingEquivalenceProperties,
+};
/// Execution plan for a projection
#[derive(Debug)]
@@ -95,8 +97,18 @@ impl ProjectionExec {
let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
for (expression, name) in expr.iter() {
if let Some(column) = expression.as_any().downcast_ref::<Column>()
{
+ // For some executors, logical and physical plan schema fields
+ // are not the same. The information in a `Column` comes from
+ // the logical plan schema. Therefore, to produce correct
results
+ // we use the field in the input schema with the same index.
This
+ // corresponds to the physical plan `Column`.
+ let idx = column.index();
+ let matching_input_field = input_schema.field(idx);
+ let matching_input_column =
Column::new(matching_input_field.name(), idx);
let new_col_idx = schema.index_of(name)?;
- let entry =
columns_map.entry(column.clone()).or_insert_with(Vec::new);
+ let entry = columns_map
+ .entry(matching_input_column)
+ .or_insert_with(Vec::new);
entry.push(Column::new(name, new_col_idx));
};
}
@@ -204,6 +216,16 @@ impl ExecutionPlan for ProjectionExec {
new_properties
}
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties
{
+ let mut new_properties =
OrderingEquivalenceProperties::new(self.schema());
+ project_ordering_equivalence_properties(
+ self.input.ordering_equivalence_properties(),
+ &self.columns_map,
+ &mut new_properties,
+ );
+ new_properties
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index ac84a6a395..95b482ef24 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::windows::{
- calc_requirements, get_ordered_partition_by_indices,
+ calc_requirements, get_ordered_partition_by_indices,
window_ordering_equivalence,
};
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning,
@@ -66,7 +66,8 @@ use datafusion_physical_expr::window::{
WindowAggState, WindowState,
};
use datafusion_physical_expr::{
- EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement,
+ EquivalenceProperties, OrderingEquivalenceProperties, PhysicalExpr,
+ PhysicalSortRequirement,
};
#[derive(Debug, Clone, PartialEq)]
@@ -259,6 +260,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
self.input().equivalence_properties()
}
+ /// Get the OrderingEquivalenceProperties within the plan
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties
{
+ window_ordering_equivalence(&self.schema, &self.input,
&self.window_expr)
+ }
+
fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index 65d7729e18..40ef8a3e6a 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::{
};
use crate::scalar::ScalarValue;
use arrow::datatypes::Schema;
+use arrow_schema::{SchemaRef, SortOptions};
use datafusion_expr::{
window_function::{signature_for_built_in, BuiltInWindowFunction,
WindowFunction},
WindowFrame,
@@ -46,11 +47,15 @@ mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
use datafusion_common::utils::longest_consecutive_prefix;
+use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{convert_to_expr,
get_indices_of_matching_exprs};
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
-use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr::{
+ normalize_expr_with_equivalence_properties, OrderedColumn,
+ OrderingEquivalenceProperties, PhysicalSortRequirement,
+};
pub use window_agg_exec::WindowAggExec;
/// Create a physical expression for window function
@@ -242,6 +247,63 @@ pub(crate) fn get_ordered_partition_by_indices(
input_places[0..first_n].to_vec()
}
+pub(crate) fn window_ordering_equivalence(
+ schema: &SchemaRef,
+ input: &Arc<dyn ExecutionPlan>,
+ window_expr: &[Arc<dyn WindowExpr>],
+) -> OrderingEquivalenceProperties {
+ // We need to update the schema, so we can not directly use
+ // `input.ordering_equivalence_properties()`.
+ let mut result = OrderingEquivalenceProperties::new(schema.clone());
+ result.extend(
+ input
+ .ordering_equivalence_properties()
+ .classes()
+ .iter()
+ .cloned(),
+ );
+ let out_ordering = input.output_ordering().unwrap_or(&[]);
+ for expr in window_expr {
+ if let Some(builtin_window_expr) =
+ expr.as_any().downcast_ref::<BuiltInWindowExpr>()
+ {
+ // Only the built-in `RowNumber` window function introduces a new
+ // ordering:
+ if builtin_window_expr
+ .get_built_in_func_expr()
+ .as_any()
+ .is::<RowNumber>()
+ {
+ // If there is an existing ordering, add new ordering as an
equivalence:
+ if let Some(first) = out_ordering.first() {
+ // Normalize expression, as we search for ordering
equivalences
+ // on normalized versions:
+ let normalized =
normalize_expr_with_equivalence_properties(
+ first.expr.clone(),
+ input.equivalence_properties().classes(),
+ );
+ if let Some(column) =
normalized.as_any().downcast_ref::<Column>() {
+ let column_info =
+
schema.column_with_name(expr.field().unwrap().name());
+ if let Some((idx, field)) = column_info {
+ let lhs = OrderedColumn::new(column.clone(),
first.options);
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ }; // ASC, NULLS LAST
+ let rhs = OrderedColumn::new(
+ Column::new(field.name(), idx),
+ options,
+ );
+ result.add_equal_conditions((&lhs, &rhs));
+ }
+ }
+ }
+ }
+ }
+ }
+ result
+}
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 7ca95954ce..dc0302d77b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -25,7 +25,7 @@ use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::windows::{
- calc_requirements, get_ordered_partition_by_indices,
+ calc_requirements, get_ordered_partition_by_indices,
window_ordering_equivalence,
};
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
@@ -42,7 +42,7 @@ use arrow::{
};
use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
use datafusion_common::DataFusionError;
-use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr::{OrderingEquivalenceProperties,
PhysicalSortRequirement};
use futures::stream::Stream;
use futures::{ready, StreamExt};
use std::any::Any;
@@ -191,6 +191,11 @@ impl ExecutionPlan for WindowAggExec {
self.input().equivalence_properties()
}
+ /// Get the OrderingEquivalenceProperties within the plan
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties
{
+ window_ordering_equivalence(&self.schema, &self.input,
&self.window_expr)
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 496d2b50db..5ba3644923 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2050,8 +2050,8 @@ SELECT
15673 -1899175111
# test_window_agg_partition_by_set
-# These tests checks for whether BoundedWindowAggExec and WindowAggExec treats
partition by expressions as set.
-# Physical plan shouldn't have any SortExec in between Window Executors.
+# These tests check whether BoundedWindowAggExec and WindowAggExec treat
PARTITION BY expressions as a set.
+# Physical plan shouldn't have any SortExec in between window executors.
statement ok
set datafusion.execution.target_partitions = 1;
@@ -2204,3 +2204,164 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1
PRECEDING AND 1 FOLLOWING)
2.994840293343 NULL
9.674390599321 NULL
7.728066219895 NULL
+
+# test_c9_rn_ordering_alias
+# These tests check whether DataFusion is aware of the ordering generated by
the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since
the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 ASC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 ASC)
+ ORDER BY rn1
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: rn1 ASC NULLS LAST, fetch=5
+ Sort: aggregate_test_100.c9 ASC NULLS LAST
+ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS rn1
+ WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as rn1]
+ BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ SortExec: expr=[c9@0 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 ASC)
+ ORDER BY rn1
+ LIMIT 5
+----
+28774375 1
+63044568 2
+141047417 3
+141680161 4
+145294611 5
+
+# test_c9_rn_ordering_alias_opposite_direction
+# These tests check whether DataFusion is aware of the ordering generated by
the ROW_NUMBER() window function.
+# Physical plan shouldn't have a SortExec after the BoundedWindowAggExec since
the table after BoundedWindowAggExec is already ordered by rn1 ASC and c9 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: rn1 ASC NULLS LAST, fetch=5
+ Sort: aggregate_test_100.c9 DESC NULLS FIRST
+ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS rn1
+ WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as rn1]
+ BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1
+ LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
+
+# test_c9_rn_ordering_alias_opposite_direction2
+# These tests check whether Datafusion is aware of the ordering generated by
the ROW_NUMBER() window function.
+# Physical plan _should_ have a SortExec after BoundedWindowAggExec since the
table after BoundedWindowAggExec is ordered by rn1 ASC and c9 DESC, which is
conflicting with the requirement rn1 DESC.
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1 DESC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: rn1 DESC NULLS FIRST, fetch=5
+ Sort: aggregate_test_100.c9 DESC NULLS FIRST
+ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS rn1
+ WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[rn1@1 DESC]
+ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as rn1]
+ BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1 DESC
+ LIMIT 5
+----
+28774375 100
+63044568 99
+141047417 98
+141680161 97
+145294611 96
+
+# test_c9_rn_ordering_alias_opposite_direction3
+# These tests check whether DataFusion is aware of the ordering of the
column generated by the ROW_NUMBER() window function.
+# Physical plan should have a SortExec after BoundedWindowAggExec.
+# The reason is that the ordering of the table after BoundedWindowAggExec can be
described as rn1 ASC, and also c9 DESC.
+# However, the requirement is rn1 ASC, c9 ASC (lexicographical order). Hence
the existing ordering cannot satisfy the requirement
+# (the requirement is finer than the existing ordering).
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1, c9 ASC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+ Sort: aggregate_test_100.c9 DESC NULLS FIRST
+ Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS rn1
+ WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+ TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[rn1@1 ASC NULLS LAST,c9@0 ASC NULLS LAST]
+ ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as rn1]
+ BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+ SortExec: expr=[c9@0 DESC]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+ ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY rn1 ASC, c9 DESC
+ LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index 4651c4054b..d46f56e46a 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -18,18 +18,19 @@
use crate::expressions::Column;
use arrow::datatypes::SchemaRef;
+use arrow_schema::SortOptions;
-use std::collections::HashMap;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
+use std::hash::Hash;
/// Equivalence Properties is a vec of EquivalentClass.
#[derive(Debug, Clone)]
-pub struct EquivalenceProperties {
- classes: Vec<EquivalentClass>,
+pub struct EquivalenceProperties<T = Column> {
+ classes: Vec<EquivalentClass<T>>,
schema: SchemaRef,
}
-impl EquivalenceProperties {
+impl<T: Eq + Hash + Clone> EquivalenceProperties<T> {
pub fn new(schema: SchemaRef) -> Self {
EquivalenceProperties {
classes: vec![],
@@ -37,7 +38,7 @@ impl EquivalenceProperties {
}
}
- pub fn classes(&self) -> &[EquivalentClass] {
+ pub fn classes(&self) -> &[EquivalentClass<T>] {
&self.classes
}
@@ -45,26 +46,15 @@ impl EquivalenceProperties {
self.schema.clone()
}
- pub fn extend<I: IntoIterator<Item = EquivalentClass>>(&mut self, iter: I)
{
+ pub fn extend<I: IntoIterator<Item = EquivalentClass<T>>>(&mut self, iter:
I) {
for ec in iter {
- for column in ec.iter() {
- assert_eq!(column.name(),
self.schema.fields()[column.index()].name());
- }
self.classes.push(ec)
}
}
/// Add new equal conditions into the EquivalenceProperties, the new equal
conditions are usually coming from the
/// equality predicates in Join or Filter
- pub fn add_equal_conditions(&mut self, new_conditions: (&Column, &Column))
{
- assert_eq!(
- new_conditions.0.name(),
- self.schema.fields()[new_conditions.0.index()].name()
- );
- assert_eq!(
- new_conditions.1.name(),
- self.schema.fields()[new_conditions.1.index()].name()
- );
+ pub fn add_equal_conditions(&mut self, new_conditions: (&T, &T)) {
let mut idx1: Option<usize> = None;
let mut idx2: Option<usize> = None;
for (idx, class) in self.classes.iter_mut().enumerate() {
@@ -102,7 +92,7 @@ impl EquivalenceProperties {
}
(None, None) => {
// adding new pairs
- self.classes.push(EquivalentClass::new(
+ self.classes.push(EquivalentClass::<T>::new(
new_conditions.0.clone(),
vec![new_conditions.1.clone()],
));
@@ -112,41 +102,61 @@ impl EquivalenceProperties {
}
}
-/// Equivalent Class is a set of Columns that are known to have the same value
in all tuples in a relation
-/// Equivalent Class is generated by equality predicates, typically equijoin
conditions and equality conditions in filters.
+/// `OrderingEquivalenceProperties` keeps track of columns that describe the
+/// global ordering of the schema. These columns are not necessarily same; e.g.
+/// ```text
+/// ┌-------┐
+/// | a | b |
+/// |---|---|
+/// | 1 | 9 |
+/// | 2 | 8 |
+/// | 3 | 7 |
+/// | 5 | 5 |
+/// └---┴---┘
+/// ```
+/// where both `a ASC` and `b DESC` can describe the table ordering. With
+/// `OrderingEquivalenceProperties`, we can keep track of these equivalences
+/// and treat `a ASC` and `b DESC` as the same ordering requirement.
+pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;
+
+/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are
known
+/// to have the same value in all tuples in a relation.
`EquivalentClass<Column>`
+/// is generated by equality predicates, typically equijoin conditions and
equality
+/// conditions in filters. `EquivalentClass<OrderedColumn>` is generated by the
+/// `ROW_NUMBER` window function.
#[derive(Debug, Clone)]
-pub struct EquivalentClass {
+pub struct EquivalentClass<T = Column> {
/// First element in the EquivalentClass
- head: Column,
+ head: T,
/// Other equal columns
- others: HashSet<Column>,
+ others: HashSet<T>,
}
-impl EquivalentClass {
- pub fn new(head: Column, others: Vec<Column>) -> Self {
+impl<T: Eq + Hash + Clone> EquivalentClass<T> {
+ pub fn new(head: T, others: Vec<T>) -> EquivalentClass<T> {
EquivalentClass {
head,
others: HashSet::from_iter(others),
}
}
- pub fn head(&self) -> &Column {
+ pub fn head(&self) -> &T {
&self.head
}
- pub fn others(&self) -> &HashSet<Column> {
+ pub fn others(&self) -> &HashSet<T> {
&self.others
}
- pub fn contains(&self, col: &Column) -> bool {
+ pub fn contains(&self, col: &T) -> bool {
self.head == *col || self.others.contains(col)
}
- pub fn insert(&mut self, col: Column) -> bool {
+ pub fn insert(&mut self, col: T) -> bool {
self.others.insert(col)
}
- pub fn remove(&mut self, col: &Column) -> bool {
+ pub fn remove(&mut self, col: &T) -> bool {
let removed = self.others.remove(col);
if !removed && *col == self.head {
let one_col = self.others.iter().next().cloned();
@@ -162,7 +172,7 @@ impl EquivalentClass {
}
}
- pub fn iter(&self) -> impl Iterator<Item = &'_ Column> {
+ pub fn iter(&self) -> impl Iterator<Item = &'_ T> {
std::iter::once(&self.head).chain(self.others.iter())
}
@@ -175,10 +185,58 @@ impl EquivalentClass {
}
}
-/// Project Equivalence Properties.
-/// 1) Add Alias, Alias can introduce additional equivalence properties,
-/// For example: Projection(a, a as a1, a as a2)
-/// 2) Truncate the EquivalentClasses that are not in the output schema
+/// This object represents a [`Column`] with a definite ordering.
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
+pub struct OrderedColumn {
+ pub col: Column,
+ pub options: SortOptions,
+}
+
+impl OrderedColumn {
+ pub fn new(col: Column, options: SortOptions) -> Self {
+ Self { col, options }
+ }
+}
+
+trait ColumnAccessor {
+ fn column(&self) -> &Column;
+}
+
+impl ColumnAccessor for Column {
+ fn column(&self) -> &Column {
+ self
+ }
+}
+
+impl ColumnAccessor for OrderedColumn {
+ fn column(&self) -> &Column {
+ &self.col
+ }
+}
+
+pub type OrderingEquivalentClass = EquivalentClass<OrderedColumn>;
+
+impl OrderingEquivalentClass {
+ /// Finds the matching column inside the `OrderingEquivalentClass`.
+ fn get_matching_column(&self, column: &Column) -> Option<OrderedColumn> {
+ if self.head.col.eq(column) {
+ Some(self.head.clone())
+ } else {
+ for item in &self.others {
+ if item.col.eq(column) {
+ return Some(item.clone());
+ }
+ }
+ None
+ }
+ }
+}
+
+/// This function applies the given projection to the given equivalence
+/// properties to compute the resulting (projected) equivalence properties;
e.g.
+/// 1) Adding an alias, which can introduce additional equivalence properties,
+/// as in Projection(a, a as a1, a as a2).
+/// 2) Truncate the [`EquivalentClass`]es that are not in the output schema.
pub fn project_equivalence_properties(
input_eq: EquivalenceProperties,
alias_map: &HashMap<Column, Vec<Column>>,
@@ -201,22 +259,63 @@ pub fn project_equivalence_properties(
}
}
- let schema = output_eq.schema();
- for class in ec_classes.iter_mut() {
- let mut columns_to_remove = vec![];
- for column in class.iter() {
- if column.index() >= schema.fields().len()
- || schema.fields()[column.index()].name() != column.name()
+ prune_columns_to_remove(output_eq, &mut ec_classes);
+ output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering
+/// equivalence properties to compute the resulting (projected) ordering
+/// equivalence properties; e.g.
+/// 1) Adding an alias, which can introduce additional ordering equivalence
+///    properties, as in Projection(a, a as a1, a as a2) extends global ordering
+/// of a to a1 and a2.
+/// 2) Truncate the [`OrderingEquivalentClass`]es that are not in the output schema.
+pub fn project_ordering_equivalence_properties(
+ input_eq: OrderingEquivalenceProperties,
+ columns_map: &HashMap<Column, Vec<Column>>,
+ output_eq: &mut OrderingEquivalenceProperties,
+) {
+ let mut ec_classes = input_eq.classes().to_vec();
+ for (column, columns) in columns_map {
+ for class in ec_classes.iter_mut() {
+ if let Some(OrderedColumn { options, .. }) =
class.get_matching_column(column)
{
- columns_to_remove.push(column.clone());
+ for col in columns {
+ class.insert(OrderedColumn {
+ col: col.clone(),
+ options,
+ });
+ }
+ break;
}
}
+ }
+
+ prune_columns_to_remove(output_eq, &mut ec_classes);
+ output_eq.extend(ec_classes);
+}
+
+fn prune_columns_to_remove<T: Eq + Hash + Clone + ColumnAccessor>(
+ eq_properties: &EquivalenceProperties<T>,
+ eq_classes: &mut Vec<EquivalentClass<T>>,
+) {
+ let schema = eq_properties.schema();
+ let fields = schema.fields();
+ for class in eq_classes.iter_mut() {
+ let columns_to_remove = class
+ .iter()
+ .filter(|elem| {
+ let column = elem.column();
+ let idx = column.index();
+ idx >= fields.len() || fields[idx].name() != column.name()
+ })
+ .cloned()
+ .collect::<Vec<_>>();
for column in columns_to_remove {
class.remove(&column);
}
}
- ec_classes.retain(|props| props.len() > 1);
- output_eq.extend(ec_classes);
+ eq_classes.retain(|props| props.len() > 1);
}
#[cfg(test)]
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index 5cbd40cc8c..b65a4e892f 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -48,8 +48,11 @@ pub mod window;
// reexport this to maintain compatibility with anything that used from_slice
previously
pub use aggregate::AggregateExpr;
pub use datafusion_common::from_slice;
-pub use equivalence::EquivalenceProperties;
-pub use equivalence::EquivalentClass;
+pub use equivalence::{
+ project_equivalence_properties, project_ordering_equivalence_properties,
+ EquivalenceProperties, EquivalentClass, OrderedColumn,
OrderingEquivalenceProperties,
+ OrderingEquivalentClass,
+};
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr,
PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
@@ -57,6 +60,5 @@ pub use sort_expr::{PhysicalSortExpr,
PhysicalSortRequirement};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties,
normalize_out_expr_with_columns_map,
- normalize_sort_expr_with_equivalence_properties,
sort_expr_list_eq_strict_order,
- split_conjunction,
+ sort_expr_list_eq_strict_order, split_conjunction,
};
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index 70297bce78..c842f8521d 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -15,18 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-use crate::equivalence::EquivalentClass;
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::{
- EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
+use crate::equivalence::{
+ EquivalenceProperties, EquivalentClass, OrderedColumn,
OrderingEquivalenceProperties,
+ OrderingEquivalentClass,
};
-use arrow::datatypes::SchemaRef;
-use datafusion_common::Result;
-use datafusion_expr::Operator;
+use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
+use crate::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
+use arrow::datatypes::SchemaRef;
+use arrow_schema::SortOptions;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
};
+use datafusion_common::Result;
+use datafusion_expr::Operator;
+
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
use std::borrow::Borrow;
@@ -164,7 +167,7 @@ pub fn normalize_expr_with_equivalence_properties(
.unwrap_or(expr)
}
-pub fn normalize_sort_expr_with_equivalence_properties(
+fn normalize_sort_expr_with_equivalence_properties(
sort_expr: PhysicalSortExpr,
eq_properties: &[EquivalentClass],
) -> PhysicalSortExpr {
@@ -181,7 +184,7 @@ pub fn normalize_sort_expr_with_equivalence_properties(
}
}
-pub fn normalize_sort_requirement_with_equivalence_properties(
+fn normalize_sort_requirement_with_equivalence_properties(
sort_requirement: PhysicalSortRequirement,
eq_properties: &[EquivalentClass],
) -> PhysicalSortRequirement {
@@ -196,27 +199,137 @@ pub fn
normalize_sort_requirement_with_equivalence_properties(
}
}
+fn normalize_expr_with_ordering_equivalence_properties(
+ expr: Arc<dyn PhysicalExpr>,
+ sort_options: Option<SortOptions>,
+ eq_properties: &[OrderingEquivalentClass],
+) -> Arc<dyn PhysicalExpr> {
+ expr.clone()
+ .transform(&|expr| {
+ let normalized_form =
+ expr.as_any().downcast_ref::<Column>().and_then(|column| {
+ if let Some(options) = sort_options {
+ for class in eq_properties {
+ let ordered_column = OrderedColumn {
+ col: column.clone(),
+ options,
+ };
+ if class.contains(&ordered_column) {
+ return Some(class.head().clone());
+ }
+ }
+ }
+ None
+ });
+ Ok(if let Some(normalized_form) = normalized_form {
+ Transformed::Yes(Arc::new(normalized_form.col) as _)
+ } else {
+ Transformed::No(expr)
+ })
+ })
+ .unwrap_or(expr)
+}
+
+fn normalize_sort_expr_with_ordering_equivalence_properties(
+ sort_expr: PhysicalSortExpr,
+ eq_properties: &[OrderingEquivalentClass],
+) -> PhysicalSortExpr {
+ let requirement =
normalize_sort_requirement_with_ordering_equivalence_properties(
+ PhysicalSortRequirement::from(sort_expr),
+ eq_properties,
+ );
+ requirement.into_sort_expr()
+}
+
+fn normalize_sort_requirement_with_ordering_equivalence_properties(
+ sort_requirement: PhysicalSortRequirement,
+ eq_properties: &[OrderingEquivalentClass],
+) -> PhysicalSortRequirement {
+ let normalized_expr = normalize_expr_with_ordering_equivalence_properties(
+ sort_requirement.expr().clone(),
+ sort_requirement.options(),
+ eq_properties,
+ );
+ if sort_requirement.expr().eq(&normalized_expr) {
+ sort_requirement
+ } else {
+ let mut options = sort_requirement.options();
+ if let Some(col) = normalized_expr.as_any().downcast_ref::<Column>() {
+ for eq_class in eq_properties.iter() {
+ let head = eq_class.head();
+ if head.col.eq(col) {
+ // If there is a requirement, update it with the
requirement of its normalized version.
+ if let Some(options) = &mut options {
+ *options = head.options;
+ }
+ break;
+ }
+ }
+ }
+ PhysicalSortRequirement::new(normalized_expr, options)
+ }
+}
+
+pub fn normalize_sort_expr(
+ sort_expr: PhysicalSortExpr,
+ eq_properties: &[EquivalentClass],
+ ordering_eq_properties: &[OrderingEquivalentClass],
+) -> PhysicalSortExpr {
+ let normalized =
+ normalize_sort_expr_with_equivalence_properties(sort_expr,
eq_properties);
+ normalize_sort_expr_with_ordering_equivalence_properties(
+ normalized,
+ ordering_eq_properties,
+ )
+}
+
+pub fn normalize_sort_requirement(
+ sort_requirement: PhysicalSortRequirement,
+ eq_classes: &[EquivalentClass],
+ ordering_eq_classes: &[OrderingEquivalentClass],
+) -> PhysicalSortRequirement {
+ let normalized = normalize_sort_requirement_with_equivalence_properties(
+ sort_requirement,
+ eq_classes,
+ );
+ normalize_sort_requirement_with_ordering_equivalence_properties(
+ normalized,
+ ordering_eq_classes,
+ )
+}
+
 /// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s.
-pub fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
+pub fn ordering_satisfy<
+ F: FnOnce() -> EquivalenceProperties,
+ F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
provided: Option<&[PhysicalSortExpr]>,
required: Option<&[PhysicalSortExpr]>,
equal_properties: F,
+ ordering_equal_properties: F2,
) -> bool {
match (provided, required) {
(_, None) => true,
(None, Some(_)) => false,
- (Some(provided), Some(required)) => {
- ordering_satisfy_concrete(provided, required, equal_properties)
- }
+ (Some(provided), Some(required)) => ordering_satisfy_concrete(
+ provided,
+ required,
+ equal_properties,
+ ordering_equal_properties,
+ ),
}
}
/// Checks whether the required [`PhysicalSortExpr`]s are satisfied by the
/// provided [`PhysicalSortExpr`]s.
-fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
+fn ordering_satisfy_concrete<
+ F: FnOnce() -> EquivalenceProperties,
+ F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
provided: &[PhysicalSortExpr],
required: &[PhysicalSortExpr],
equal_properties: F,
+ ordering_equal_properties: F2,
) -> bool {
if required.len() > provided.len() {
false
@@ -226,43 +339,56 @@ fn ordering_satisfy_concrete<F: FnOnce() ->
EquivalenceProperties>(
.all(|(req, given)| req.eq(given))
{
true
- } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ } else {
+ let oeq_properties = ordering_equal_properties();
+ let ordering_eq_classes = oeq_properties.classes();
+ let eq_properties = equal_properties();
+ let eq_classes = eq_properties.classes();
required
.iter()
- .map(|e| {
- normalize_sort_expr_with_equivalence_properties(e.clone(),
eq_classes)
- })
- .zip(provided.iter().map(|e| {
- normalize_sort_expr_with_equivalence_properties(e.clone(),
eq_classes)
- }))
- .all(|(req, given)| req.eq(&given))
- } else {
- false
+ .map(|e| normalize_sort_expr(e.clone(), eq_classes,
ordering_eq_classes))
+ .zip(
+ provided.iter().map(|e| {
+ normalize_sort_expr(e.clone(), eq_classes,
ordering_eq_classes)
+ }),
+ )
+ .all(|(req, given)| req == given)
}
}
/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
/// provided [`PhysicalSortExpr`]s.
-pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+pub fn ordering_satisfy_requirement<
+ F: FnOnce() -> EquivalenceProperties,
+ F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
provided: Option<&[PhysicalSortExpr]>,
required: Option<&[PhysicalSortRequirement]>,
equal_properties: F,
+ ordering_equal_properties: F2,
) -> bool {
match (provided, required) {
(_, None) => true,
(None, Some(_)) => false,
- (Some(provided), Some(required)) => {
- ordering_satisfy_requirement_concrete(provided, required,
equal_properties)
- }
+ (Some(provided), Some(required)) =>
ordering_satisfy_requirement_concrete(
+ provided,
+ required,
+ equal_properties,
+ ordering_equal_properties,
+ ),
}
}
/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
/// provided [`PhysicalSortExpr`]s.
-pub fn ordering_satisfy_requirement_concrete<F: FnOnce() ->
EquivalenceProperties>(
+pub fn ordering_satisfy_requirement_concrete<
+ F: FnOnce() -> EquivalenceProperties,
+ F2: FnOnce() -> OrderingEquivalenceProperties,
+>(
provided: &[PhysicalSortExpr],
required: &[PhysicalSortRequirement],
equal_properties: F,
+ ordering_equal_properties: F2,
) -> bool {
if required.len() > provided.len() {
false
@@ -272,46 +398,58 @@ pub fn ordering_satisfy_requirement_concrete<F: FnOnce()
-> EquivalencePropertie
.all(|(req, given)| given.satisfy(req))
{
true
- } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ } else {
+ let oeq_properties = ordering_equal_properties();
+ let ordering_eq_classes = oeq_properties.classes();
+ let eq_properties = equal_properties();
+ let eq_classes = eq_properties.classes();
required
.iter()
.map(|e| {
- normalize_sort_requirement_with_equivalence_properties(
- e.clone(),
- eq_classes,
- )
+ normalize_sort_requirement(e.clone(), eq_classes,
ordering_eq_classes)
})
- .zip(provided.iter().map(|e| {
- normalize_sort_expr_with_equivalence_properties(e.clone(),
eq_classes)
- }))
+ .zip(
+ provided.iter().map(|e| {
+ normalize_sort_expr(e.clone(), eq_classes,
ordering_eq_classes)
+ }),
+ )
.all(|(req, given)| given.satisfy(&req))
- } else {
- false
}
}
/// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
/// specific than the provided [`PhysicalSortRequirement`]s.
-pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+pub fn requirements_compatible<
+ F: FnOnce() -> OrderingEquivalenceProperties,
+ F2: FnOnce() -> EquivalenceProperties,
+>(
provided: Option<&[PhysicalSortRequirement]>,
required: Option<&[PhysicalSortRequirement]>,
- equal_properties: F,
+ ordering_equal_properties: F,
+ equal_properties: F2,
) -> bool {
match (provided, required) {
(_, None) => true,
(None, Some(_)) => false,
- (Some(provided), Some(required)) => {
- requirements_compatible_concrete(provided, required,
equal_properties)
- }
+ (Some(provided), Some(required)) => requirements_compatible_concrete(
+ provided,
+ required,
+ ordering_equal_properties,
+ equal_properties,
+ ),
}
}
/// Checks whether the given [`PhysicalSortRequirement`]s are equal or more
/// specific than the provided [`PhysicalSortRequirement`]s.
-fn requirements_compatible_concrete<F: FnOnce() -> EquivalenceProperties>(
+fn requirements_compatible_concrete<
+ F: FnOnce() -> OrderingEquivalenceProperties,
+ F2: FnOnce() -> EquivalenceProperties,
+>(
provided: &[PhysicalSortRequirement],
required: &[PhysicalSortRequirement],
- equal_properties: F,
+ ordering_equal_properties: F,
+ equal_properties: F2,
) -> bool {
if required.len() > provided.len() {
false
@@ -321,24 +459,20 @@ fn requirements_compatible_concrete<F: FnOnce() ->
EquivalenceProperties>(
.all(|(req, given)| given.compatible(req))
{
true
- } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ } else {
+ let oeq_properties = ordering_equal_properties();
+ let ordering_eq_classes = oeq_properties.classes();
+ let eq_properties = equal_properties();
+ let eq_classes = eq_properties.classes();
required
.iter()
.map(|e| {
- normalize_sort_requirement_with_equivalence_properties(
- e.clone(),
- eq_classes,
- )
+ normalize_sort_requirement(e.clone(), eq_classes,
ordering_eq_classes)
})
.zip(provided.iter().map(|e| {
- normalize_sort_requirement_with_equivalence_properties(
- e.clone(),
- eq_classes,
- )
+ normalize_sort_requirement(e.clone(), eq_classes,
ordering_eq_classes)
}))
.all(|(req, given)| given.compatible(&req))
- } else {
- false
}
}
@@ -643,6 +777,55 @@ mod tests {
}
}
+ // Generate a schema which consists of 5 columns (a, b, c, d, e)
+ fn create_test_schema() -> Result<SchemaRef> {
+ let a = Field::new("a", DataType::Int32, true);
+ let b = Field::new("b", DataType::Int32, true);
+ let c = Field::new("c", DataType::Int32, true);
+ let d = Field::new("d", DataType::Int32, true);
+ let e = Field::new("e", DataType::Int32, true);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+ Ok(schema)
+ }
+
+ fn create_test_params() -> Result<(
+ SchemaRef,
+ EquivalenceProperties,
+ OrderingEquivalenceProperties,
+ )> {
+ // Assume schema satisfies ordering a ASC NULLS LAST
+ // and d ASC NULLS LAST and e DESC NULLS FIRST
+ // Assume that column a and c are aliases.
+ let col_a = &Column::new("a", 0);
+ let _col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ let test_schema = create_test_schema()?;
+ let mut eq_properties =
EquivalenceProperties::new(test_schema.clone());
+ eq_properties.add_equal_conditions((col_a, col_c));
+ let mut ordering_eq_properties =
+ OrderingEquivalenceProperties::new(test_schema.clone());
+ ordering_eq_properties.add_equal_conditions((
+ &OrderedColumn::new(col_a.clone(), option1),
+ &OrderedColumn::new(col_d.clone(), option1),
+ ));
+ ordering_eq_properties.add_equal_conditions((
+ &OrderedColumn::new(col_a.clone(), option1),
+ &OrderedColumn::new(col_e.clone(), option2),
+ ));
+ Ok((test_schema, eq_properties, ordering_eq_properties))
+ }
+
#[test]
fn test_build_dag() -> Result<()> {
let schema = Schema::new(vec![
@@ -911,12 +1094,86 @@ mod tests {
];
let finer = Some(&finer[..]);
let empty_schema = &Arc::new(Schema::empty());
- assert!(ordering_satisfy(finer, crude, || {
- EquivalenceProperties::new(empty_schema.clone())
- }));
- assert!(!ordering_satisfy(crude, finer, || {
- EquivalenceProperties::new(empty_schema.clone())
- }));
+ assert!(ordering_satisfy(
+ finer,
+ crude,
+ || { EquivalenceProperties::new(empty_schema.clone()) },
+ || { OrderingEquivalenceProperties::new(empty_schema.clone()) },
+ ));
+ assert!(!ordering_satisfy(
+ crude,
+ finer,
+ || { EquivalenceProperties::new(empty_schema.clone()) },
+ || { OrderingEquivalenceProperties::new(empty_schema.clone()) },
+ ));
+ Ok(())
+ }
+
+ #[test]
+ fn test_ordering_satisfy_with_equivalence() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ // The schema is ordered by a ASC NULLS LAST, b ASC NULLS LAST
+ let provided = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(col_a.clone()),
+ options: option1,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(col_b.clone()),
+ options: option1,
+ },
+ ];
+ let provided = Some(&provided[..]);
+ let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+ // First element in the tuple stores vector of requirement, second
element is the expected return value for ordering_satisfy function
+ let requirements = vec![
+ // `a ASC NULLS LAST`, expects `ordering_satisfy` to be `true`,
since existing ordering `a ASC NULLS LAST, b ASC NULLS LAST` satisfies it
+ (vec![(col_a, option1)], true),
+ (vec![(col_a, option2)], false),
+ // Test whether equivalence works as expected
+ (vec![(col_c, option1)], true),
+ (vec![(col_c, option2)], false),
+ // Test whether ordering equivalence works as expected
+ (vec![(col_d, option1)], true),
+ (vec![(col_d, option2)], false),
+ (vec![(col_e, option2)], true),
+ (vec![(col_e, option1)], false),
+ ];
+ for (cols, expected) in requirements {
+ let err_msg = format!("Error in test case:{:?}", cols);
+ let required = cols
+ .into_iter()
+ .map(|(col, options)| PhysicalSortExpr {
+ expr: Arc::new(col.clone()),
+ options,
+ })
+ .collect::<Vec<_>>();
+
+ let required = Some(&required[..]);
+ assert_eq!(
+ ordering_satisfy(
+ provided,
+ required,
+ || eq_properties.clone(),
+ || ordering_eq_properties.clone(),
+ ),
+ expected,
+ "{}",
+ err_msg
+ );
+ }
Ok(())
}
@@ -956,4 +1213,230 @@ mod tests {
assert_eq!(actual.as_ref(), expected.as_any());
}
+
+ #[test]
+ fn test_normalize_expr_with_equivalence() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let _col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ // Assume schema satisfies ordering a ASC NULLS LAST
+ // and d ASC NULLS LAST and e DESC NULLS FIRST
+ // Assume that column a and c are aliases.
+ let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+
+ let col_a_expr = Arc::new(col_a.clone()) as Arc<dyn PhysicalExpr>;
+ let col_c_expr = Arc::new(col_c.clone()) as Arc<dyn PhysicalExpr>;
+ let col_d_expr = Arc::new(col_d.clone()) as Arc<dyn PhysicalExpr>;
+ let col_e_expr = Arc::new(col_e.clone()) as Arc<dyn PhysicalExpr>;
+ // Test cases for equivalence normalization,
+ // First entry in the tuple is argument, second entry is expected
result after normalization.
+ let expressions = vec![(&col_a_expr, &col_a_expr), (&col_c_expr,
&col_a_expr)];
+ for (expr, expected_eq) in expressions {
+ assert!(
+ expected_eq.eq(&normalize_expr_with_equivalence_properties(
+ expr.clone(),
+ eq_properties.classes()
+ )),
+ "error in test: expr: {:?}",
+ expr
+ );
+ }
+
+ // Test cases for ordering equivalence normalization
+ // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
+ let expressions = vec![
+ (&col_d_expr, option1, &col_a_expr),
+ (&col_e_expr, option2, &col_a_expr),
+ // Cannot normalize, hence should return itself.
+ (&col_e_expr, option1, &col_e_expr),
+ ];
+ for (expr, sort_options, expected_ordering_eq) in expressions {
+ assert!(
+ expected_ordering_eq.eq(
+ &normalize_expr_with_ordering_equivalence_properties(
+ expr.clone(),
+ Some(sort_options),
+ ordering_eq_properties.classes()
+ )
+ ),
+ "error in test: expr: {:?}, sort_options: {:?}",
+ expr,
+ sort_options
+ );
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn test_normalize_sort_expr_with_equivalence() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let _col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ // Assume schema satisfies ordering a ASC NULLS LAST
+ // and d ASC NULLS LAST and e DESC NULLS FIRST
+ // Assume that column a and c are aliases.
+ let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+
+ // Test cases for equivalence normalization
+ // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
+ let expressions = vec![
+ (&col_a, option1, &col_a, option1),
+ (&col_c, option1, &col_a, option1),
+ // Cannot normalize column d, since it is not in equivalence
properties.
+ (&col_d, option1, &col_d, option1),
+ ];
+ for (expr, sort_options, expected_col, expected_options) in
+ expressions.into_iter()
+ {
+ let expected = PhysicalSortExpr {
+ expr: Arc::new((*expected_col).clone()) as _,
+ options: expected_options,
+ };
+ let arg = PhysicalSortExpr {
+ expr: Arc::new((*expr).clone()) as _,
+ options: sort_options,
+ };
+ assert!(
+ expected.eq(&normalize_sort_expr_with_equivalence_properties(
+ arg.clone(),
+ eq_properties.classes()
+ )),
+ "error in test: expr: {:?}, sort_options: {:?}",
+ expr,
+ sort_options
+ );
+ }
+
+ // Test cases for ordering equivalence normalization
+ // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
+ let expressions = vec![
+ (&col_d, option1, &col_a, option1),
+ (&col_e, option2, &col_a, option1),
+ ];
+ for (expr, sort_options, expected_col, expected_options) in
+ expressions.into_iter()
+ {
+ let expected = PhysicalSortExpr {
+ expr: Arc::new((*expected_col).clone()) as _,
+ options: expected_options,
+ };
+ let arg = PhysicalSortExpr {
+ expr: Arc::new((*expr).clone()) as _,
+ options: sort_options,
+ };
+ assert!(
+
expected.eq(&normalize_sort_expr_with_ordering_equivalence_properties(
+ arg.clone(),
+ ordering_eq_properties.classes()
+ )),
+ "error in test: expr: {:?}, sort_options: {:?}",
+ expr,
+ sort_options
+ );
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn test_normalize_sort_requirement_with_equivalence() -> Result<()> {
+ let col_a = &Column::new("a", 0);
+ let _col_b = &Column::new("b", 1);
+ let col_c = &Column::new("c", 2);
+ let col_d = &Column::new("d", 3);
+ let col_e = &Column::new("e", 4);
+ let option1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ // Assume schema satisfies ordering a ASC NULLS LAST
+ // and d ASC NULLS LAST and e DESC NULLS FIRST
+ // Assume that column a and c are aliases.
+ let (_test_schema, eq_properties, ordering_eq_properties) =
create_test_params()?;
+
+ // Test cases for equivalence normalization
+ // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
+ let expressions = vec![
+ (&col_a, Some(option1), &col_a, Some(option1)),
+ (&col_c, Some(option1), &col_a, Some(option1)),
+ (&col_c, None, &col_a, None),
+ // Cannot normalize column d, since it is not in equivalence
properties.
+ (&col_d, Some(option1), &col_d, Some(option1)),
+ ];
+ for (expr, sort_options, expected_col, expected_options) in
+ expressions.into_iter()
+ {
+ let expected = PhysicalSortRequirement::new(
+ Arc::new((*expected_col).clone()) as _,
+ expected_options,
+ );
+ let arg = PhysicalSortRequirement::new(
+ Arc::new((*expr).clone()) as _,
+ sort_options,
+ );
+ assert!(
+
expected.eq(&normalize_sort_requirement_with_equivalence_properties(
+ arg.clone(),
+ eq_properties.classes()
+ )),
+ "error in test: expr: {:?}, sort_options: {:?}",
+ expr,
+ sort_options
+ );
+ }
+
+ // Test cases for ordering equivalence normalization
+ // First entry in the tuple is PhysicalExpr, second entry is its
ordering, third entry is result after normalization.
+ let expressions = vec![
+ (&col_d, Some(option1), &col_a, Some(option1)),
+ (&col_e, Some(option2), &col_a, Some(option1)),
+ ];
+ for (expr, sort_options, expected_col, expected_options) in
+ expressions.into_iter()
+ {
+ let expected = PhysicalSortRequirement::new(
+ Arc::new((*expected_col).clone()) as _,
+ expected_options,
+ );
+ let arg = PhysicalSortRequirement::new(
+ Arc::new((*expr).clone()) as _,
+ sort_options,
+ );
+ assert!(
+ expected.eq(
+
&normalize_sort_requirement_with_ordering_equivalence_properties(
+ arg.clone(),
+ ordering_eq_properties.classes()
+ )
+ ),
+ "error in test: expr: {:?}, sort_options: {:?}",
+ expr,
+ sort_options
+ );
+ }
+ Ok(())
+ }
}