alamb commented on code in PR #5434:
URL: https://github.com/apache/arrow-datafusion/pull/5434#discussion_r1120885388
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -61,912 +61,6 @@ async fn window_frame_creation_type_checking() ->
Result<()> {
).await
}
-#[tokio::test]
-async fn test_window_agg_sort() -> Result<()> {
- // We need to specify the target partition number.
- // Otherwise, the default value used may vary on different environment
- // with different cpu core number, which may cause the UT failure.
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- SUM(c9) OVER(ORDER BY c9) as sum1,
- SUM(c9) OVER(ORDER BY c9, c8) as sum2
- FROM aggregate_test_100";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
- );
- Ok(())
-}
-
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
-
- let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9)
OVER (ORDER BY c2, c9) from aggregate_test_100";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9),
SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as
MIN(aggregate_test_100.c9)]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]",
- " BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9):
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9):
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
- );
- Ok(())
-}
-
-/// FIXME: for now we are not detecting prefix of sorting keys in order to
re-arrange with global and save one SortExec
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_global_order_compacting() ->
Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
-
- let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (),
MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // 3 SortExec are added
- let expected = {
- vec![
- "SortExec: expr=[c2@0 ASC NULLS LAST]",
- " ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9),
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@13 as MIN(aggregate_test_100.c9)]",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]",
- " BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9):
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9):
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS
LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
- );
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_partition_by_order_by() -> Result<()> {
- let ctx = SessionContext::with_config(
- SessionConfig::new()
- .with_target_partitions(2)
- .with_batch_size(4096),
- );
- register_aggregate_csv(&ctx).await?;
-
- let sql = "SELECT \
- SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING),\
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) \
- FROM aggregate_test_100";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]",
- " BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name:
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2), input_partitions=2",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4):
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS
LAST]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2),
input_partitions=2",
- " RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
- );
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan() -> Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as
sum1,
- SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as
sum2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@13 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: expr=[c9@8 DESC]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------------+-------------+-------------+",
- "| c9 | sum1 | sum2 |",
- "+------------+-------------+-------------+",
- "| 4268716378 | 8498370520 | 24997484146 |",
- "| 4229654142 | 12714811027 | 29012926487 |",
- "| 4216440507 | 16858984380 | 28743001064 |",
- "| 4144173353 | 20935849039 | 28472563256 |",
- "| 4076864659 | 24997484146 | 28118515915 |",
- "+------------+-------------+-------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- FIRST_VALUE(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as fv1,
- FIRST_VALUE(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as fv2,
- LAG(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lag1,
- LAG(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1
FOLLOWING) as lag2,
- LEAD(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lead1,
- LEAD(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1
FOLLOWING) as lead2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9,
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as fv1,
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as fv2,
LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@17 as lag1, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1
FOLLOWING@14 as lag2, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER
BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@18 as lead1, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))
ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND
1 FOLLOWING@15 as lead2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec:
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name:
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name:
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)):
Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\",
data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow,
end_bound: Following(UInt32(NULL)) }]",
- " BoundedWindowAggExec:
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name:
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name:
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound:
Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)):
Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\",
data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]",
- " SortExec: expr=[c9@8 DESC]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
-
"+------------+------------+------------+------------+------------+------------+------------+",
- "| c9 | fv1 | fv2 | lag1 | lag2 |
lead1 | lead2 |",
-
"+------------+------------+------------+------------+------------+------------+------------+",
- "| 4268716378 | 4229654142 | 4268716378 | 4216440507 | 10101 |
10101 | 4216440507 |",
- "| 4229654142 | 4216440507 | 4268716378 | 4144173353 | 10101 |
10101 | 4144173353 |",
- "| 4216440507 | 4144173353 | 4229654142 | 4076864659 | 4268716378 |
4268716378 | 4076864659 |",
- "| 4144173353 | 4076864659 | 4216440507 | 4061635107 | 4229654142 |
4229654142 | 4061635107 |",
- "| 4076864659 | 4061635107 | 4144173353 | 4015442341 | 4216440507 |
4216440507 | 4015442341 |",
-
"+------------+------------+------------+------------+------------+------------+------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- ROW_NUMBER() OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as rn1,
- ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as rn2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // We cannot reverse each window function (ROW_NUMBER is not reversible)
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@14 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as rn2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: expr=[c9@8 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: expr=[c9@8 DESC]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-----------+-----+-----+",
- "| c9 | rn1 | rn2 |",
- "+-----------+-----+-----+",
- "| 28774375 | 1 | 100 |",
- "| 63044568 | 2 | 99 |",
- "| 141047417 | 3 | 98 |",
- "| 141680161 | 4 | 97 |",
- "| 145294611 | 5 | 96 |",
- "+-----------+-----+-----+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- SUM(c9) OVER(ORDER BY c9 ASC, c1 ASC, c2 ASC ROWS BETWEEN 1 PRECEDING AND
5 FOLLOWING) as sum1,
- SUM(c9) OVER(ORDER BY c9 DESC, c1 DESC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as sum2,
- ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as rn2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // We cannot reverse each window function (ROW_NUMBER is not reversible)
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS
LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@14 as rn2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS
LAST,c2@1 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: expr=[c9@8 DESC,c1@0 DESC]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-----------+------------+-----------+-----+",
- "| c9 | sum1 | sum2 | rn2 |",
- "+-----------+------------+-----------+-----+",
- "| 28774375 | 745354217 | 91818943 | 100 |",
- "| 63044568 | 988558066 | 232866360 | 99 |",
- "| 141047417 | 1285934966 | 374546521 | 98 |",
- "| 141680161 | 1654839259 | 519841132 | 97 |",
- "| 145294611 | 1980231675 | 745354217 | 96 |",
- "+-----------+------------+-----------+-----+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_complex_plan() -> Result<()> {
- let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
- register_aggregate_null_cases_csv(&ctx).await?;
- let sql = "SELECT
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as
a,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as
b,
- SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11
FOLLOWING) as c,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11
FOLLOWING) as d,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND
11 FOLLOWING) as e,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND
11 FOLLOWING) as f,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11
FOLLOWING) as g,
- SUM(c1) OVER (ORDER BY c3) as h,
- SUM(c1) OVER (ORDER BY c3 DESC) as i,
- SUM(c1) OVER (ORDER BY c3 NULLS first) as j,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l,
- SUM(c1) OVER (ORDER BY c3, c2) as m,
- SUM(c1) OVER (ORDER BY c3, c1 DESC) as n,
- SUM(c1) OVER (ORDER BY c3 DESC, c1) as o,
- SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11
FOLLOWING) as a1,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11
FOLLOWING) as b1,
- SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11
FOLLOWING) as c1,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING
AND 11 FOLLOWING) as d1,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED
PRECEDING AND 11 FOLLOWING) as e1,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED
PRECEDING AND 11 FOLLOWING) as f1,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING
AND 11 FOLLOWING) as g1,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current
row) as h1,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current
row) as j1,
- SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND
current row) as k1,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING
AND current row) as l1,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED
PRECEDING AND current row) as m1,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED
PRECEDING AND current row) as n1,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING
AND current row) as o1,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED
FOLLOWING) as h11,
- SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED
FOLLOWING) as j11,
- SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED
FOLLOWING) as k11,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND
UNBOUNDED FOLLOWING) as l11,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND
UNBOUNDED FOLLOWING) as m11,
- SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND
UNBOUNDED FOLLOWING) as n11,
- SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND
UNBOUNDED FOLLOWING) as o11
- FROM null_cases
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Unnecessary SortExecs are removed
- let expected = {
- vec![
- "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10
PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10
PRECEDING AND 11 FOLLOWING@12 as d, SUM(null_cases.c1) ORDER BY [null_cases.c3
DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@8 as e,
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10
PRECEDING AND 11 FOLLOWING@4 as f, SUM(null_cases.c1) ORDER BY [null_cases.c3
ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as g,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@20 as h, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as i,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@13 as j, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW@5 as k, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@9 as l, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@18 as m, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@16 as n, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@3 as o, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND
CURRENT ROW@17 as p, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@21 as a1, SUM(null_cases.c1)
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
11 FOLLOWING@21 as b1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@6 as c1,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND 11 FOLLOWING@14 as d1, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11
FOLLOWING@10 as e1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@6 as f1,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND 11 FOLLOWING@14 as g1, SUM(null_cases.c1) ORDER BY
[null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW@20 as h1, SUM(null_cases.c1)
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@20 as j1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as k1,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW@13 as l1, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW@9 as m1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as n1, SUM(null_cases.c1) ORDER
BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@13 as o1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS
LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@22 as h11,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING@22 as j11, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS
FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@7 as k11,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING@15 as l11, SUM(null_cases.c1) ORDER BY
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED
FOLLOWING@11 as m11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS
FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@7 as n11,
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN
CURRENT ROW AND UNBOUNDED FOLLOWING@15 as o11]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound:
CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)),
end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow,
end_bound: Following(Int64(NULL)) }]",
- " BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound:
Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1):
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(10)), end_bound:
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound:
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " WindowAggExec: wdw=[SUM(null_cases.c1):
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(10)), end_bound:
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound:
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
- " BoundedWindowAggExec:
wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)),
end_bound: CurrentRow }]",
- " SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS
LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_plan() ->
Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(false)
- .with_target_partitions(2);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- SUM(c9) OVER(ORDER BY c1, c9 DESC ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING) as sum1,
- SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND
5 FOLLOWING) as sum2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@14 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------------+-------------+-------------+",
- "| c9 | sum1 | sum2 |",
- "+------------+-------------+-------------+",
- "| 4015442341 | 21907044499 | 21907044499 |",
- "| 3998790955 | 24576419362 | 24576419362 |",
- "| 3959216334 | 23063303501 | 23063303501 |",
- "| 3717551163 | 21560567246 | 21560567246 |",
- "| 3276123488 | 19815386638 | 19815386638 |",
- "+------------+-------------+-------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(false)
- .with_target_partitions(2);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- c9,
- SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND
5 FOLLOWING) as sum1,
- SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND
5 FOLLOWING) as sum2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@13 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------------+-------------+-------------+",
- "| c9 | sum1 | sum2 |",
- "+------------+-------------+-------------+",
- "| 4015442341 | 8014233296 | 21907044499 |",
- "| 3998790955 | 11973449630 | 24576419362 |",
- "| 3959216334 | 15691000793 | 23063303501 |",
- "| 3717551163 | 18967124281 | 21560567246 |",
- "| 3276123488 | 21907044499 | 19815386638 |",
- "+------------+-------------+-------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(false)
- .with_target_partitions(2);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT c3,
- SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
- SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
CurrentRow, end_bound: Following(Int16(NULL)) }]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
\"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int16(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[CAST(c3@2 AS Int16) + c4@3 DESC,c9@8
DESC,c2@1 ASC NULLS LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-----+-------------+--------------+",
- "| c3 | sum1 | sum2 |",
- "+-----+-------------+--------------+",
- "| -86 | 2861911482 | 222089770060 |",
- "| 13 | 5075947208 | 219227858578 |",
- "| 125 | 8701233618 | 217013822852 |",
- "| 123 | 11293564174 | 213388536442 |",
- "| 97 | 14767488750 | 210796205886 |",
- "+-----+-------------+--------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
- let config = SessionConfig::new()
- .with_target_partitions(8)
- .with_batch_size(4096)
- .with_repartition_windows(true);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT count(*) as global_count FROM
- (SELECT count(*), c1
- FROM aggregate_test_100
- WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
- GROUP BY c1
- ORDER BY c1 ) AS a ";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Unnecessary Sort in the sub query is removed
- let expected = {
- vec![
- "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
- " AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
- " CoalescePartitionsExec",
- " AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]",
- " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=8",
- " ProjectionExec: expr=[c1@0 as c1]",
- " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as
c1], aggr=[COUNT(UInt8(1))]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }], 8), input_partitions=8",
- " AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[COUNT(UInt8(1))]",
- " ProjectionExec: expr=[c1@0 as c1]",
- " CoalesceBatchesExec:
target_batch_size=4096",
- " FilterExec: c13@1 !=
C2GT5KVyOPZpgKVl110TyZO0NcJ434",
- " RepartitionExec:
partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------------+",
- "| global_count |",
- "+--------------+",
- "| 5 |",
- "+--------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() ->
Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(false)
- .with_target_partitions(2);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT c3,
- SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
- SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2
- FROM aggregate_test_100
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC
NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
- " SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-----+-------------+------------+",
- "| c3 | sum1 | sum2 |",
- "+-----+-------------+------------+",
- "| 125 | 3625286410 | 3625286410 |",
- "| 123 | 7192027599 | 3566741189 |",
- "| 123 | 9784358155 | 6159071745 |",
- "| 122 | 13845993262 | 4061635107 |",
- "| 120 | 16676974334 | 2830981072 |",
- "+-----+-------------+------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort() -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(true)
- .with_target_partitions(2)
- .with_repartition_sorts(true);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM
aggregate_test_100 ORDER BY c1 ASC";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
- " ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@13 as rn1]",
- " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
- " SortExec: expr=[c1@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()>
{
- let config = SessionConfig::new()
- .with_repartition_windows(true)
- .with_target_partitions(2)
- .with_repartition_sorts(false);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM
aggregate_test_100 ORDER BY c1 ASC";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "SortExec: expr=[c1@0 ASC NULLS LAST]",
- " CoalescePartitionsExec",
- " ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@13 as rn1]",
- " BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
- " SortExec: expr=[c1@0 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_intermediate_parallel_sort() ->
Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(true)
- .with_target_partitions(2)
- .with_repartition_sorts(true);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT c1, \
- SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND
3 FOLLOWING) as sum1, \
- SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as
sum2 \
- FROM aggregate_test_100 ORDER BY c1 ASC";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "SortExec: expr=[c1@0 ASC NULLS LAST]",
- " ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortPreservingMergeExec: [c9@8 ASC NULLS LAST]",
- " SortExec: expr=[c9@8 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3))
}]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS
LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }], 2), input_partitions=2",
- " RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_with_global_limit() -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(false)
- .with_target_partitions(1);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM
aggregate_test_100 ORDER BY c13 LIMIT 1)";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Only 1 SortExec was added
- let expected = {
- vec![
- "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as
array_agg1]",
- " AggregateExec: mode=Final, gby=[],
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
- " AggregateExec: mode=Partial, gby=[],
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
- " GlobalLimitExec: skip=0, fetch=1",
- " SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+----------------------------------+",
- "| array_agg1 |",
- "+----------------------------------+",
- "| [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm] |",
- "+----------------------------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_low_cardinality() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(32);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT
- SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING
AND 3 FOLLOWING) as summation1,
- SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING) as summation2
- FROM aggregate_test_100
- ORDER BY c9
- LIMIT 5";
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------------+-------------+",
- "| summation1 | summation2 |",
- "+------------+-------------+",
- "| -16110 | 61035129 |",
- "| 3917 | -108973366 |",
- "| -16974 | 623103518 |",
- "| -1114 | -1927628110 |",
- "| 15673 | -1899175111 |",
- "+------------+-------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {
Review Comment:
this file has only a few hundred lines after this one 😅
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]