This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a95e0ec2f minor: Port more window tests to sqllogictests (#5434)
a95e0ec2f is described below

commit a95e0ec2fd929aae1c2f67148243eb4825d81a3b
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 2 16:12:04 2023 +0100

    minor: Port more window tests to sqllogictests (#5434)
    
    * Port more window tests to slt
    
    * Update tests
    
    * remove dead code
    
    * Fix test to be deterministic
---
 datafusion/core/tests/sql/mod.rs                   |  17 -
 datafusion/core/tests/sql/window.rs                | 906 ---------------------
 .../core/tests/sqllogictests/test_files/window.slt | 882 +++++++++++++++++++-
 3 files changed, 865 insertions(+), 940 deletions(-)

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index d8b6a83f2..456dc12a1 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -1051,23 +1051,6 @@ async fn register_aggregate_simple_csv(ctx: 
&SessionContext) -> Result<()> {
     Ok(())
 }
 
-async fn register_aggregate_null_cases_csv(ctx: &SessionContext) -> Result<()> 
{
-    // It's not possible to use aggregate_test_100, not enought similar values 
to test grouping on floats
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Int64, true),
-        Field::new("c2", DataType::Float64, true),
-        Field::new("c3", DataType::Int64, false),
-    ]));
-
-    ctx.register_csv(
-        "null_cases",
-        "tests/data/null_cases.csv",
-        CsvReadOptions::new().schema(&schema),
-    )
-    .await?;
-    Ok(())
-}
-
 async fn register_aggregate_csv(ctx: &SessionContext) -> Result<()> {
     let testdata = datafusion::test_util::arrow_test_data();
     let schema = test_util::aggr_test_schema();
diff --git a/datafusion/core/tests/sql/window.rs 
b/datafusion/core/tests/sql/window.rs
index 5cd91e46e..8f95eba57 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -61,912 +61,6 @@ async fn window_frame_creation_type_checking() -> 
Result<()> {
     ).await
 }
 
-#[tokio::test]
-async fn test_window_agg_sort() -> Result<()> {
-    // We need to specify the target partition number.
-    // Otherwise, the default value used may vary on different environment
-    // with different cpu core number, which may cause the UT failure.
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-      c9,
-      SUM(c9) OVER(ORDER BY c9) as sum1,
-      SUM(c9) OVER(ORDER BY c9, c8) as sum2
-      FROM aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]",
-            "  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field 
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) 
OVER (ORDER BY c2, c9) from aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9), 
SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as [...]
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
-            "    BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): 
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): 
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-/// FIXME: for now we are not detecting prefix of sorting keys in order to 
re-arrange with global and save one SortExec
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_global_order_compacting() -> 
Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), 
MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // 3 SortExec are added
-    let expected = {
-        vec![
-            "SortExec: expr=[c2@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as 
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN U [...]
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): 
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): 
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS 
LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_partition_by_order_by() -> Result<()> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .with_target_partitions(2)
-            .with_batch_size(4096),
-    );
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT \
-               SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING),\
-               COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING) \
-               FROM aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]",
-            "  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "    SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): 
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS 
LAST]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2), 
input_partitions=2",
-            "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
-    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@13 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4268716378 | 8498370520  | 24997484146 |",
-        "| 4229654142 | 12714811027 | 29012926487 |",
-        "| 4216440507 | 16858984380 | 28743001064 |",
-        "| 4144173353 | 20935849039 | 28472563256 |",
-        "| 4076864659 | 24997484146 | 28118515915 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    FIRST_VALUE(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv1,
-    FIRST_VALUE(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv2,
-    LAG(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lag1,
-    LAG(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lag2,
-    LEAD(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lead1,
-    LEAD(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lead2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, 
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as fv1, 
FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as fv2, 
LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@17 as lag1, LAG(aggregate_ [...]
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: 
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: 
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, 
nullable: true, dict_id [...]
-            "      BoundedWindowAggExec: 
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: 
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, 
nullable: true, dict_ [...]
-            "        SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        
"+------------+------------+------------+------------+------------+------------+------------+",
-        "| c9         | fv1        | fv2        | lag1       | lag2       | 
lead1      | lead2      |",
-        
"+------------+------------+------------+------------+------------+------------+------------+",
-        "| 4268716378 | 4229654142 | 4268716378 | 4216440507 | 10101      | 
10101      | 4216440507 |",
-        "| 4229654142 | 4216440507 | 4268716378 | 4144173353 | 10101      | 
10101      | 4144173353 |",
-        "| 4216440507 | 4144173353 | 4229654142 | 4076864659 | 4268716378 | 
4268716378 | 4076864659 |",
-        "| 4144173353 | 4076864659 | 4216440507 | 4061635107 | 4229654142 | 
4229654142 | 4061635107 |",
-        "| 4076864659 | 4061635107 | 4144173353 | 4015442341 | 4216440507 | 
4216440507 | 4015442341 |",
-        
"+------------+------------+------------+------------+------------+------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    ROW_NUMBER() OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn1,
-    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // We cannot reverse each window function (ROW_NUMBER is not reversible)
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as rn2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST]",
-            "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+-----+-----+",
-        "| c9        | rn1 | rn2 |",
-        "+-----------+-----+-----+",
-        "| 28774375  | 1   | 100 |",
-        "| 63044568  | 2   | 99  |",
-        "| 141047417 | 3   | 98  |",
-        "| 141680161 | 4   | 97  |",
-        "| 145294611 | 5   | 96  |",
-        "+-----------+-----+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c9 ASC, c1 ASC, c2 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
-    SUM(c9) OVER(ORDER BY c9 DESC, c1 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum2,
-    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // We cannot reverse each window function (ROW_NUMBER is not reversible)
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS 
LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER 
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BET [...]
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS 
LAST,c2@1 ASC NULLS LAST]",
-            "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "            SortExec: expr=[c9@8 DESC,c1@0 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+------------+-----------+-----+",
-        "| c9        | sum1       | sum2      | rn2 |",
-        "+-----------+------------+-----------+-----+",
-        "| 28774375  | 745354217  | 91818943  | 100 |",
-        "| 63044568  | 988558066  | 232866360 | 99  |",
-        "| 141047417 | 1285934966 | 374546521 | 98  |",
-        "| 141680161 | 1654839259 | 519841132 | 97  |",
-        "| 145294611 | 1980231675 | 745354217 | 96  |",
-        "+-----------+------------+-----------+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_complex_plan() -> Result<()> {
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_null_cases_csv(&ctx).await?;
-    let sql = "SELECT
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
a,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
b,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as c,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as d,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as e,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as f,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as g,
-    SUM(c1) OVER (ORDER BY c3) as h,
-    SUM(c1) OVER (ORDER BY c3 DESC) as i,
-    SUM(c1) OVER (ORDER BY c3 NULLS first) as j,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l,
-    SUM(c1) OVER (ORDER BY c3, c2) as m,
-    SUM(c1) OVER (ORDER BY c3, c1 DESC) as n,
-    SUM(c1) OVER (ORDER BY c3 DESC, c1) as o,
-    SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as a1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as b1,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as c1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as d1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as e1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as f1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as g1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as h1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as j1,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 
current row) as k1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as l1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as m1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as n1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as o1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as h11,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as j11,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as k11,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as l11,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as m11,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as n11,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as o11
-    FROM null_cases
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Unnecessary SortExecs are removed
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 
PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 
PRECEDING AND 11 FOLLOWING@12 as d, SUM(nul [...]
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preced [...]
-            "      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field 
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]",
-            "              WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field 
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_b [...]
-            "                  WindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start [...]
-            "                    WindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta 
[...]
-            "                      BoundedWindowAggExec: 
wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), 
end_bound: CurrentRow }]",
-            "                        SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS 
LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> 
Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c1, c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum1,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4015442341 | 21907044499 | 21907044499 |",
-        "| 3998790955 | 24576419362 | 24576419362 |",
-        "| 3959216334 | 23063303501 | 23063303501 |",
-        "| 3717551163 | 21560567246 | 21560567246 |",
-        "| 3276123488 | 19815386638 | 19815386638 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@13 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4015442341 | 8014233296  | 21907044499 |",
-        "| 3998790955 | 11973449630 | 24576419362 |",
-        "| 3959216334 | 15691000793 | 23063303501 |",
-        "| 3717551163 | 18967124281 | 21560567246 |",
-        "| 3276123488 | 21907044499 | 19815386638 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c3,
-    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
-    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ord [...]
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
\"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBO [...]
-            "        SortExec: expr=[CAST(c3@2 AS Int16) + c4@3 DESC,c9@8 
DESC,c2@1 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----+-------------+--------------+",
-        "| c3  | sum1        | sum2         |",
-        "+-----+-------------+--------------+",
-        "| -86 | 2861911482  | 222089770060 |",
-        "| 13  | 5075947208  | 219227858578 |",
-        "| 125 | 8701233618  | 217013822852 |",
-        "| 123 | 11293564174 | 213388536442 |",
-        "| 97  | 14767488750 | 210796205886 |",
-        "+-----+-------------+--------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_target_partitions(8)
-        .with_batch_size(4096)
-        .with_repartition_windows(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT count(*) as global_count FROM
-                 (SELECT count(*), c1
-                  FROM aggregate_test_100
-                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
-                  GROUP BY c1
-                  ORDER BY c1 ) AS a ";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Unnecessary Sort in the sub query is removed
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
-            "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
-            "    CoalescePartitionsExec",
-            "      AggregateExec: mode=Partial, gby=[], 
aggr=[COUNT(UInt8(1))]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=8",
-            "          ProjectionExec: expr=[c1@0 as c1]",
-            "            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as 
c1], aggr=[COUNT(UInt8(1))]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 8), input_partitions=8",
-            "                  AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]",
-            "                    ProjectionExec: expr=[c1@0 as c1]",
-            "                      CoalesceBatchesExec: 
target_batch_size=4096",
-            "                        FilterExec: c13@1 != 
C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-            "                          RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------------+",
-        "| global_count |",
-        "+--------------+",
-        "| 5            |",
-        "+--------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> 
Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c3,
-    SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
-    SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC 
NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----+-------------+------------+",
-        "| c3  | sum1        | sum2       |",
-        "+-----+-------------+------------+",
-        "| 125 | 3625286410  | 3625286410 |",
-        "| 123 | 7192027599  | 3566741189 |",
-        "| 123 | 9784358155  | 6159071745 |",
-        "| 122 | 13845993262 | 4061635107 |",
-        "| 120 | 16676974334 | 2830981072 |",
-        "+-----+-------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@13 as rn1]",
-            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "      SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "        CoalesceBatchesExec: target_batch_size=8192",
-            "          RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
-            "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> 
{
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(false);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  CoalescePartitionsExec",
-            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@13 as rn1]",
-            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "          CoalesceBatchesExec: target_batch_size=8192",
-            "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
-            "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_intermediate_parallel_sort() -> 
Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, \
-    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
3 FOLLOWING) as sum1, \
-    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2 \
-    FROM aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      SortPreservingMergeExec: [c9@8 ASC NULLS LAST]",
-            "        SortExec: expr=[c9@8 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) 
}]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS 
LAST]",
-            "              CoalesceBatchesExec: target_batch_size=8192",
-            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_with_global_limit() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(1);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM 
aggregate_test_100 ORDER BY c13 LIMIT 1)";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as 
array_agg1]",
-            "  AggregateExec: mode=Final, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
-            "    AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
-            "      GlobalLimitExec: skip=0, fetch=1",
-            "        SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+----------------------------------+",
-        "| array_agg1                       |",
-        "+----------------------------------+",
-        "| [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm] |",
-        "+----------------------------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_low_cardinality() -> Result<()> {
-    let config = SessionConfig::new().with_target_partitions(32);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-        SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING 
AND 3 FOLLOWING) as summation1,
-        SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING) as summation2
-    FROM aggregate_test_100
-    ORDER BY c9
-    LIMIT 5";
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+",
-        "| summation1 | summation2  |",
-        "+------------+-------------+",
-        "| -16110     | 61035129    |",
-        "| 3917       | -108973366  |",
-        "| -16974     | 623103518   |",
-        "| -1114      | -1927628110 |",
-        "| 15673      | -1899175111 |",
-        "+------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
 fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {
     let ts_field = Field::new("ts", DataType::Int32, false);
     let inc_field = Field::new("inc_col", DataType::Int32, false);
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index a7054d4ae..148c7a4fd 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -92,7 +92,7 @@ limit 5
 
 
 
-# async fn csv_query_window_with_order_by
+# csv_query_window_with_order_by
 query IIRIIIIII
 select
 c9,
@@ -403,7 +403,7 @@ aa 3 2
 bb 7 2
 
 
-# async fn window_in_expression
+# window_in_expression
 query I rowsort
 select 1 - lag(amount, 1) over (order by idx) as column1 from (values ('a', 1, 
100), ('a', 2, 150)) as t (col1, idx, amount)
 ---
@@ -412,7 +412,7 @@ select 1 - lag(amount, 1) over (order by idx) as column1 
from (values ('a', 1, 1
 NULL
 
 
-# async fn window_with_agg_in_expression
+# window_with_agg_in_expression
 query TIIIII
 select col1, idx, count(*), sum(amount), lag(sum(amount), 1) over (order by 
idx) as prev_amount,
 sum(amount) - lag(sum(amount), 1) over (order by idx) as difference from (
@@ -424,7 +424,7 @@ a 1 1 100 NULL NULL
 a 2 1 150 100 50
 
 
-# async fn window_frame_empty
+# window_frame_empty
 query II
 SELECT
 SUM(c3) OVER() as sum1,
@@ -439,7 +439,7 @@ LIMIT 5
 781 100
 781 100
 
-# async fn window_frame_rows_preceding
+# window_frame_rows_preceding
 query IRI
 SELECT
 SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
@@ -456,7 +456,7 @@ LIMIT 5
 46756 15585.333333333334 3
 
 
-# async fn window_frame_rows_preceding_stddev_variance
+# window_frame_rows_preceding_stddev_variance
 query RRRR
 SELECT
 VAR(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
@@ -473,7 +473,7 @@ LIMIT 5
 768422.9999999981 512281.9999999988 876.597399037893 715.738779164577
 66526.3333333288 44350.88888888587 257.926992254259 210.596507304575
 
-# async fn window_frame_rows_preceding_with_partition_unique_order_by
+# window_frame_rows_preceding_with_partition_unique_order_by
 query IRI
 SELECT
 SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING),
@@ -519,7 +519,7 @@ LIMIT 5
 #//     Ok(())
 #// }
 
-# async fn window_frame_ranges_preceding_following_desc
+# window_frame_ranges_preceding_following_desc
 query III
 SELECT
 SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING),
@@ -535,7 +535,7 @@ LIMIT 5
 260620 781 63
 260620 781 63
 
-# async fn window_frame_large_range
+# window_frame_large_range
 # Range offset 10000 is too big for Int8 (i.e. the type of c3).
 # In this case, we should be able to still produce correct results.
 # See the issue: https://github.com/apache/arrow-datafusion/issues/5346
@@ -554,7 +554,7 @@ LIMIT 5
 781
 781
 
-# async fn window_frame_order_by_asc_desc_large
+# window_frame_order_by_asc_desc_large
 query I
 SELECT
  SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1
@@ -568,7 +568,7 @@ SELECT
 -4246910946
 
 
-# async fn window_frame_order_by_desc_large
+# window_frame_order_by_desc_large
 query I
 SELECT
  SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1
@@ -582,7 +582,7 @@ SELECT
 15810962683
 18035025006
 
-# async fn window_frame_order_by_null_timestamp_order_by
+# window_frame_order_by_null_timestamp_order_by
 query I
 SELECT
  SUM(c1) OVER (ORDER BY c2 DESC) as summation1
@@ -595,7 +595,7 @@ SELECT
 962
 962
 
-# async fn window_frame_order_by_null_desc
+# window_frame_order_by_null_desc
 query I
 SELECT
  COUNT(c2) OVER (ORDER BY c1 DESC RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING)
@@ -608,7 +608,7 @@ SELECT
 9
 9
 
-# async fn window_frame_order_by_null_asc
+# window_frame_order_by_null_asc
 query I
 SELECT
  COUNT(c2) OVER (ORDER BY c1 RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING)
@@ -622,7 +622,7 @@ SELECT
 2
 5
 
-# async fn window_frame_order_by_null_asc_null_first
+# window_frame_order_by_null_asc_null_first
 query I
 SELECT
  COUNT(c2) OVER (ORDER BY c1 NULLS FIRST RANGE BETWEEN 5 PRECEDING AND 3 
FOLLOWING)
@@ -635,7 +635,7 @@ SELECT
 9
 9
 
-# async fn window_frame_order_by_null_desc_null_last
+# window_frame_order_by_null_desc_null_last
 query I
 SELECT
  COUNT(c2) OVER (ORDER BY c1 DESC NULLS LAST RANGE BETWEEN 5 PRECEDING AND 3 
FOLLOWING)
@@ -648,7 +648,7 @@ SELECT
 6
 6
 
-# async fn window_frame_rows_order_by_null
+# window_frame_rows_order_by_null
 query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
 SELECT
         SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) 
as a,
@@ -1190,3 +1190,851 @@ NULL 3917
 -16974 -16974
 -1114 -1114
 15673 15673
+
+
+
+
+# test_window_agg_sort
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+
+# Only 1 SortExec was added
+query TT
+EXPLAIN SELECT
+      c9,
+      SUM(c9) OVER(ORDER BY c9) as sum1,
+      SUM(c9) OVER(ORDER BY c9, c8) as sum2
+      FROM aggregate_test_100
+----
+logical_plan
+Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW AS sum2
+  WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]
+  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
+    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
+      SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]
+        CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+# over_order_by_sort_keys_sorting_prefix_compacting
+
+# Only 1 SortExec was added
+query TT
+EXPLAIN SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER 
(ORDER BY c2, c9) from aggregate_test_100
+----
+logical_plan
+Projection: aggregate_test_100.c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW
+  WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING]]
+    WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+      WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@14 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@15 as 
SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as MIN(aggregat [...]
+  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+    BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: 
"MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
+      BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: 
"MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
+        SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+# FIXME: for now we do not detect when the global sort keys are a prefix of a 
window's sort keys, so the plan is not re-arranged to save one SortExec
+# over_order_by_sort_keys_sorting_global_order_compacting
+
+# 3 SortExecs are added
+query TT
+EXPLAIN SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), MIN(c9) 
OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2
+----
+logical_plan
+Sort: aggregate_test_100.c2 ASC NULLS LAST
+  Projection: aggregate_test_100.c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING]]
+      WindowAggr: windowExpr=[[MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+        WindowAggr: windowExpr=[[MIN(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, 
c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+SortExec: expr=[c2@0 ASC NULLS LAST]
+  ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as 
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PREC [...]
+    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+      BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: 
"MAX(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
+        SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+          BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { 
name: "MIN(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]
+            SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+              CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+# test_window_partition_by_order_by
+statement ok
+set datafusion.execution.batch_size = 4096
+
+query TT
+EXPLAIN SELECT
+    SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 
1 FOLLOWING),
+    COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING)
+    FROM aggregate_test_100
+----
+logical_plan
+Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING
+  WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]
+  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
"COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
+    SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+      CoalesceBatchesExec: target_batch_size=4096
+        RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 
2), input_partitions=2
+          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { 
name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]
+            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+              CoalesceBatchesExec: target_batch_size=4096
+                RepartitionExec: partitioning=Hash([Column { name: "c1", 
index: 0 }, Column { name: "c2", index: 1 }], 2), input_partitions=2
+                  RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+                    CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+# test_window_agg_sort_reversed_plan
+# Only 1 SortExec was added
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING 
AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@13 as sum2]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        SortExec: expr=[c9@8 DESC]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+query III
+SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+4268716378 8498370520 24997484146
+4229654142 12714811027 29012926487
+4216440507 16858984380 28743001064
+4144173353 20935849039 28472563256
+4076864659 24997484146 28118515915
+
+# test_window_agg_sort_reversed_plan_builtin
+query TT
+EXPLAIN SELECT
+    c9,
+    FIRST_VALUE(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv1,
+    FIRST_VALUE(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv2,
+    LAG(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lag1,
+    LAG(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lag2,
+    LEAD(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lead1,
+    LEAD(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lead2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING 
AS fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS fv2, 
LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW AS lag1, LAG(aggregate_test_100.c9,Int64(2), [...]
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+      WindowAggr: windowExpr=[[FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING, LAG(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING, LEAD(aggregate_test_100.c9, Int64(2), Int64(10101)) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 
FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@16 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@13 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW@17 as lag1, LAG(aggregate_test_100.c9,I [...]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { 
name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_orde [...]
+      BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field 
{ name: "FIRST_VALUE(aggregate_test_100.c9)", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_or [...]
+        SortExec: expr=[c9@8 DESC]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+query IIIIIII
+SELECT
+    c9,
+    FIRST_VALUE(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv1,
+    FIRST_VALUE(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as fv2,
+    LAG(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lag1,
+    LAG(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lag2,
+    LEAD(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lead1,
+    LEAD(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 
FOLLOWING) as lead2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+4268716378 4229654142 4268716378 4216440507 10101 10101 4216440507
+4229654142 4216440507 4268716378 4144173353 10101 10101 4144173353
+4216440507 4144173353 4229654142 4076864659 4268716378 4268716378 4076864659
+4144173353 4076864659 4216440507 4061635107 4229654142 4229654142 4061635107
+4076864659 4061635107 4144173353 4015442341 4216440507 4216440507 4015442341
+
+# test_window_agg_sort_non_reversed_plan
+# Not every window function can be reversed (ROW_NUMBER is not reversible)
+
+query TT
+EXPLAIN SELECT
+    c9,
+    ROW_NUMBER() OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn1,
+    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING 
AS rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING AS rn2
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC 
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+      WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as rn1, 
ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING@13 as rn2]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+      SortExec: expr=[c9@8 ASC NULLS LAST]
+        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c9@8 DESC]
+            CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT
+    c9,
+    ROW_NUMBER() OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn1,
+    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+28774375 1 100
+63044568 2 99
+141047417 3 98
+141680161 4 97
+145294611 5 96
+
+# test_window_agg_sort_multi_layer_non_reversed_plan
+# Not every window function can be reversed (ROW_NUMBER is not reversible)
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC, c1 ASC, c2 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC, c1 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum2,
+    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING 
AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING AS sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING A [...]
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+      WindowAggr: windowExpr=[[ROW_NUMBER() ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, 
c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER 
BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECED [...]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+      SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS 
LAST]
+        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+            SortExec: expr=[c9@8 DESC,c1@0 DESC]
+              CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+query IIII
+SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC, c1 ASC, c2 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC, c1 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum2,
+    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as rn2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+28774375 745354217 91818943 100
+63044568 988558066 232866360 99
+141047417 1285934966 374546521 98
+141680161 1654839259 519841132 97
+145294611 1980231675 745354217 96
+
+# test_window_agg_complex_plan
+# Unnecessary SortExecs are removed
+query TT
+EXPLAIN SELECT
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
a,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
b,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as c,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as d,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as e,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as f,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as g,
+    SUM(c1) OVER (ORDER BY c3) as h,
+    SUM(c1) OVER (ORDER BY c3 DESC) as i,
+    SUM(c1) OVER (ORDER BY c3 NULLS first) as j,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l,
+    SUM(c1) OVER (ORDER BY c3, c2) as m,
+    SUM(c1) OVER (ORDER BY c3, c1 DESC) as n,
+    SUM(c1) OVER (ORDER BY c3 DESC, c1) as o,
+    SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as a1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as b1,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as c1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as d1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as e1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as f1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as g1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as h1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as j1,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 
current row) as k1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as l1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as m1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as n1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as o1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as h11,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as j11,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as k11,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as l11,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as m11,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as n11,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as o11
+    FROM null_cases
+    LIMIT 5
+----
+logical_plan
+Projection: SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE 
BETWEEN 10 PRECEDING AND 11 FOLLOWING AS a, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS 
b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 
10 PRECEDING AND 11 FOLLOWING AS c, SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING AS d, 
SUM(null_cases.c1) ORDER BY [null_cases.c [...]
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC 
NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) 
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED 
FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC 
NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+        WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+          WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+            WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 
ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING]]
+              WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING]]
+                WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW, SUM(null_cases.c1) ORDER BY [null_cases.c3 
DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING, 
SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 
CURRENT ROW AND UNBOUNDED FOLLOWING]]
+                  WindowAggr: windowExpr=[[SUM(null_cases.c1) ORDER BY 
[null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW]]
+                    TableScan: null_cases projection=[c1, c2, c3]
+physical_plan
+ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) 
ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] 
RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY 
[null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 
as d, SUM(null_cases.c1) O [...]
+  GlobalLimitExec: skip=0, fetch=5
+    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), [...]
+      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+        SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]
+          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+            SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]
+              BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+                SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 DESC]
+                  WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: CurrentRo [...]
+                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
"SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Precedi [...]
+                      SortExec: expr=[c3@2 DESC NULLS LAST]
+                        WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Pre [...]
+                          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: "SUM(null_cases.c1)", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
+                            SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]
+                              CsvExec: files={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/null_cases.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3]
+
+query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
+SELECT
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
a,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as 
b,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as c,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as d,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as e,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 
11 FOLLOWING) as f,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 
FOLLOWING) as g,
+    SUM(c1) OVER (ORDER BY c3) as h,
+    SUM(c1) OVER (ORDER BY c3 DESC) as i,
+    SUM(c1) OVER (ORDER BY c3 NULLS first) as j,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l,
+    SUM(c1) OVER (ORDER BY c3, c2) as m,
+    SUM(c1) OVER (ORDER BY c3, c1 DESC) as n,
+    SUM(c1) OVER (ORDER BY c3 DESC, c1) as o,
+    SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as a1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as b1,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 
FOLLOWING) as c1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as d1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as e1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND 11 FOLLOWING) as f1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND 11 FOLLOWING) as g1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as h1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current 
row) as j1,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 
current row) as k1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as l1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as m1,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED 
PRECEDING AND current row) as n1,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING 
AND current row) as o1,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as h11,
+    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as j11,
+    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED 
FOLLOWING) as k11,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as l11,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as m11,
+    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as n11,
+    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND 
UNBOUNDED FOLLOWING) as o11
+    FROM null_cases
+    LIMIT 5
+----
+412 412 339 412 339 339 412 NULL 4627 NULL 4627 4627 NULL NULL 4627 NULL 412 
412 4627 412 4627 4627 412 NULL NULL 4627 NULL 4627 4627 NULL 4627 4627 NULL 
4627 NULL NULL 4627
+488 488 412 488 412 412 488 72 4627 72 4627 4627 72 72 4627 72 488 488 4627 
488 4627 4627 488 72 72 4627 72 4627 4627 72 4627 4627 72 4627 72 72 4627
+543 543 488 543 488 488 543 96 4555 96 4555 4555 96 96 4555 96 543 543 4627 
543 4627 4627 543 96 96 4555 96 4555 4555 96 4555 4555 96 4555 96 96 4555
+553 553 543 553 543 543 553 115 4531 115 4531 4531 115 115 4531 115 553 553 
4627 553 4627 4627 553 115 115 4531 115 4531 4531 115 4531 4531 115 4531 115 
115 4531
+553 553 553 553 553 553 553 140 4512 140 4512 4512 140 140 4512 140 553 553 
4627 553 4627 4627 553 140 140 4512 140 4512 4512 140 4512 4512 140 4512 140 
140 4512
+
+
+# test_window_agg_sort_orderby_reversed_partitionby_plan
+statement ok
+set datafusion.optimizer.repartition_windows = false;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c1, c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c1, c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+4015442341 21907044499 21907044499
+3998790955 24576419362 24576419362
+3959216334 23063303501 23063303501
+3717551163 21560567246 21560567246
+3276123488 19815386638 19815386638
+
+statement ok
+set datafusion.optimizer.repartition_windows = true;
+
+# test_window_agg_sort_partitionby_reversed_plan
+statement ok
+set datafusion.optimizer.repartition_windows = false;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+query III
+SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 
5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+4015442341 8014233296 21907044499
+3998790955 11973449630 24576419362
+3959216334 15691000793 23063303501
+3717551163 18967124281 21560567246
+3276123488 21907044499 19815386638
+
+
+statement ok
+set datafusion.optimizer.repartition_windows = false;
+
+# test_window_agg_sort_orderby_reversed_binary_expr
+
+statement ok
+set datafusion.optimizer.repartition_windows = false;
+
+query TT
+EXPLAIN SELECT c3,
+    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
+    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+logical_plan
+Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]]
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]
+  GlobalLimitExec: skip=0, fetch=5
+    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
+        SortExec: expr=[c3@2 + c4@3 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]
+          CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+query III
+SELECT c3,
+    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
+    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+-86 2861911482 222089770060
+13 5075947208 219227858578
+125 8701233618 217013822852
+123 11293564174 213388536442
+97 14767488750 210796205886
+
+statement ok
+set datafusion.optimizer.repartition_windows = true;
+
+# test_remove_unnecessary_sort_in_sub_query
+# The unnecessary Sort in the subquery is removed from the physical plan
+query TT
+EXPLAIN SELECT count(*) as global_count FROM
+                 (SELECT count(*), c1
+                  FROM aggregate_test_100
+                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
+                  GROUP BY c1
+                  ORDER BY c1 ) AS a
+----
+logical_plan
+Projection: COUNT(UInt8(1)) AS global_count
+  Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
+    SubqueryAlias: a
+      Sort: aggregate_test_100.c1 ASC NULLS LAST
+        Projection: aggregate_test_100.c1
+          Aggregate: groupBy=[[aggregate_test_100.c1]], 
aggr=[[COUNT(UInt8(1))]]
+            Projection: aggregate_test_100.c1
+              Filter: aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
+                TableScan: aggregate_test_100 projection=[c1, c13], 
partial_filters=[aggregate_test_100.c13 != 
Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
+physical_plan
+ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]
+  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
+    CoalescePartitionsExec
+      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
+        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=2
+          ProjectionExec: expr=[c1@0 as c1]
+            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]
+              CoalesceBatchesExec: target_batch_size=4096
+                RepartitionExec: partitioning=Hash([Column { name: "c1", 
index: 0 }], 2), input_partitions=2
+                  AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[COUNT(UInt8(1))]
+                    ProjectionExec: expr=[c1@0 as c1]
+                      CoalesceBatchesExec: target_batch_size=4096
+                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434
+                          RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+                            CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c13]
+
+query I
+SELECT count(*) as global_count FROM
+                 (SELECT count(*), c1
+                  FROM aggregate_test_100
+                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
+                  GROUP BY c1
+                  ORDER BY c1 ) AS a
+----
+5
+
+# test_window_agg_sort_orderby_reversed_partitionby_reversed_plan
+
+
+statement ok
+set datafusion.optimizer.repartition_windows = true;
+
+query TT
+EXPLAIN SELECT c3,
+    SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
+    SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2
+    FROM aggregate_test_100
+    ORDER BY c3
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, 
c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortPreservingMergeExec: [c3@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS 
FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }]
+        SortExec: expr=[c3@2 ASC NULLS LAST,c9@8 DESC]
+          CoalesceBatchesExec: target_batch_size=4096
+            RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 
}], 2), input_partitions=2
+              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+                BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]
+                  SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]
+                    CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c3,
+    SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
+    SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2
+    FROM aggregate_test_100
+    ORDER BY c3
+    LIMIT 5
+----
+-117 219796664156 3023531799
+-117 222089770060 5316637703
+-111 216773132357 1243785310
+-107 215529347047 1157161427
+-106 214372185620 141680161
+
+
+statement ok
+set datafusion.optimizer.repartition_windows = true;
+
+# test_window_agg_global_sort
+query TT
+EXPLAIN SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC
+----
+logical_plan
+Sort: aggregate_test_100.c1 ASC NULLS LAST
+  Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING AS rn1
+    WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
+  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@13 as rn1]
+    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+      SortExec: expr=[c1@0 ASC NULLS LAST]
+        CoalesceBatchesExec: target_batch_size=4096
+          RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 
}], 2), input_partitions=2
+            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+              CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+query TI
+SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 
ORDER BY c1 ASC
+----
+a 1
+a 2
+a 3
+a 4
+a 5
+a 6
+a 7
+a 8
+a 9
+a 10
+a 11
+a 12
+a 13
+a 14
+a 15
+a 16
+a 17
+a 18
+a 19
+a 20
+a 21
+b 1
+b 2
+b 3
+b 4
+b 5
+b 6
+b 7
+b 8
+b 9
+b 10
+b 11
+b 12
+b 13
+b 14
+b 15
+b 16
+b 17
+b 18
+b 19
+c 1
+c 2
+c 3
+c 4
+c 5
+c 6
+c 7
+c 8
+c 9
+c 10
+c 11
+c 12
+c 13
+c 14
+c 15
+c 16
+c 17
+c 18
+c 19
+c 20
+c 21
+d 1
+d 2
+d 3
+d 4
+d 5
+d 6
+d 7
+d 8
+d 9
+d 10
+d 11
+d 12
+d 13
+d 14
+d 15
+d 16
+d 17
+d 18
+e 1
+e 2
+e 3
+e 4
+e 5
+e 6
+e 7
+e 8
+e 9
+e 10
+e 11
+e 12
+e 13
+e 14
+e 15
+e 16
+e 17
+e 18
+e 19
+e 20
+e 21
+
+# test_window_agg_global_sort_parallelize_sort_disabled
+statement ok
+set datafusion.optimizer.repartition_sorts = false;
+
+query TT
+EXPLAIN SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC
+----
+logical_plan
+Sort: aggregate_test_100.c1 ASC NULLS LAST
+  Projection: aggregate_test_100.c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING AS rn1
+    WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+      TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+SortExec: expr=[c1@0 ASC NULLS LAST]
+  CoalescePartitionsExec
+    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@13 as rn1]
+      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+        SortExec: expr=[c1@0 ASC NULLS LAST]
+          CoalesceBatchesExec: target_batch_size=4096
+            RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 
}], 2), input_partitions=2
+              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+                CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+statement ok
+set datafusion.optimizer.repartition_sorts = true;
+
+# test_window_agg_global_sort_intermediate_parallel_sort
+query TT
+EXPLAIN SELECT c1,
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
3 FOLLOWING) as sum1,
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
+    FROM aggregate_test_100 ORDER BY c1 ASC
+----
+logical_plan
+Sort: aggregate_test_100.c1 ASC NULLS LAST
+  Projection: aggregate_test_100.c1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 3 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING AS sum2
+    WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 3 FOLLOWING]]
+        TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, 
c8, c9, c10, c11, c12, c13]
+physical_plan
+SortExec: expr=[c1@0 ASC NULLS LAST]
+  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@14 as sum2]
+    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+      SortPreservingMergeExec: [c9@8 ASC NULLS LAST]
+        SortExec: expr=[c9@8 ASC NULLS LAST]
+          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]
+            SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+              CoalesceBatchesExec: target_batch_size=4096
+                RepartitionExec: partitioning=Hash([Column { name: "c1", 
index: 0 }], 2), input_partitions=2
+                  RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+                    CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+# test_window_agg_with_global_limit
+statement ok
+set datafusion.optimizer.repartition_windows = false;
+
+query TT
+EXPLAIN SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM 
aggregate_test_100 ORDER BY c13 LIMIT 1)
+----
+logical_plan
+Projection: ARRAYAGG(aggregate_test_100.c13) AS array_agg1
+  Aggregate: groupBy=[[]], aggr=[[ARRAYAGG(aggregate_test_100.c13)]]
+    Limit: skip=0, fetch=1
+      Sort: aggregate_test_100.c13 ASC NULLS LAST, fetch=1
+        TableScan: aggregate_test_100 projection=[c13]
+physical_plan
+ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]
+  AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
+    CoalescePartitionsExec
+      AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]
+        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+          GlobalLimitExec: skip=0, fetch=1
+            SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]
+              CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c13]
+
+
+query ?
+SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_100 
ORDER BY c13 LIMIT 1)
+----
+[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm]
+
+statement ok
+set datafusion.optimizer.repartition_windows = true;
+
+# test_window_agg_low_cardinality
+statement ok
+set datafusion.execution.target_partitions = 32;
+
+query II
+SELECT
+        SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING 
AND 3 FOLLOWING) as summation1,
+        SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING) as summation2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+-16110 61035129
+3917 -108973366
+-16974 623103518
+-1114 -1927628110
+15673 -1899175111
+
+statement ok
+set datafusion.execution.target_partitions = 2;

Reply via email to