alamb commented on code in PR #6956:
URL: https://github.com/apache/arrow-datafusion/pull/6956#discussion_r1264092547


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -381,6 +376,34 @@ impl ExecutionPlan for HashJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   I think this calculation should be the same for other `ExecutionPlan`s such
as `CrossJoinExec` / `NestedLoopJoinExec` as well, since the ordering
equivalence is a function of the join predicates and join type (INNER/OUTER,
etc.), not of the join algorithm?
   
   I wonder if we should move this to a common location rather than
implementing it only for `HashJoinExec`.
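   
   A minimal sketch of what such a common location could look like. Everything
here is hypothetical: `JoinType`, `EquivalenceClass`, `HashJoin`,
`NestedLoopJoin` and `join_ordering_equivalences` are toy stand-ins rather than
DataFusion's types, and the per-join-type rules are placeholder assumptions,
not the logic in this PR. The point is only that both operators delegate to one
helper that looks at the join type and the children's properties:

```rust
// Hypothetical stand-ins for illustration; these are not DataFusion's types.
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Output-schema column indices whose orderings are treated as equivalent.
type EquivalenceClass = Vec<usize>;

/// Re-express right-side column indices in the join's output schema,
/// where the right columns follow the left columns.
fn shift_right_side(classes: &[EquivalenceClass], left_column_count: usize) -> Vec<EquivalenceClass> {
    classes
        .iter()
        .map(|class| class.iter().map(|idx| idx + left_column_count).collect())
        .collect()
}

/// Shared helper: depends on the join type and the children's properties,
/// never on the join algorithm. The rules per join type are placeholders.
fn join_ordering_equivalences(
    join_type: JoinType,
    left: &[EquivalenceClass],
    right: &[EquivalenceClass],
    left_column_count: usize,
) -> Vec<EquivalenceClass> {
    match join_type {
        JoinType::Inner => {
            let mut out = left.to_vec();
            out.extend(shift_right_side(right, left_column_count));
            out
        }
        JoinType::Left => left.to_vec(),
        JoinType::Right => shift_right_side(right, left_column_count),
        // Assumption for this sketch: nothing is propagated for a full join.
        JoinType::Full => Vec::new(),
    }
}

struct HashJoin {
    join_type: JoinType,
}

struct NestedLoopJoin {
    join_type: JoinType,
}

impl HashJoin {
    fn ordering_equivalences(&self, l: &[EquivalenceClass], r: &[EquivalenceClass], n: usize) -> Vec<EquivalenceClass> {
        join_ordering_equivalences(self.join_type, l, r, n)
    }
}

impl NestedLoopJoin {
    fn ordering_equivalences(&self, l: &[EquivalenceClass], r: &[EquivalenceClass], n: usize) -> Vec<EquivalenceClass> {
        join_ordering_equivalences(self.join_type, l, r, n)
    }
}

fn main() {
    let left: Vec<EquivalenceClass> = vec![vec![0, 1]];
    let right: Vec<EquivalenceClass> = vec![vec![0, 2]];
    let hash = HashJoin { join_type: JoinType::Inner };
    let nested = NestedLoopJoin { join_type: JoinType::Inner };
    // Two different algorithms, one shared answer.
    assert_eq!(
        hash.ordering_equivalences(&left, &right, 2),
        nested.ordering_equivalences(&left, &right, 2)
    );
}
```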



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -3055,6 +3078,28 @@ statement error DataFusion error: Error during planning: Aggregate ORDER BY is n
 EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a ASC) OVER (order by a ASC) as last_c
        FROM annotated_data_infinite2
 
+# ordering equivalence information
+# should propagate through FilterExec, LimitExec, etc.

Review Comment:
   This comment implies to me that the test should have a `WHERE` clause as 
well, but the query does not have one. The plan looks pretty much the same to 
me as the first plan in this file.
   
   I think we should correct the test or the comment so they are consistent.



##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -107,6 +107,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {

Review Comment:
   Even though `CoalescePartitionsExec` doesn't preserve the input sort order, 
I agree it preserves the input equivalences 👍 
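   
   A toy illustration of that distinction, assuming rows of `(c9, rn1)` where
`rn1` is the global rank of `c9` in descending order. The `Row` alias and the
`coalesce` and `is_sorted_desc_by_c9` helpers are hypothetical and only model
the idea; they are not DataFusion code. Concatenating the partitions loses the
sort order, but sorting by `c9 DESC` and sorting by `rn1 ASC` still produce the
same row order:

```rust
/// A toy row: (c9, rn1), where rn1 is the rank of c9 in descending order.
type Row = (u32, u64);

/// Models coalescing: partitions are concatenated with no merge step.
fn coalesce(partitions: Vec<Vec<Row>>) -> Vec<Row> {
    partitions.into_iter().flatten().collect()
}

/// True if the rows are in non-increasing order of c9.
fn is_sorted_desc_by_c9(rows: &[Row]) -> bool {
    rows.windows(2).all(|w| w[0].0 >= w[1].0)
}

fn main() {
    // Each partition is sorted by c9 DESC on its own.
    let partition_a: Vec<Row> = vec![(90, 1), (40, 3)];
    let partition_b: Vec<Row> = vec![(70, 2), (10, 4)];
    assert!(is_sorted_desc_by_c9(&partition_a));
    assert!(is_sorted_desc_by_c9(&partition_b));

    // After coalescing, the global sort order is gone ...
    let coalesced = coalesce(vec![partition_a, partition_b]);
    assert!(!is_sorted_desc_by_c9(&coalesced));

    // ... but the two orderings are still equivalent on the data.
    let mut by_c9 = coalesced.clone();
    by_c9.sort_by(|a, b| b.0.cmp(&a.0));
    let mut by_rn1 = coalesced.clone();
    by_rn1.sort_by_key(|row| row.1);
    assert_eq!(by_c9, by_rn1);
}
```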



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2429,6 +2429,29 @@ GlobalLimitExec: skip=0, fetch=5
 ------SortExec: expr=[CAST(c9@1 AS Int32) + c5@0 DESC]
 --------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5, c9], has_header=true
 
+# Ordering equivalence should be preserved during cast expression
+query TT
+EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
+                       CAST(ROW_NUMBER() OVER(ORDER BY c9 DESC) as BIGINT) as rn1
+                       FROM aggregate_test_100
+                       ORDER BY c9 DESC)
+       ORDER BY rn1 ASC
+       LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: rn1 ASC NULLS LAST, fetch=5
+----Sort: aggregate_test_100.c9 DESC NULLS FIRST
+------Projection: aggregate_test_100.c9, CAST(ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS Int64) AS rn1
+--------WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5

Review Comment:
   The key is that the final sort on `rn1` has been optimized away 👍 
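   
   A small sketch of why that final sort is redundant, using toy data rather
than the test file. The helpers `row_numbers_desc` and `is_sorted_asc_by_rn1`
are hypothetical names for this example. Rows already ordered by `c9 DESC` get
`rn1 = 1, 2, 3, ...`, and casting those row numbers to a signed 64-bit integer
keeps them ascending, so nothing needs to be sorted again before the limit:

```rust
/// Orders the input by c9 DESC and attaches ROW_NUMBER() cast to i64.
fn row_numbers_desc(c9: &[u32]) -> Vec<(u32, i64)> {
    let mut sorted = c9.to_vec();
    sorted.sort_by(|a, b| b.cmp(a));
    sorted
        .into_iter()
        .enumerate()
        // Row numbers start at 1; the cast mirrors CAST(... AS BIGINT).
        .map(|(i, value)| (value, (i as u64 + 1) as i64))
        .collect()
}

/// True if the rows are in non-decreasing order of rn1.
fn is_sorted_asc_by_rn1(rows: &[(u32, i64)]) -> bool {
    rows.windows(2).all(|w| w[0].1 <= w[1].1)
}

fn main() {
    let rows = row_numbers_desc(&[12, 99, 5, 47, 63, 30, 81]);

    // Already ordered by rn1 ASC, with no second sort applied.
    assert!(is_sorted_asc_by_rn1(&rows));

    // LIMIT 5 can therefore take the first five rows directly.
    let top5: Vec<(u32, i64)> = rows.into_iter().take(5).collect();
    assert_eq!(top5.len(), 5);
    assert!(is_sorted_asc_by_rn1(&top5));
}
```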



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -295,6 +321,65 @@ impl ExecutionPlan for SortMergeJoinExec {
         )
     }
 
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        let mut new_properties = OrderingEquivalenceProperties::new(self.schema());

Review Comment:
   Does this same logic apply to `SymmetricHashJoinExec` too?



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -134,10 +166,26 @@ impl SortMergeJoinExec {
             .unzip();
 
         let output_ordering = match join_type {
-            JoinType::Inner
-            | JoinType::Left
-            | JoinType::LeftSemi
-            | JoinType::LeftAnti => {
+            JoinType::Inner => {
+                match (left.output_ordering(), right.output_ordering()) {
+                    // If both sides have orderings, ordering of the right hand side

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
