ozankabak commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1096046687


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() -> 
Result<()> {
 
     let msg = format!("Creating logical plan for '{}'", sql);
     let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await.unwrap();
+    let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
     let expected = {
         vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: 
Following(UInt64(1)) }]",
-            "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+            "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+            "    SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2)",
-            "          RepartitionExec: partitioning=RoundRobinBatch(2)",
+            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 1 }], 2)",
+            "          WindowAggExec: wdw=[SUM(aggregate_test_100.c4): 
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",

Review Comment:
   I would like to contribute to the discussion by explaining the general 
motivation here: An important reason as to why `PARTITION BY` and `ORDER BY` 
are not treated the same is that we may want to utilize hashing instead of 
sorting for `PARTITION BY` keys in the (quite near) future. In applications 
where a user needs to run queries without breaking the pipeline (such as 
applications involving unbounded sources), hashing is the right choice; in 
other cases sorting is the right choice. So we really shouldn't overfit to 
treating them uniformly -- the right strategy depends on the situation.
   
   Now, coming back to the specifics of this example: IMO, the root issue here 
is the intermediate `RepartitionExec`, which breaks the first sort and 
therefore induces the second sort. I am not 100% clear if this repartition is 
required -- if it is unnecessarily added, that is a problem of 
`EnforceDistribution`, not this rule. Even in cases where a repartitioning is 
actually required, the right thing to do would be to use a sort-preserving 
repartitioning, which would solve problems like this; the second sort will not 
come into the picture when we use the appropriate repartitioning executor. FYI, 
we are working on sort-preserving repartitioning currently.
   
   In the future, as we have more information available to us (e.g. are we 
using hashing or sorting for `PARTITION BY`s, does the raw input have a 
compatible ordering, what is our parallelism configuration, etc.), we can make 
the decision as to whether we want to have a single executor or multiple 
executors in cases like this. I would be definitely on board with that. 
However; the default choice (in the absence of these pieces of information) 
should be not to make any assumptions, so I agree with @mustafasrepo for the 
time being.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to