This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9397553731 MINOR: Bug fix, Use correct ordering equivalence when
window expr contains partition by (#7011)
9397553731 is described below
commit 9397553731505710a2d56b2e3e39c6c605555546
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Jul 19 13:39:05 2023 +0300
MINOR: Bug fix, Use correct ordering equivalence when window expr contains
partition by (#7011)
* initial commit
* Add partition by check for introducing ordering for windows.
* Add prefix handling
* Change API to return result ordering
* Add ordering for rank, Simplifications and new tests
* Add ordering for NTILE window function
* Update datafusion/physical-expr/src/window/row_number.rs
Co-authored-by: Metehan Yıldırım
<[email protected]>
* Update datafusion/physical-expr/src/window/row_number.rs
Co-authored-by: Metehan Yıldırım
<[email protected]>
* Simplifications
* Add wrongly deleted test during merge
* Remove duplicated test
* Minor changes
* Update comments
* More idiomatic Rust, improved comments
---------
Co-authored-by: Metehan Yıldırım
<[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/core/src/physical_plan/windows/mod.rs | 3 +-
.../core/tests/sqllogictests/test_files/window.slt | 89 ++++++++++++++++++++++
datafusion/physical-expr/src/equivalence.rs | 5 ++
datafusion/physical-expr/src/window/built_in.rs | 50 +++++++++++-
.../src/window/built_in_window_function_expr.rs | 18 +++--
datafusion/physical-expr/src/window/ntile.rs | 19 ++++-
datafusion/physical-expr/src/window/rank.rs | 18 ++++-
datafusion/physical-expr/src/window/row_number.rs | 23 +++---
8 files changed, 198 insertions(+), 27 deletions(-)
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index ff7936e5ce..9b44ac615c 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -350,8 +350,7 @@ pub(crate) fn window_ordering_equivalence(
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
{
builtin_window_expr
- .get_built_in_func_expr()
- .add_equal_orderings(&mut builder);
+ .add_equal_orderings(&mut builder, ||
input.equivalence_properties());
}
}
builder.build()
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 5d97b311d1..a2b2038ace 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -3105,6 +3105,95 @@ CoalesceBatchesExec: target_batch_size=4096
--------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC
NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+# this is a negative test asserting that window functions (other than
ROW_NUMBER, RANK and NTILE)
+# are not added to the ordering equivalence;
+# the physical plan should therefore contain a SortExec.
+query TT
+EXPLAIN SELECT c9, sum1 FROM (SELECT c9,
+ SUM(c9) OVER(ORDER BY c9 DESC) as sum1
+ FROM aggregate_test_100
+ ORDER BY c9 DESC)
+ ORDER BY sum1, c9 DESC
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: sum1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW AS sum1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--SortExec: fetch=5, expr=[sum1@1 ASC NULLS LAST,c9@0 DESC]
+----ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@1 as sum1]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(UInt64(NULL)), end_bound: CurrentRow } [...]
+--------SortExec: expr=[c9@0 DESC]
+----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9],
has_header=true
+
+# The query below should work even though its input is unbounded,
+# because the orderings of the ROW_NUMBER and RANK results are added to the
ordering equivalence
+# and hence the final plan doesn't contain a SortExec.
+query IIII
+SELECT a, d, rn1, rank1 FROM (SELECT a, d,
+ ROW_NUMBER() OVER(ORDER BY a ASC) as rn1,
+ RANK() OVER(ORDER BY a ASC) as rank1
+ FROM annotated_data_infinite2
+ ORDER BY a ASC)
+ ORDER BY rn1, rank1, a ASC
+ LIMIT 5
+----
+0 0 1 1
+0 2 2 1
+0 0 3 1
+0 0 4 1
+0 1 5 1
+
+# this is a negative test asserting that ROW_NUMBER is not added to the
+# ordering equivalence when its PARTITION BY doesn't match the existing ordering.
+# The physical plan should therefore contain a SortExec. Since the source is
+# unbounded, the pipeline checker should raise an error for that SortExec.
+statement error DataFusion error: PipelineChecker
+SELECT a, d, rn1 FROM (SELECT a, d,
+ ROW_NUMBER() OVER(PARTITION BY d ORDER BY a ASC) as rn1
+ FROM annotated_data_infinite2
+ ORDER BY a ASC)
+ ORDER BY rn1, a ASC
+ LIMIT 5
+
+# when the PARTITION BY expressions match the existing ordering (as a set),
+# ROW_NUMBER can be appended to the existing ordering, so the
+# query below should work without breaking the pipeline.
+query III
+SELECT a, d, rn1 FROM (SELECT a, b, c, d,
+ ROW_NUMBER() OVER(PARTITION BY b, c, a) as rn1
+ FROM annotated_data_infinite2
+ ORDER BY a ASC)
+ ORDER BY a, b, c, rn1
+ LIMIT 5
+----
+0 0 1
+0 2 1
+0 0 1
+0 0 1
+0 1 1
+
+# projection should propagate ordering equivalence successfully
+# when expression contains alias
+query III
+SELECT a_new, d, rn1 FROM (SELECT d, a as a_new,
+ ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
+ FROM annotated_data_infinite2
+ ORDER BY a_new ASC)
+ ORDER BY a_new ASC, rn1
+ LIMIT 5
+----
+0 0 1
+0 2 2
+0 0 3
+0 0 4
+0 1 5
+
statement ok
drop table annotated_data_finite2
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index f458f4c709..d08b2e2c4a 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -339,6 +339,11 @@ impl OrderingEquivalenceBuilder {
&self.schema
}
+ /// Return a reference to the existing ordering
+ pub fn existing_ordering(&self) -> &LexOrdering {
+ &self.existing_ordering
+ }
+
pub fn build(self) -> OrderingEquivalenceProperties {
self.ordering_eq_properties
}
diff --git a/datafusion/physical-expr/src/window/built_in.rs
b/datafusion/physical-expr/src/window/built_in.rs
index e81ffe59b8..2958b21cad 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -23,9 +23,12 @@ use std::sync::Arc;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
+use crate::equivalence::OrderingEquivalenceBuilder;
+use crate::expressions::PhysicalSortExpr;
+use crate::utils::{convert_to_expr, get_indices_of_matching_exprs};
use crate::window::window_expr::{get_orderby_values, WindowFn};
use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
-use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
+use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
use arrow::array::{new_empty_array, ArrayRef};
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
@@ -65,6 +68,51 @@ impl BuiltInWindowExpr {
pub fn get_built_in_func_expr(&self) -> &Arc<dyn
BuiltInWindowFunctionExpr> {
&self.expr
}
+
+ /// Adds any equivalent orderings generated by the `self.expr`
+ /// to `builder`.
+ ///
+ /// If `self.expr` doesn't have an ordering, ordering equivalence
properties
+ /// are not updated. Otherwise, ordering equivalence properties are updated
+ /// by the ordering of `self.expr`.
+ pub fn add_equal_orderings<F: FnOnce() -> EquivalenceProperties>(
+ &self,
+ builder: &mut OrderingEquivalenceBuilder,
+ equal_properties: F,
+ ) {
+ let schema = builder.schema();
+ if let Some(fn_res_ordering) = self.expr.get_result_ordering(schema) {
+ if self.partition_by.is_empty() {
+ // In the absence of a PARTITION BY, ordering of `self.expr`
is global:
+ builder.add_equal_conditions(vec![fn_res_ordering]);
+ } else {
+ // If we have a PARTITION BY, built-in functions can not
introduce
+ // a global ordering unless the existing ordering is compatible
+ // with PARTITION BY expressions. To elaborate, when PARTITION
BY
+ // expressions and existing ordering expressions are equal
(w.r.t.
+ // set equality), we can prefix the ordering of `self.expr`
with
+ // the existing ordering.
+ let existing_ordering = builder.existing_ordering();
+ let existing_ordering_exprs =
convert_to_expr(existing_ordering);
+ // Get indices of the PARTITION BY expressions among input
ordering expressions:
+ let pb_indices = get_indices_of_matching_exprs(
+ &self.partition_by,
+ &existing_ordering_exprs,
+ equal_properties,
+ );
+ // Existing ordering should match exactly with PARTITION BY
expressions.
+ // There should be no missing/extra entries in the existing
ordering.
+ // Otherwise, prefixing wouldn't work.
+ if pb_indices.len() == self.partition_by.len()
+ && pb_indices.len() == existing_ordering.len()
+ {
+ let mut new_ordering = existing_ordering.to_vec();
+ new_ordering.push(fn_res_ordering);
+ builder.add_equal_conditions(new_ordering);
+ }
+ }
+ }
+ }
}
impl WindowExpr for BuiltInWindowExpr {
diff --git
a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 73e0658267..1a060817d2 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use crate::equivalence::OrderingEquivalenceBuilder;
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_expr::PartitionEvaluator;
+
use std::any::Any;
use std::sync::Arc;
@@ -80,9 +82,11 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync +
std::fmt::Debug {
None
}
- /// Adds any equivalent orderings generated by this expression
- /// to `builder`.
- ///
- /// The default implementation does nothing
- fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
+ /// Returns the ordering introduced by the window function, if applicable.
+ /// Most window functions don't introduce an ordering, hence the default
+ /// value is `None`. Note that this information is used to update ordering
+ /// equivalences.
+ fn get_result_ordering(&self, _schema: &SchemaRef) ->
Option<PhysicalSortExpr> {
+ None
+ }
}
diff --git a/datafusion/physical-expr/src/window/ntile.rs
b/datafusion/physical-expr/src/window/ntile.rs
index 6019ffbeef..008da422da 100644
--- a/datafusion/physical-expr/src/window/ntile.rs
+++ b/datafusion/physical-expr/src/window/ntile.rs
@@ -18,13 +18,16 @@
//! Defines physical expression for `ntile` that can evaluated
//! at runtime during query execution
+use crate::expressions::Column;
use crate::window::BuiltInWindowFunctionExpr;
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::Field;
-use arrow_schema::DataType;
+use arrow_schema::{DataType, SchemaRef, SortOptions};
use datafusion_common::Result;
use datafusion_expr::PartitionEvaluator;
+
use std::any::Any;
use std::sync::Arc;
@@ -62,6 +65,18 @@ impl BuiltInWindowFunctionExpr for Ntile {
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(NtileEvaluator { n: self.n }))
}
+
+ fn get_result_ordering(&self, schema: &SchemaRef) ->
Option<PhysicalSortExpr> {
+ // The built-in NTILE window function introduces a new ordering:
+ schema.column_with_name(self.name()).map(|(idx, field)| {
+ let expr = Arc::new(Column::new(field.name(), idx));
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ }; // ASC, NULLS LAST
+ PhysicalSortExpr { expr, options }
+ })
+ }
}
#[derive(Debug)]
diff --git a/datafusion/physical-expr/src/window/rank.rs
b/datafusion/physical-expr/src/window/rank.rs
index 48ff872078..9eb442f8f9 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -18,15 +18,19 @@
//! Defines physical expression for `rank`, `dense_rank`, and `percent_rank`
that can evaluated
//! at runtime during query execution
+use crate::expressions::Column;
use crate::window::window_expr::RankState;
use crate::window::BuiltInWindowFunctionExpr;
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
use arrow::array::ArrayRef;
use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
+use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::PartitionEvaluator;
+
use std::any::Any;
use std::iter;
use std::ops::Range;
@@ -106,6 +110,18 @@ impl BuiltInWindowFunctionExpr for Rank {
rank_type: self.rank_type,
}))
}
+
+ fn get_result_ordering(&self, schema: &SchemaRef) ->
Option<PhysicalSortExpr> {
+ // The built-in RANK window function (in all modes) introduces a new
ordering:
+ schema.column_with_name(self.name()).map(|(idx, field)| {
+ let expr = Arc::new(Column::new(field.name(), idx));
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ }; // ASC, NULLS LAST
+ PhysicalSortExpr { expr, options }
+ })
+ }
}
#[derive(Debug)]
diff --git a/datafusion/physical-expr/src/window/row_number.rs
b/datafusion/physical-expr/src/window/row_number.rs
index b115e9f149..f5e2f65a65 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -17,16 +17,17 @@
//! Defines physical expression for `row_number` that can evaluated at runtime
during query execution
-use crate::equivalence::OrderingEquivalenceBuilder;
use crate::expressions::Column;
use crate::window::window_expr::NumRowsState;
use crate::window::BuiltInWindowFunctionExpr;
use crate::{PhysicalExpr, PhysicalSortExpr};
+
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
+use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::PartitionEvaluator;
+
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
@@ -64,22 +65,16 @@ impl BuiltInWindowFunctionExpr for RowNumber {
&self.name
}
- fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) {
- // The built-in RowNumber window function introduces a new
- // ordering:
- let schema = builder.schema();
- if let Some((idx, field)) = schema.column_with_name(self.name()) {
- let column = Column::new(field.name(), idx);
+ fn get_result_ordering(&self, schema: &SchemaRef) ->
Option<PhysicalSortExpr> {
+ // The built-in ROW_NUMBER window function introduces a new ordering:
+ schema.column_with_name(self.name()).map(|(idx, field)| {
+ let expr = Arc::new(Column::new(field.name(), idx));
let options = SortOptions {
descending: false,
nulls_first: false,
}; // ASC, NULLS LAST
- let rhs = PhysicalSortExpr {
- expr: Arc::new(column) as _,
- options,
- };
- builder.add_equal_conditions(vec![rhs]);
- }
+ PhysicalSortExpr { expr, options }
+ })
}
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {