alamb commented on code in PR #6160:
URL: https://github.com/apache/arrow-datafusion/pull/6160#discussion_r1181636859


##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -205,9 +209,56 @@ pub fn project_equivalence_properties(
     for class in ec_classes.iter_mut() {
         let mut columns_to_remove = vec![];
         for column in class.iter() {
-            if column.index() >= schema.fields().len()
-                || schema.fields()[column.index()].name() != column.name()
+            let fields = schema.fields();
+            let idx = column.index();
+            if idx >= fields.len() || fields[idx].name() != column.name() {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            class.remove(&column);
+        }
+    }
+    ec_classes.retain(|props| props.len() > 1);
+    output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering

Review Comment:
   When reading this PR, it seems to me that these functions (and those in 
datafusion/physical-expr/src/utils.rs that take a `EquivalenceProperties` / 
`OrderingEquivalenceProperties` as the first argument would be easier to find / 
use if they were methods on `EquivalenceProperties` / 
`OrderingEquivalenceProperties` 
   
   So like
   
   ```rust
   impl OrderingEquivalenceProperties {
     fn project_ordering_equivalence_properties(self, colums_map: ..., 
output_eq: ...) { ... }
   }
   ```
   
   I realize given how this code is currently structured as a typedef this 
would be hard to do 



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -205,9 +209,56 @@ pub fn project_equivalence_properties(
     for class in ec_classes.iter_mut() {
         let mut columns_to_remove = vec![];
         for column in class.iter() {
-            if column.index() >= schema.fields().len()
-                || schema.fields()[column.index()].name() != column.name()
+            let fields = schema.fields();
+            let idx = column.index();
+            if idx >= fields.len() || fields[idx].name() != column.name() {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            class.remove(&column);
+        }
+    }
+    ec_classes.retain(|props| props.len() > 1);
+    output_eq.extend(ec_classes);
+}
+
+/// This function applies the given projection to the given ordering
+/// equivalence properties to compute the resulting (projected) ordering
+/// equivalence properties; e.g.
+/// 1) Adding an alias, which can introduce additional ordering equivalence
+///    properties, as in Projection(a, a as a1, a as a2) extends global 
ordering
+///    of a to a1 and a2.
+/// 2) Truncate the [`OrderingEquivalentClass`]es that are not in the output 
schema.
+pub fn project_ordering_equivalence_properties(
+    input_eq: OrderingEquivalenceProperties,
+    columns_map: &HashMap<Column, Vec<Column>>,
+    output_eq: &mut OrderingEquivalenceProperties,
+) {
+    let mut ec_classes = input_eq.classes().to_vec();
+    for (column, columns) in columns_map {
+        for class in ec_classes.iter_mut() {
+            if let Some(OrderedColumn { options, .. }) =

Review Comment:
   this seems almost identical to `project_equivalence_properties` except that 
the check for equivalence and the insertion of new columns is different. I 
wonder if there is some way to combine them 🤔 



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -219,6 +270,26 @@ pub fn project_equivalence_properties(
     output_eq.extend(ec_classes);
 }
 
+/// Finds matching column inside OrderingEquivalentClass.
+fn get_matching_column(

Review Comment:
   This seems like a great candidate for a method
   
   ```rust
   impl OrderingEquivalentClass {
     fn get_matching_column(&self, column: &Column) -> T {
     ...
     }
   }
   ```



##########
datafusion/physical-expr/src/equivalence.rs:
##########
@@ -112,41 +102,45 @@ impl EquivalenceProperties {
     }
 }
 
-/// Equivalent Class is a set of Columns that are known to have the same value 
in all tuples in a relation
-/// Equivalent Class is generated by equality predicates, typically equijoin 
conditions and equality conditions in filters.
+pub type OrderingEquivalenceProperties = EquivalenceProperties<OrderedColumn>;
+
+/// EquivalentClass is a set of [`Column`]s or [`OrderedColumn`]s that are 
known
+/// to have the same value in all tuples in a relation. This object is 
generated
+/// by equality predicates, typically equijoin conditions and equality 
conditions
+/// in filters.

Review Comment:
   I think this documentation is not quite correct for 
`OrderingEquivalenceClass` as in that case the ordering comes from join 
predicates as well as (potentially window aggregates)
   
   
   
   



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -911,12 +1064,18 @@ mod tests {
         ];
         let finer = Some(&finer[..]);
         let empty_schema = &Arc::new(Schema::empty());

Review Comment:
   I wonder if some more unit tests would be valuable here mostly as a way to 
document how the functions were supposed to work -- trying to write more 
focused tests might also help be a forcing function to create fewer new 
functions / refactor functions to share more functionality



##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -196,27 +199,156 @@ pub fn 
normalize_sort_requirement_with_equivalence_properties(
     }
 }
 
+pub fn normalize_expr_with_ordering_equivalence_properties(

Review Comment:
   I wonder if there is some way to reduce the code replication here -- 
SortRequirement and SortExpr are very similar, as are `OrderingEquivalentClass` 
and `EquivalentClass`



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2204,3 +2204,78 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 
PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 ASC NULLS LAST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 ASC NULLS LAST]
+        CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 ASC)
+   ORDER BY rn1
+   LIMIT 5
+----
+28774375 1
+63044568 2
+141047417 3
+141680161 4
+145294611 5
+
+# test_c9_rn_ordering_alias_opposite_direction
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: rn1 ASC NULLS LAST, fetch=5
+    Sort: aggregate_test_100.c9 DESC NULLS FIRST
+      Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS rn1
+        WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@1 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted]
+      SortExec: expr=[c9@0 DESC]
+        CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c9]
+
+query II
+SELECT c9, rn1 FROM (SELECT c9,
+                   ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
+                   FROM aggregate_test_100
+                   ORDER BY c9 DESC)
+   ORDER BY rn1
+   LIMIT 5
+----
+4268716378 1
+4229654142 2
+4216440507 3
+4144173353 4
+4076864659 5
+

Review Comment:
   Could you pleas add some negative cases too, for example when the subquery 
is not ordered the same way as the window function?
   
   ```
   EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
                      ROW_NUMBER() OVER(ORDER BY c9 ASC) as rn1
                      FROM aggregate_test_100
                      ORDER BY c9 DESC)
      ORDER BY rn1
      LIMIT 5
   ```
   
   It would also be good I think to add something like 
   
   ```sql
   ORDER BY r1, cn9
   ```
   
   To  cover the 'more than one element in the equivalence class' case



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2204,3 +2204,78 @@ SELECT SUM(c12) OVER(ORDER BY c1, c2 GROUPS BETWEEN 1 
PRECEDING AND 1 FOLLOWING)
 2.994840293343 NULL
 9.674390599321 NULL
 7.728066219895 NULL
+
+# test_c9_rn_ordering_alias

Review Comment:
   Maybe worth a comment here saying  the intention is there is no `SortExec` 
after the `BoundedWindowAggExec`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to