alamb commented on code in PR #5434:
URL: https://github.com/apache/arrow-datafusion/pull/5434#discussion_r1120885388


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -61,912 +61,6 @@ async fn window_frame_creation_type_checking() -> 
Result<()> {
     ).await
 }
 
-#[tokio::test]
-async fn test_window_agg_sort() -> Result<()> {
-    // We need to specify the target partition number.
-    // Otherwise, the default value used may vary on different environment
-    // with different cpu core number, which may cause the UT failure.
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-      c9,
-      SUM(c9) OVER(ORDER BY c9) as sum1,
-      SUM(c9) OVER(ORDER BY c9, c8) as sum2
-      FROM aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]",
-            "  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field 
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) 
OVER (ORDER BY c2, c9) from aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9), 
SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as 
MIN(aggregate_test_100.c9)]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
-            "    BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): 
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): 
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-/// FIXME: for now we are not detecting prefix of sorting keys in order to 
re-arrange with global and save one SortExec
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_global_order_compacting() -> 
Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), 
MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // 3 SortExec are added
-    let expected = {
-        vec![
-            "SortExec: expr=[c2@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as 
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@13 as MIN(aggregate_test_100.c9)]",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): 
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): 
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS 
LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_partition_by_order_by() -> Result<()> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .with_target_partitions(2)
-            .with_batch_size(4096),
-    );
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT \
-               SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING),\
-               COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING) \
-               FROM aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]",
-            "  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "    SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): 
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS 
LAST]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2), 
input_partitions=2",
-            "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
-    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@13 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4268716378 | 8498370520  | 24997484146 |",
-        "| 4229654142 | 12714811027 | 29012926487 |",
-        "| 4216440507 | 16858984380 | 28743001064 |",
-        "| 4144173353 | 20935849039 | 28472563256 |",
-        "| 4076864659 | 24997484146 | 28118515915 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    FIRST_VALUE(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv1,
-    FIRST_VALUE(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv2,
-    LAG(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lag1,
-    LAG(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lag2,
-    LEAD(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lead1,
-    LEAD(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lead2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, 
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as fv1, 
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as fv2, 
LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@17 as lag1, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER 
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING@14 as lag2, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER 
BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@18 as lead1, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) 
ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 
1 FOLLOWING@15 as lead2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: 
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: 
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): 
Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", 
data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, 
end_bound: Following(UInt32(NULL)) }]",
-            "      BoundedWindowAggExec: 
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: 
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: 
Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): 
Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", 
data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]",
-            "        SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        
"+------------+------------+------------+------------+------------+------------+------------+",
-        "| c9         | fv1        | fv2        | lag1       | lag2       | 
lead1      | lead2      |",
-        
"+------------+------------+------------+------------+------------+------------+------------+",
-        "| 4268716378 | 4229654142 | 4268716378 | 4216440507 | 10101      | 
10101      | 4216440507 |",
-        "| 4229654142 | 4216440507 | 4268716378 | 4144173353 | 10101      | 
10101      | 4144173353 |",
-        "| 4216440507 | 4144173353 | 4229654142 | 4076864659 | 4268716378 | 
4268716378 | 4076864659 |",
-        "| 4144173353 | 4076864659 | 4216440507 | 4061635107 | 4229654142 | 
4229654142 | 4061635107 |",
-        "| 4076864659 | 4061635107 | 4144173353 | 4015442341 | 4216440507 | 
4216440507 | 4015442341 |",
-        
"+------------+------------+------------+------------+------------+------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    ROW_NUMBER() OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn1,
-    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // We cannot reverse each window function (ROW_NUMBER is not reversible)
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as rn2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST]",
-            "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+-----+-----+",
-        "| c9        | rn1 | rn2 |",
-        "+-----------+-----+-----+",
-        "| 28774375  | 1   | 100 |",
-        "| 63044568  | 2   | 99  |",
-        "| 141047417 | 3   | 98  |",
-        "| 141680161 | 4   | 97  |",
-        "| 145294611 | 5   | 96  |",
-        "+-----------+-----+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c9 ASC, c1 ASC, c2 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
-    SUM(c9) OVER(ORDER BY c9 DESC, c1 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum2,
-    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // We cannot reverse each window function (ROW_NUMBER is not reversible)
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS 
LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER 
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as rn2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS 
LAST,c2@1 ASC NULLS LAST]",
-            "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "            SortExec: expr=[c9@8 DESC,c1@0 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+------------+-----------+-----+",
-        "| c9        | sum1       | sum2      | rn2 |",
-        "+-----------+------------+-----------+-----+",
-        "| 28774375  | 745354217  | 91818943  | 100 |",
-        "| 63044568  | 988558066  | 232866360 | 99  |",
-        "| 141047417 | 1285934966 | 374546521 | 98  |",
-        "| 141680161 | 1654839259 | 519841132 | 97  |",
-        "| 145294611 | 1980231675 | 745354217 | 96  |",
-        "+-----------+------------+-----------+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_complex_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_null_cases_csv(&ctx).await?;
-    let sql = "SELECT
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
a,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
b,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as c,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as d,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as e,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as f,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as g,
-    SUM(c1) OVER (ORDER BY c3) as h,
-    SUM(c1) OVER (ORDER BY c3 DESC) as i,
-    SUM(c1) OVER (ORDER BY c3 NULLS first) as j,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l,
-    SUM(c1) OVER (ORDER BY c3, c2) as m,
-    SUM(c1) OVER (ORDER BY c3, c1 DESC) as n,
-    SUM(c1) OVER (ORDER BY c3 DESC, c1) as o,
-    SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as a1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as b1,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as c1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as d1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as e1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as f1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as g1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as h1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as j1,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 
current row) as k1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as l1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as m1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as n1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as o1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as h11,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as j11,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as k11,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as l11,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as m11,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as n11,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as o11
-    FROM null_cases
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Unnecessary SortExecs are removed
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 
PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 
PRECEDING AND 11 FOLLOWING@12 as d, SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@8 as e, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 
PRECEDING AND 11 FOLLOWING@4 as f, SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as g, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@20 as h, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FI
 RST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as i, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@13 as j, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW@5 as k, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@9 as l, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@18 as m, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@16 as n, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@3 as o, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CUR
 RENT ROW@17 as p, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@21 as a1, SUM(null_cases.c1) 
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
11 FOLLOWING@21 as b1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@6 as c1, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND 11 FOLLOWING@14 as d1, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING@10 as e1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@6 as f1, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND 11 FOLLOWING@14 as g1, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW@20 as h1, SUM(null_cases.
 c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@20 as j1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS 
FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as k1, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW@13 as l1, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW@9 as m1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as n1, SUM(null_cases.c1) ORDER 
BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@13 as o1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS 
LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@22 as h11, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING@22 as j11, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS 
 FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@7 as k11, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING@15 as l11, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED 
FOLLOWING@11 as m11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS 
FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@7 as n11, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING@15 as o11]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: 
CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), 
end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow,
  end_bound: Following(Int64(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field 
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]",
-            "              WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field 
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound:
  Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "                  WindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_boun
 d: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                    WindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bo
 und: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                      BoundedWindowAggExec: 
wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), 
end_bound: CurrentRow }]",
-            "                        SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS 
LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> 
Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c1, c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum1,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4015442341 | 21907044499 | 21907044499 |",
-        "| 3998790955 | 24576419362 | 24576419362 |",
-        "| 3959216334 | 23063303501 | 23063303501 |",
-        "| 3717551163 | 21560567246 | 21560567246 |",
-        "| 3276123488 | 19815386638 | 19815386638 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@13 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4015442341 | 8014233296  | 21907044499 |",
-        "| 3998790955 | 11973449630 | 24576419362 |",
-        "| 3959216334 | 15691000793 | 23063303501 |",
-        "| 3717551163 | 18967124281 | 21560567246 |",
-        "| 3276123488 | 21907044499 | 19815386638 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c3,
-    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
-    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
CurrentRow, end_bound: Following(Int16(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
\"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int16(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[CAST(c3@2 AS Int16) + c4@3 DESC,c9@8 
DESC,c2@1 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----+-------------+--------------+",
-        "| c3  | sum1        | sum2         |",
-        "+-----+-------------+--------------+",
-        "| -86 | 2861911482  | 222089770060 |",
-        "| 13  | 5075947208  | 219227858578 |",
-        "| 125 | 8701233618  | 217013822852 |",
-        "| 123 | 11293564174 | 213388536442 |",
-        "| 97  | 14767488750 | 210796205886 |",
-        "+-----+-------------+--------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_target_partitions(8)
-        .with_batch_size(4096)
-        .with_repartition_windows(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT count(*) as global_count FROM
-                 (SELECT count(*), c1
-                  FROM aggregate_test_100
-                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
-                  GROUP BY c1
-                  ORDER BY c1 ) AS a ";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Unnecessary Sort in the sub query is removed
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
-            "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
-            "    CoalescePartitionsExec",
-            "      AggregateExec: mode=Partial, gby=[], 
aggr=[COUNT(UInt8(1))]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=8",
-            "          ProjectionExec: expr=[c1@0 as c1]",
-            "            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as 
c1], aggr=[COUNT(UInt8(1))]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 8), input_partitions=8",
-            "                  AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]",
-            "                    ProjectionExec: expr=[c1@0 as c1]",
-            "                      CoalesceBatchesExec: 
target_batch_size=4096",
-            "                        FilterExec: c13@1 != 
C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-            "                          RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------------+",
-        "| global_count |",
-        "+--------------+",
-        "| 5            |",
-        "+--------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> 
Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c3,
-    SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
-    SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC 
NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----+-------------+------------+",
-        "| c3  | sum1        | sum2       |",
-        "+-----+-------------+------------+",
-        "| 125 | 3625286410  | 3625286410 |",
-        "| 123 | 7192027599  | 3566741189 |",
-        "| 123 | 9784358155  | 6159071745 |",
-        "| 122 | 13845993262 | 4061635107 |",
-        "| 120 | 16676974334 | 2830981072 |",
-        "+-----+-------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@13 as rn1]",
-            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "      SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "        CoalesceBatchesExec: target_batch_size=8192",
-            "          RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
-            "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> 
{
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(false);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  CoalescePartitionsExec",
-            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@13 as rn1]",
-            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "          CoalesceBatchesExec: target_batch_size=8192",
-            "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
-            "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_intermediate_parallel_sort() -> 
Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, \
-    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
3 FOLLOWING) as sum1, \
-    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2 \
-    FROM aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      SortPreservingMergeExec: [c9@8 ASC NULLS LAST]",
-            "        SortExec: expr=[c9@8 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) 
}]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS 
LAST]",
-            "              CoalesceBatchesExec: target_batch_size=8192",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_with_global_limit() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(1);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM 
aggregate_test_100 ORDER BY c13 LIMIT 1)";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as 
array_agg1]",
-            "  AggregateExec: mode=Final, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
-            "    AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
-            "      GlobalLimitExec: skip=0, fetch=1",
-            "        SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+----------------------------------+",
-        "| array_agg1                       |",
-        "+----------------------------------+",
-        "| [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm] |",
-        "+----------------------------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_low_cardinality() -> Result<()> {
-    let config = SessionConfig::new().with_target_partitions(32);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-        SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING 
AND 3 FOLLOWING) as summation1,
-        SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING) as summation2
-    FROM aggregate_test_100
-    ORDER BY c9
-    LIMIT 5";
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+",
-        "| summation1 | summation2  |",
-        "+------------+-------------+",
-        "| -16110     | 61035129    |",
-        "| 3917       | -108973366  |",
-        "| -16974     | 623103518   |",
-        "| -1114      | -1927628110 |",
-        "| 15673      | -1899175111 |",
-        "+------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
 fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {

Review Comment:
   this file has only a few hundred lines after this one 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to