mustafasrepo commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1096007564
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() ->
Result<()> {
let msg = format!("Creating logical plan for '{}'", sql);
let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await.unwrap();
+ let physical_plan = dataframe.create_physical_plan().await?;
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field {
name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+ " WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name:
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+ " SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 1 }], 2)",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c4):
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}]",
+ " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
Review Comment:
I have changed the query to reflect use case better.
`PARTITION BY` aware rule produces the plan below
```
"ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
\"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
" SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
```
previously it was producing the plan below
```
"ProjectionExec: expr=[c3@1 as c3, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2]",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
\"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" SortExec: [c3@1 ASC,c9@2 DESC]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
" SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
```
As you say, previously the query
```
SELECT SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING
AND 1 FOLLOWING),
COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING)
FROM aggregate_test_100
```
was producing single `WindowAggExec`. However, with the change in this PR we
no longer put window expressions `OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING)` and `COUNT(*) OVER(PARTITION BY c1 ORDER
BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)` to the same executor. Because
of this change new rule produces less optimized plan for this specific change.
However, I think unless `PARTITION BY` columns and `ORDER BY` are exactly
equal, different aggregations shouldn't end up in the same executor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]