alamb commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099245227


##########
datafusion/common/src/config.rs:
##########
@@ -290,6 +290,17 @@ config_namespace! {
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
 
+        /// Should DataFusion parallelize Sort during physical plan creation.

Review Comment:
   👍 



##########
datafusion/common/src/config.rs:
##########
@@ -290,6 +290,17 @@ config_namespace! {
         /// functions in parallel using the provided `target_partitions` level"
         pub repartition_windows: bool, default = true
 
+        /// Should DataFusion parallelize Sort during physical plan creation.

Review Comment:
   Calling this option `repartition_sorts` would probably be more consistent 
with `repartition_windows`, `repartition_joins`, etc. However, I think we can 
rename it in a follow-on PR as well.



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> 
{
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> 
Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) 
}]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 2), input_partitions=2",

Review Comment:
   This double repartition still looks strange to me, but I understand it was 
not introduced by this PR.



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",

Review Comment:
   This is a pretty clever plan (repartition first, then do the sort in 
parallel, and then merge the results at the end) 👍 



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
 
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> 
{
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM 
aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> 
Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 
3 FOLLOWING) as sum1, \
+    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2 \
+    FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) 
}]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_with_global_limit() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(false)
+        .with_target_partitions(1);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM 
aggregate_test_100 ORDER BY c13 LIMIT 1)";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as 
array_agg1]",
+            "  AggregateExec: mode=Final, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "    AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
+            "      GlobalLimitExec: skip=0, fetch=1",
+            "        SortExec: [c13@0 ASC NULLS LAST]",
+            "          ProjectionExec: expr=[c13@0 as c13]",

Review Comment:
   We don't need to fix it in this PR, but this plan could be a lot better (I 
think the sort should be using a limit).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to