ozankabak commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1096046687
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() -> Result<()> {
let msg = format!("Creating logical plan for '{}'", sql);
let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await.unwrap();
+ let physical_plan = dataframe.create_physical_plan().await?;
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+ " WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+ " SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+ " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
Review Comment:
I would like to contribute to the discussion by explaining the general
motivation here: an important reason why `PARTITION BY` and `ORDER BY` are not
treated the same is that we may want to use hashing instead of sorting for
`PARTITION BY` keys in the (quite near) future. In applications where a user
needs to run queries without breaking the pipeline (such as applications
involving unbounded sources), hashing is the right choice; in other cases,
sorting is. So we really shouldn't lock ourselves into treating them uniformly;
the right strategy depends on the situation.
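
To make the pipeline-breaking distinction concrete, here is a minimal std-only Rust sketch of a running `SUM` per `PARTITION BY` key. It does not use DataFusion's API, and `Row`, `running_sum_hashing` and `running_sum_sorting` are hypothetical names. It assumes that rows already arrive in `ORDER BY` order within each partition, as they might from an unbounded source. The hash-based version keeps one accumulator per key and can emit a result for each row as it arrives. The sort-based version has to buffer and sort the entire input first.

```rust
use std::collections::HashMap;

/// Hypothetical input row: `key` plays the role of the PARTITION BY column and
/// `value` the aggregated column. Rows are assumed to arrive already ordered by
/// the ORDER BY column within each partition (e.g. an event timestamp).
#[derive(Clone, Copy, Debug)]
struct Row {
    id: usize,
    key: u32,
    value: i64,
}

/// Hash-based: one running sum per partition key; a result is produced for each
/// row as soon as it arrives, so the whole input never has to be materialized.
fn running_sum_hashing<I: IntoIterator<Item = Row>>(rows: I) -> Vec<(usize, i64)> {
    let mut state: HashMap<u32, i64> = HashMap::new();
    let mut out = Vec::new();
    for row in rows {
        let acc = state.entry(row.key).or_insert(0);
        *acc += row.value;
        out.push((row.id, *acc)); // could be streamed downstream immediately
    }
    out
}

/// Sort-based: buffers and sorts the whole input by partition key (a pipeline
/// breaker) before scanning each contiguous partition.
fn running_sum_sorting(rows: &[Row]) -> Vec<(usize, i64)> {
    let mut buffered = rows.to_vec();
    // `sort_by_key` is stable, so arrival (ORDER BY) order is kept per partition.
    buffered.sort_by_key(|r| r.key);
    let mut out = Vec::with_capacity(buffered.len());
    let mut current_key = None;
    let mut acc = 0;
    for row in buffered {
        if current_key != Some(row.key) {
            current_key = Some(row.key);
            acc = 0; // new partition: reset the accumulator
        }
        acc += row.value;
        out.push((row.id, acc));
    }
    out
}

fn main() {
    let rows = vec![
        Row { id: 0, key: 1, value: 10 },
        Row { id: 1, key: 2, value: 5 },
        Row { id: 2, key: 1, value: 7 },
        Row { id: 3, key: 2, value: 1 },
    ];
    let mut hashed = running_sum_hashing(rows.iter().copied());
    let mut sorted = running_sum_sorting(&rows);
    // Compare by row id: both strategies compute the same per-row results.
    hashed.sort();
    sorted.sort();
    assert_eq!(hashed, sorted);
    println!("{:?}", hashed);
}
```

Both strategies produce the same per-row results here. The difference is that the hash-based loop could hand each `(id, sum)` pair downstream immediately. The sort-based one produces nothing until the whole (possibly unbounded) input has been buffered.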

Now, coming back to the specifics of this example: IMO, the root issue here is
the intermediate `RepartitionExec`, which destroys the ordering produced by the
first sort and therefore forces the second sort. I am not 100% sure whether this
repartition is required; if it is added unnecessarily, that is a problem with
`EnforceDistribution`, not with this rule. Even where repartitioning is actually
required, the right thing to do would be to use a sort-preserving
repartitioning, which would solve problems like this: the second sort does not
come into the picture when we use the appropriate repartitioning executor. FYI,
we are currently working on sort-preserving repartitioning.
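
The sort-preserving idea can be illustrated with a small std-only Rust sketch. It is not DataFusion's `RepartitionExec` or the sort-preserving executor mentioned above. `Row`, `route`, `plain_repartition`, `sort_preserving_repartition` and `is_sorted` are hypothetical names. The sketch assumes every input partition is already sorted on `(c1, c2)`, and for brevity it routes rows by `c1 % num_outputs` instead of a real hash. Plain repartitioning concatenates rows as they arrive. The sort-preserving version instead k-way merges the sorted sub-streams that each input sends to a given output, so every output stays sorted and no second sort is needed.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical row `(c1, c2)`. Every input partition is assumed to be
/// sorted lexicographically on `(c1, c2)`, i.e. `c1 ASC, c2 ASC`.
type Row = (u32, u32);

/// Stand-in for a real hash function: routes a row by its partition key `c1`.
fn route(row: &Row, num_outputs: usize) -> usize {
    row.0 as usize % num_outputs
}

fn is_sorted(rows: &[Row]) -> bool {
    rows.windows(2).all(|w| w[0] <= w[1])
}

/// Plain hash repartitioning: appends rows to each output in arrival order
/// (here simply input by input), which generally destroys the input ordering.
fn plain_repartition(inputs: &[Vec<Row>], num_outputs: usize) -> Vec<Vec<Row>> {
    let mut outputs: Vec<Vec<Row>> = vec![Vec::new(); num_outputs];
    for input in inputs {
        for row in input {
            outputs[route(row, num_outputs)].push(*row);
        }
    }
    outputs
}

/// Sort-preserving hash repartitioning: each input sends a sorted
/// subsequence to every output, and each output k-way merges them.
fn sort_preserving_repartition(inputs: &[Vec<Row>], num_outputs: usize) -> Vec<Vec<Row>> {
    // streams[o][i] holds the rows of input `i` routed to output `o`; they stay
    // sorted because filtering a sorted sequence preserves its order.
    let mut streams: Vec<Vec<Vec<Row>>> = vec![vec![Vec::new(); inputs.len()]; num_outputs];
    for (i, input) in inputs.iter().enumerate() {
        for row in input {
            streams[route(row, num_outputs)][i].push(*row);
        }
    }
    streams
        .into_iter()
        .map(|sub_streams| {
            // Min-heap (via `Reverse`) of (next row, stream index, position).
            let mut heap: BinaryHeap<Reverse<(Row, usize, usize)>> = BinaryHeap::new();
            for (s, stream) in sub_streams.iter().enumerate() {
                if let Some(&first) = stream.first() {
                    heap.push(Reverse((first, s, 0)));
                }
            }
            let mut merged = Vec::new();
            while let Some(Reverse((row, s, pos))) = heap.pop() {
                merged.push(row);
                if let Some(&next) = sub_streams[s].get(pos + 1) {
                    heap.push(Reverse((next, s, pos + 1)));
                }
            }
            merged
        })
        .collect()
}

fn main() {
    // Two input partitions, each already sorted on (c1, c2).
    let inputs = vec![vec![(1, 5), (2, 1), (3, 3)], vec![(1, 2), (3, 1), (4, 4)]];
    let plain = plain_repartition(&inputs, 2);
    let preserved = sort_preserving_repartition(&inputs, 2);
    println!("plain:     {:?}", plain);
    println!("preserved: {:?}", preserved);
    assert!(!plain.iter().all(|p| is_sorted(p)));
    assert!(preserved.iter().all(|p| is_sorted(p)));
}
```

With the inputs in `main`, output partition 1 of the plain version is `[(1, 5), (3, 3), (1, 2), (3, 1)]`. The sort-preserving version yields `[(1, 2), (1, 5), (3, 1), (3, 3)]`. In a streaming setting, the usual trade-off of such a merge is that an output can only emit once it has a head row from every input that is still active.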

In the future, as more information becomes available to us (e.g. whether we
use hashing or sorting for `PARTITION BY`, whether the raw input has a
compatible ordering, what our parallelism configuration is, etc.), we can
decide whether we want a single executor or multiple executors in cases like
this. I would definitely be on board with that. However, the default choice (in
the absence of this information) should be to make no assumptions, so I agree
with @mustafasrepo for the time being.
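
The "compatible ordering" question can be phrased as a small predicate. Below is a hypothetical sketch; `ordering_satisfies` is not a DataFusion function. It assumes every sort key is `ASC NULLS LAST` and treats columns as plain names. Under those assumptions, a window is satisfied if its `PARTITION BY` columns form the leading sort columns (in any order), followed by its remaining `ORDER BY` columns. For the two windows in this test, a single ordering on `(c1, c2)` satisfies both. That is why one `SortExec` suffices when nothing in between destroys that ordering.

```rust
use std::collections::HashSet;

/// Hypothetical check: does data sorted lexicographically on `existing`
/// already satisfy a window with the given PARTITION BY / ORDER BY columns?
/// Simplifying assumptions: every sort key is `ASC NULLS LAST`, and columns
/// are identified by plain names.
fn ordering_satisfies(existing: &[&str], partition_by: &[&str], order_by: &[&str]) -> bool {
    let keys: HashSet<&str> = partition_by.iter().copied().collect();
    // ORDER BY columns that are also partition keys are constant inside a
    // partition, so they add no further requirement.
    let order: Vec<&str> = order_by
        .iter()
        .copied()
        .filter(|c| !keys.contains(c))
        .collect();
    if existing.len() < keys.len() + order.len() {
        return false;
    }
    // The partition keys must be the leading sort columns (in any order)...
    let prefix: HashSet<&str> = existing[..keys.len()].iter().copied().collect();
    // ...immediately followed by the remaining ORDER BY columns, in sequence.
    prefix == keys && existing[keys.len()..].starts_with(&order)
}

fn main() {
    let sorted_on_c1_c2 = ["c1", "c2"];
    // SUM(c4) PARTITION BY c1, c2 ORDER BY c2
    assert!(ordering_satisfies(&sorted_on_c1_c2, &["c1", "c2"], &["c2"]));
    // COUNT(1) PARTITION BY c1 ORDER BY c2
    assert!(ordering_satisfies(&sorted_on_c1_c2, &["c1"], &["c2"]));
    // With no known ordering (e.g. after an order-destroying repartition),
    // neither window is satisfied, so sorts have to be reintroduced.
    assert!(!ordering_satisfies(&[], &["c1"], &["c2"]));
    assert!(!ordering_satisfies(&[], &["c1", "c2"], &["c2"]));
    println!("one (c1, c2) ordering serves both windows");
}
```

A real rule would also have to account for sort direction, null ordering and equivalent columns, which this sketch ignores.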