mingmwang commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1095529675


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() -> 
Result<()> {
 
     let msg = format!("Creating logical plan for '{}'", sql);
     let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await.unwrap();
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(1)) }]",
-            "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+            "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+            "    SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2)",
-            "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 1 }], 2)",
+            "          WindowAggExec: wdw=[SUM(aggregate_test_100.c4): 
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",

Review Comment:
   Sorry that I still do not understand.  For this SQL, could you please 
explain what a better plan it should be and what was the old plan ?
   Yes, `PARTITION BY` columns are ordered and the ordering direction is not 
important, can we normalized that and make the sorting ordering to be 
consistent with the other window expressions and make them can be evaluated in 
one
   `WindowAggExec`.



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() -> 
Result<()> {
 
     let msg = format!("Creating logical plan for '{}'", sql);
     let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await.unwrap();
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(1)) }]",
-            "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+            "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+            "    SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2)",
-            "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 1 }], 2)",
+            "          WindowAggExec: wdw=[SUM(aggregate_test_100.c4): 
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",

Review Comment:
   Sorry that I still do not understand.  For this SQL, could you please 
explain what a better plan it should be and what was the old plan ?
   Yes, `PARTITION BY` columns are ordered and the ordering direction is not 
important, can we normalized that and make the sorting ordering to be 
consistent with the other window expressions and make them can be evaluated in 
one `WindowAggExec`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to