This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9397553731 MINOR: Bug fix, Use correct ordering equivalence when window expr contains partition by (#7011)
9397553731 is described below

commit 9397553731505710a2d56b2e3e39c6c605555546
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Jul 19 13:39:05 2023 +0300

    MINOR: Bug fix, Use correct ordering equivalence when window expr contains partition by (#7011)
    
    * initial commit
    
    * Add partition by check for introducing ordering for windows.
    
    * Add prefix handling
    
    * Change API to return result ordering
    
    * Add ordering for rank, Simplifications and new tests
    
    * Add ordering for NTILE window function
    
    * Update datafusion/physical-expr/src/window/row_number.rs
    
    Co-authored-by: Metehan Yıldırım <[email protected]>
    
    * Update datafusion/physical-expr/src/window/row_number.rs
    
    Co-authored-by: Metehan Yıldırım <[email protected]>
    
    * Simplifications
    
    * Add wrongly deleted test during merge
    
    * Remove duplicated test
    
    * Minor changes
    
    * Update comments
    
    * More idiomatic Rust, improved comments
    
    ---------
    
    Co-authored-by: Metehan Yıldırım <[email protected]>
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/core/src/physical_plan/windows/mod.rs   |  3 +-
 .../core/tests/sqllogictests/test_files/window.slt | 89 ++++++++++++++++++++++
 datafusion/physical-expr/src/equivalence.rs        |  5 ++
 datafusion/physical-expr/src/window/built_in.rs    | 50 +++++++++++-
 .../src/window/built_in_window_function_expr.rs    | 18 +++--
 datafusion/physical-expr/src/window/ntile.rs       | 19 ++++-
 datafusion/physical-expr/src/window/rank.rs        | 18 ++++-
 datafusion/physical-expr/src/window/row_number.rs  | 23 +++---
 8 files changed, 198 insertions(+), 27 deletions(-)

diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index ff7936e5ce..9b44ac615c 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -350,8 +350,7 @@ pub(crate) fn window_ordering_equivalence(
             expr.as_any().downcast_ref::<BuiltInWindowExpr>()
         {
             builtin_window_expr
-                .get_built_in_func_expr()
-                .add_equal_orderings(&mut builder);
+                .add_equal_orderings(&mut builder, || 
input.equivalence_properties());
         }
     }
     builder.build()
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 5d97b311d1..a2b2038ace 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -3105,6 +3105,95 @@ CoalesceBatchesExec: target_batch_size=4096
 --------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY 
[annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
 ----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC 
NULLS LAST, c@3 ASC NULLS LAST], has_header=true
 
+# this is a negative test for asserting that window functions (other than 
ROW_NUMBER)
+# are not added to ordering equivalence
+# physical plan should contain SortExec.
+query TT
+EXPLAIN SELECT c9, sum1 FROM (SELECT c9,
+                       SUM(c9) OVER(ORDER BY c9 DESC) as sum1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY sum1, c9 DESC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: sum1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS sum1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--SortExec: fetch=5, expr=[sum1@1 ASC NULLS LAST,c9@0 DESC]
+----ProjectionExec: expr=[c9@0 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as sum1]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: "SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow } [...]
+--------SortExec: expr=[c9@0 DESC]
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
has_header=true
+
+# Query below should work when its input is unbounded
+# because ordering of ROW_NUMBER, RANK result is added to the ordering 
equivalence
+# and final plan doesn't contain SortExec.
+query IIII
+SELECT a, d, rn1, rank1 FROM (SELECT a, d,
+                       ROW_NUMBER() OVER(ORDER BY a ASC) as rn1,
+                       RANK() OVER(ORDER BY a ASC) as rank1
+                       FROM annotated_data_infinite2
+                       ORDER BY a ASC)
+       ORDER BY rn1, rank1, a ASC
+       LIMIT 5
+----
+0 0 1 1
+0 2 2 1
+0 0 3 1
+0 0 4 1
+0 1 5 1
+
+# this is a negative test for asserting that ROW_NUMBER is not
+# added to the ordering equivalence when it contains partition by.
+# physical plan should contain SortExec. Since source is unbounded
+# pipeline checker should raise error, when plan contains SortExec.
+statement error DataFusion error: PipelineChecker
+SELECT a, d, rn1 FROM (SELECT a, d,
+                       ROW_NUMBER() OVER(PARTITION BY d ORDER BY a ASC) as rn1
+                       FROM annotated_data_infinite2
+                       ORDER BY a ASC)
+       ORDER BY rn1, a ASC
+       LIMIT 5
+
+# when partition by expressions match with existing ordering
+# row number can be appended to existing ordering
+# below query should work, without breaking pipeline.
+query III
+SELECT a, d, rn1 FROM (SELECT a, b, c, d,
+                       ROW_NUMBER() OVER(PARTITION BY b, c, a) as rn1
+                       FROM annotated_data_infinite2
+                       ORDER BY a ASC)
+       ORDER BY a, b, c, rn1
+       LIMIT 5
+----
+0 0 1
+0 2 1
+0 0 1
+0 0 1
+0 1 1
+
+# projection should propagate ordering equivalence successfully
+# when expression contains alias
+query III
+SELECT a_new, d, rn1 FROM (SELECT d, a as a_new,
+                       ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
+                       FROM annotated_data_infinite2
+                       ORDER BY a_new ASC)
+       ORDER BY a_new ASC, rn1
+       LIMIT 5
+----
+0 0 1
+0 2 2
+0 0 3
+0 0 4
+0 1 5
+
 statement ok
 drop table annotated_data_finite2
 
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index f458f4c709..d08b2e2c4a 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -339,6 +339,11 @@ impl OrderingEquivalenceBuilder {
         &self.schema
     }
 
+    /// Return a reference to the existing ordering
+    pub fn existing_ordering(&self) -> &LexOrdering {
+        &self.existing_ordering
+    }
+
     pub fn build(self) -> OrderingEquivalenceProperties {
         self.ordering_eq_properties
     }
diff --git a/datafusion/physical-expr/src/window/built_in.rs 
b/datafusion/physical-expr/src/window/built_in.rs
index e81ffe59b8..2958b21cad 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -23,9 +23,12 @@ use std::sync::Arc;
 
 use super::BuiltInWindowFunctionExpr;
 use super::WindowExpr;
+use crate::equivalence::OrderingEquivalenceBuilder;
+use crate::expressions::PhysicalSortExpr;
+use crate::utils::{convert_to_expr, get_indices_of_matching_exprs};
 use crate::window::window_expr::{get_orderby_values, WindowFn};
 use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
-use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
+use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
 use arrow::array::{new_empty_array, ArrayRef};
 use arrow::compute::SortOptions;
 use arrow::datatypes::Field;
@@ -65,6 +68,51 @@ impl BuiltInWindowExpr {
     pub fn get_built_in_func_expr(&self) -> &Arc<dyn 
BuiltInWindowFunctionExpr> {
         &self.expr
     }
+
+    /// Adds any equivalent orderings generated by the `self.expr`
+    /// to `builder`.
+    ///
+    /// If `self.expr` doesn't have an ordering, ordering equivalence 
properties
+    /// are not updated. Otherwise, ordering equivalence properties are updated
+    /// by the ordering of `self.expr`.
+    pub fn add_equal_orderings<F: FnOnce() -> EquivalenceProperties>(
+        &self,
+        builder: &mut OrderingEquivalenceBuilder,
+        equal_properties: F,
+    ) {
+        let schema = builder.schema();
+        if let Some(fn_res_ordering) = self.expr.get_result_ordering(schema) {
+            if self.partition_by.is_empty() {
+                // In the absence of a PARTITION BY, ordering of `self.expr` 
is global:
+                builder.add_equal_conditions(vec![fn_res_ordering]);
+            } else {
+                // If we have a PARTITION BY, built-in functions can not 
introduce
+                // a global ordering unless the existing ordering is compatible
+                // with PARTITION BY expressions. To elaborate, when PARTITION 
BY
+                // expressions and existing ordering expressions are equal 
(w.r.t.
+                // set equality), we can prefix the ordering of `self.expr` 
with
+                // the existing ordering.
+                let existing_ordering = builder.existing_ordering();
+                let existing_ordering_exprs = 
convert_to_expr(existing_ordering);
+                // Get indices of the PARTITION BY expressions among input 
ordering expressions:
+                let pb_indices = get_indices_of_matching_exprs(
+                    &self.partition_by,
+                    &existing_ordering_exprs,
+                    equal_properties,
+                );
+                // Existing ordering should match exactly with PARTITION BY 
expressions.
+                // There should be no missing/extra entries in the existing 
ordering.
+                // Otherwise, prefixing wouldn't work.
+                if pb_indices.len() == self.partition_by.len()
+                    && pb_indices.len() == existing_ordering.len()
+                {
+                    let mut new_ordering = existing_ordering.to_vec();
+                    new_ordering.push(fn_res_ordering);
+                    builder.add_equal_conditions(new_ordering);
+                }
+            }
+        }
+    }
 }
 
 impl WindowExpr for BuiltInWindowExpr {
diff --git 
a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs 
b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 73e0658267..1a060817d2 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::equivalence::OrderingEquivalenceBuilder;
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
 use arrow::array::ArrayRef;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
+use arrow_schema::SchemaRef;
 use datafusion_common::Result;
 use datafusion_expr::PartitionEvaluator;
+
 use std::any::Any;
 use std::sync::Arc;
 
@@ -80,9 +82,11 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + 
std::fmt::Debug {
         None
     }
 
-    /// Adds any equivalent orderings generated by this expression
-    /// to `builder`.
-    ///
-    /// The default implementation does nothing
-    fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
+    /// Returns the ordering introduced by the window function, if applicable.
+    /// Most window functions don't introduce an ordering, hence the default
+    /// value is `None`. Note that this information is used to update ordering
+    /// equivalences.
+    fn get_result_ordering(&self, _schema: &SchemaRef) -> 
Option<PhysicalSortExpr> {
+        None
+    }
 }
diff --git a/datafusion/physical-expr/src/window/ntile.rs 
b/datafusion/physical-expr/src/window/ntile.rs
index 6019ffbeef..008da422da 100644
--- a/datafusion/physical-expr/src/window/ntile.rs
+++ b/datafusion/physical-expr/src/window/ntile.rs
@@ -18,13 +18,16 @@
 //! Defines physical expression for `ntile` that can evaluated
 //! at runtime during query execution
 
+use crate::expressions::Column;
 use crate::window::BuiltInWindowFunctionExpr;
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
 use arrow::array::{ArrayRef, UInt64Array};
 use arrow::datatypes::Field;
-use arrow_schema::DataType;
+use arrow_schema::{DataType, SchemaRef, SortOptions};
 use datafusion_common::Result;
 use datafusion_expr::PartitionEvaluator;
+
 use std::any::Any;
 use std::sync::Arc;
 
@@ -62,6 +65,18 @@ impl BuiltInWindowFunctionExpr for Ntile {
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(NtileEvaluator { n: self.n }))
     }
+
+    fn get_result_ordering(&self, schema: &SchemaRef) -> 
Option<PhysicalSortExpr> {
+        // The built-in NTILE window function introduces a new ordering:
+        schema.column_with_name(self.name()).map(|(idx, field)| {
+            let expr = Arc::new(Column::new(field.name(), idx));
+            let options = SortOptions {
+                descending: false,
+                nulls_first: false,
+            }; // ASC, NULLS LAST
+            PhysicalSortExpr { expr, options }
+        })
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/window/rank.rs 
b/datafusion/physical-expr/src/window/rank.rs
index 48ff872078..9eb442f8f9 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -18,15 +18,19 @@
 //! Defines physical expression for `rank`, `dense_rank`, and `percent_rank` 
that can evaluated
 //! at runtime during query execution
 
+use crate::expressions::Column;
 use crate::window::window_expr::RankState;
 use crate::window::BuiltInWindowFunctionExpr;
-use crate::PhysicalExpr;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
 use arrow::array::ArrayRef;
 use arrow::array::{Float64Array, UInt64Array};
 use arrow::datatypes::{DataType, Field};
+use arrow_schema::{SchemaRef, SortOptions};
 use datafusion_common::utils::get_row_at_idx;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::PartitionEvaluator;
+
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
@@ -106,6 +110,18 @@ impl BuiltInWindowFunctionExpr for Rank {
             rank_type: self.rank_type,
         }))
     }
+
+    fn get_result_ordering(&self, schema: &SchemaRef) -> 
Option<PhysicalSortExpr> {
+        // The built-in RANK window function (in all modes) introduces a new 
ordering:
+        schema.column_with_name(self.name()).map(|(idx, field)| {
+            let expr = Arc::new(Column::new(field.name(), idx));
+            let options = SortOptions {
+                descending: false,
+                nulls_first: false,
+            }; // ASC, NULLS LAST
+            PhysicalSortExpr { expr, options }
+        })
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/window/row_number.rs 
b/datafusion/physical-expr/src/window/row_number.rs
index b115e9f149..f5e2f65a65 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -17,16 +17,17 @@
 
 //! Defines physical expression for `row_number` that can evaluated at runtime 
during query execution
 
-use crate::equivalence::OrderingEquivalenceBuilder;
 use crate::expressions::Column;
 use crate::window::window_expr::NumRowsState;
 use crate::window::BuiltInWindowFunctionExpr;
 use crate::{PhysicalExpr, PhysicalSortExpr};
+
 use arrow::array::{ArrayRef, UInt64Array};
 use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
+use arrow_schema::{SchemaRef, SortOptions};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::PartitionEvaluator;
+
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
@@ -64,22 +65,16 @@ impl BuiltInWindowFunctionExpr for RowNumber {
         &self.name
     }
 
-    fn add_equal_orderings(&self, builder: &mut OrderingEquivalenceBuilder) {
-        // The built-in RowNumber window function introduces a new
-        // ordering:
-        let schema = builder.schema();
-        if let Some((idx, field)) = schema.column_with_name(self.name()) {
-            let column = Column::new(field.name(), idx);
+    fn get_result_ordering(&self, schema: &SchemaRef) -> 
Option<PhysicalSortExpr> {
+        // The built-in ROW_NUMBER window function introduces a new ordering:
+        schema.column_with_name(self.name()).map(|(idx, field)| {
+            let expr = Arc::new(Column::new(field.name(), idx));
             let options = SortOptions {
                 descending: false,
                 nulls_first: false,
             }; // ASC, NULLS LAST
-            let rhs = PhysicalSortExpr {
-                expr: Arc::new(column) as _,
-                options,
-            };
-            builder.add_equal_conditions(vec![rhs]);
-        }
+            PhysicalSortExpr { expr, options }
+        })
     }
 
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {

Reply via email to