mingmwang commented on code in PR #4643:
URL: https://github.com/apache/arrow-datafusion/pull/4643#discussion_r1050531869
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1642,7 +1642,120 @@ async fn test_window_agg_sort() -> Result<()> {
assert_eq!(
expected, actual_trim_last,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
- expected, actual
+ expected, actual_trim_last
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> { // MAX orders by (c2) and MIN by (c2, c9): the shorter key list is a prefix of the longer one
+ let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+ register_aggregate_csv(&ctx).await?;
+
+ let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9)
OVER (ORDER BY c2, c9) from aggregate_test_100";
+
+ let msg = format!("Creating logical plan for '{}'", sql);
+ let plan = ctx.create_logical_plan(sql).expect(&msg);
+ let state = ctx.state();
+ let logical_plan = state.optimize(&plan)?;
+ let physical_plan = state.create_physical_plan(&logical_plan).await?;
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ // Only 1 SortExec is expected: one sort on (c2, c9) also satisfies the MAX window's ORDER BY c2, and the unordered SUM window sits on top of it too
+ let expected = {
+ vec![
+ "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9),
SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as
MIN(aggregate_test_100.c9)]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]",
+ " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field {
name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field {
name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
+ ]
+ };
+
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ let actual_len = actual.len();
+ let actual_trim_last = &actual[..actual_len - 1]; // NOTE(review): the final plan line is excluded from the comparison, presumably the CSV scan node whose text is environment-dependent; confirm
+ assert_eq!(
+ expected, actual_trim_last,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected, actual_trim_last
+ );
+ Ok(())
+}
+
+/// FIXME: the planner does not yet detect when a window's sort keys share a prefix with
+/// the query's global ORDER BY, so it cannot reorder the window operators to save one SortExec.
+#[tokio::test]
+async fn over_order_by_sort_keys_sorting_global_order_compacting() ->
Result<()> { // MAX orders by (c9, c2), MIN by (c2, c9), and the query orders globally by c2
+ let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+ register_aggregate_csv(&ctx).await?;
+
+ let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (),
MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
+ let msg = format!("Creating logical plan for '{}'", sql);
+ let plan = ctx.create_logical_plan(sql).expect(&msg);
+ let state = ctx.state();
+ let logical_plan = state.optimize(&plan)?;
+ let physical_plan = state.create_physical_plan(&logical_plan).await?;
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ // 3 SortExecs are expected: (c2, c9) for MIN, (c9, c2) for MAX, and c2 for the global ORDER BY
+ let expected = {
+ vec![
+ "SortExec: [c2@0 ASC NULLS LAST]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9),
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@2 as MIN(aggregate_test_100.c9)]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2)",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]",
+ " WindowAggExec: wdw=[MAX(aggregate_test_100.c9):
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c9@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ " WindowAggExec: wdw=[MIN(aggregate_test_100.c9):
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+ " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS
LAST]",
+ ]
+ };
+
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ let actual_len = actual.len();
+ let actual_trim_last = &actual[..actual_len - 1]; // NOTE(review): the final plan line is excluded from the comparison, presumably the CSV scan node whose text is environment-dependent; confirm
+ assert_eq!(
+ expected, actual_trim_last,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected, actual_trim_last
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_window_partition_by_order_by() -> Result<()> {
+ let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+ register_aggregate_csv(&ctx).await?;
+
+ let sql = "SELECT \
+ SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING),\
+ COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) \
+ FROM aggregate_test_100";
+
+ let msg = format!("Creating logical plan for '{}'", sql);
+ let plan = ctx.create_logical_plan(sql).expect(&msg);
+ let state = ctx.state();
+ let logical_plan = state.optimize(&plan)?;
+ let physical_plan = state.create_physical_plan(&logical_plan).await?;
+ let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+ // Only 1 SortExec was added
+ let expected = {
+ vec![
+ "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
+ " WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field {
name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }]",
+ " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ " CoalesceBatchesExec: target_batch_size=4096",
Review Comment:
Yes. Without this PR, the original physical plan contained additional
RepartitionExecs. This was because the extra SortExec prevented the two
WindowAggExecs from being collapsed into one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]