mustafasrepo commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1056236187


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() -> Result<()> {
 
     let msg = format!("Creating logical plan for '{}'", sql);
     let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await.unwrap();
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+            "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+            "    SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
-            "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
+            "          WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",

Review Comment:
   In the previous version, the expressions below went to the same `WindowAggExec`:
   ```sql
        SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
        COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
   ```
   However, this is inconsistent with PostgreSQL's behavior. When I run the query
   ```sql
   SELECT 
       SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
       COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
       FROM aggregate_test_100
   ```
   PostgreSQL produces the physical plan below:
   ```sql
   WindowAgg (cost=34.94..53.34 rows=460 width=52) (actual time=0.112..0.243 rows=100 loops=1)
   -> WindowAgg (cost=34.94..45.29 rows=460 width=46) (actual time=0.105..0.187 rows=100 loops=1)
   -> Sort (cost=34.94..36.09 rows=460 width=38) (actual time=0.097..0.103 rows=100 loops=1)
   Sort Key: c1, c2
   Sort Method: quicksort Memory: 29kB
   -> Seq Scan on aggregate_test_100 (cost=0.00..14.60 rows=460 width=38) (actual time=0.012..0.026 rows=100 loops=1)
   ```
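   One way to read the plan above: both window definitions need rows sorted by `c1, c2`, so a single sort can feed two stacked window operators. The sketch below is only an illustration of that reasoning. The helper `required_sort_keys` is hypothetical and is not part of DataFusion or PostgreSQL; it assumes a window needs its `PARTITION BY` columns followed by its `ORDER BY` columns, with repeated columns dropped.

   ```rust
   /// Hypothetical helper (not a DataFusion API): the sort keys a window
   /// needs, assumed to be its PARTITION BY columns followed by its
   /// ORDER BY columns, skipping any column that is already present.
   fn required_sort_keys(partition_by: &[&str], order_by: &[&str]) -> Vec<String> {
       let mut keys: Vec<String> = Vec::new();
       for col in partition_by.iter().chain(order_by.iter()) {
           if !keys.iter().any(|k| k.as_str() == *col) {
               keys.push(col.to_string());
           }
       }
       keys
   }

   fn main() {
       // SUM(c4) OVER (PARTITION BY c1, c2 ORDER BY c2 ...)
       let sum_keys = required_sort_keys(&["c1", "c2"], &["c2"]);
       // COUNT(*) OVER (PARTITION BY c1 ORDER BY c2 ...)
       let count_keys = required_sort_keys(&["c1"], &["c2"]);

       // Both windows ask for the same ordering, so one sort can serve both.
       assert_eq!(sum_keys, count_keys);
       println!("shared sort keys: {:?}", sum_keys);
   }
   ```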
   The reason we cannot optimize away the second `SortExec` is that `RepartitionExec` doesn't maintain ordering. In the future, we plan to add an order-preserving `RepartitionExec`; once that happens, the second `SortExec` will be unnecessary.
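
   To illustrate why a repartition step can lose ordering, here is a toy model. It is not DataFusion's `RepartitionExec`. It assumes rows arrive interleaved from more than one sorted input partition, and it routes rows with a simple modulo as a stand-in for a real hash. The names `hash_repartition` and `sorted_by_keys` are made up for this sketch.

   ```rust
   /// Toy hash repartition (NOT DataFusion's implementation): rows of
   /// (c1, c2) from several inputs are routed to an output by the key c1.
   fn hash_repartition(inputs: &[Vec<(u32, u32)>], n_out: usize) -> Vec<Vec<(u32, u32)>> {
       let mut outputs = vec![Vec::new(); n_out];
       let longest = inputs.iter().map(|p| p.len()).max().unwrap_or(0);
       // Take one row from each input in turn, as an assumed model of rows
       // arriving from input partitions that run concurrently.
       for i in 0..longest {
           for input in inputs {
               if let Some(&(c1, c2)) = input.get(i) {
                   // Modulo on c1 stands in for a real hash function.
                   outputs[c1 as usize % n_out].push((c1, c2));
               }
           }
       }
       outputs
   }

   /// True when the rows are in non-decreasing (c1, c2) order.
   fn sorted_by_keys(rows: &[(u32, u32)]) -> bool {
       rows.windows(2).all(|w| w[0] <= w[1])
   }

   fn main() {
       // Each input partition is individually sorted by (c1, c2).
       let inputs = vec![
           vec![(0, 1), (0, 5), (2, 3)],
           vec![(0, 2), (0, 4), (2, 1)],
       ];
       assert!(inputs.iter().all(|p| sorted_by_keys(p)));

       let outputs = hash_repartition(&inputs, 2);
       // Every row has an even c1, so all rows land in output 0,
       // but the interleaving breaks the (c1, c2) order.
       assert!(!sorted_by_keys(&outputs[0]));
       println!("output 0: {:?}", outputs[0]);
   }
   ```

   In this model each output partition still holds all rows for its keys, but a downstream operator that needs sorted input has to sort again, which is the role the second `SortExec` plays in the plan.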


