mingmwang commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1095464959
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1748,17 +1748,20 @@ async fn test_window_partition_by_order_by() ->
Result<()> {
let msg = format!("Creating logical plan for '{}'", sql);
let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await.unwrap();
+ let physical_plan = dataframe.create_physical_plan().await?;
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
let expected = {
vec![
- "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field {
name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
+ " WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name:
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+ " SortExec: [c1@1 ASC NULLS LAST,c2@2 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2)",
- " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 1 }], 2)",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c4):
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}]",
+ " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
Review Comment:
OK, I found the place. Could you please explain why we need to distinguish
whether the sort keys come from `PARTITION BY` or `ORDER BY`? This change causes
the window expressions to be evaluated in two separate `WindowAggExec`s instead
of one.
```rust
/// (expr, "is the SortExpr for window (either comes from PARTITION BY or
ORDER BY columns)")
/// if bool is true SortExpr comes from `PARTITION BY` column, if false
comes from `ORDER BY` column
type WindowSortKey = Vec<(Expr, bool)>;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]