ozankabak commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1099378094
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2385,6 +2384,173 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
     Ok(())
 }
+#[tokio::test]
+async fn test_window_agg_global_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(false);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // With parallelize_sorts disabled, the global ORDER BY is satisfied by a
+    // SortExec above CoalescePartitionsExec instead of a SortPreservingMergeExec
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as rn1]",
+            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
+            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_repartition_windows(true)
+        .with_target_partitions(2)
+        .with_parallelize_sorts(true);
+    let ctx = SessionContext::with_config(config);
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT c1, \
+        SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
+        SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
+        FROM aggregate_test_100 ORDER BY c1 ASC";
+
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // The intermediate sort required by the second window (ORDER BY c9) is parallelized:
+    // per-partition SortExecs feed a SortPreservingMergeExec
+    let expected = {
+        vec![
+            "SortExec: [c1@0 ASC NULLS LAST]",
+            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
+            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
+            "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
+            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "              CoalesceBatchesExec: target_batch_size=8192",
+            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
Review Comment:
Yes, this is an unrelated behavior which only makes sense when the cost of
hashing is significant. It is on our roadmap to make `EnforceDistribution`
smarter; maybe we can touch on this within that scope and build a single
multi-threaded hash repartitioner that achieves the same purpose.
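
To make the idea concrete, here is a minimal, purely hypothetical sketch in plain `std` Rust (not DataFusion code; the function `hash_repartition`, the use of `i64` rows in place of record batches, and the mpsc channels are illustration-only assumptions). Instead of the RoundRobinBatch → Hash pair of `RepartitionExec`s seen in the plans above, several worker threads pull rows from the single input partition and route each row straight to its hash bucket, so the data is touched by only one repartitioning pass:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Route `input` rows to `num_buckets` outputs using `num_workers` threads,
/// hashing each row once and skipping any intermediate round-robin split.
fn hash_repartition(input: Vec<i64>, num_buckets: usize, num_workers: usize) -> Vec<Vec<i64>> {
    // One channel per output bucket; each worker gets a clone of every sender.
    let (senders, receivers): (Vec<Sender<i64>>, Vec<Receiver<i64>>) =
        (0..num_buckets).map(|_| channel()).unzip();
    // Shared queue standing in for the single input partition.
    let input = Arc::new(Mutex::new(input));

    let workers: Vec<_> = (0..num_workers)
        .map(|_| {
            let input = Arc::clone(&input);
            let senders = senders.clone();
            thread::spawn(move || loop {
                // Pull the next row from the shared input (a record batch in a real engine).
                let next = input.lock().unwrap().pop();
                let row = match next {
                    Some(row) => row,
                    None => break,
                };
                // Hash the partitioning key and send the row straight to its bucket.
                let mut hasher = DefaultHasher::new();
                row.hash(&mut hasher);
                let bucket = (hasher.finish() as usize) % num_buckets;
                senders[bucket].send(row).unwrap();
            })
        })
        .collect();

    drop(senders); // drop the original senders so the receivers can terminate
    for worker in workers {
        worker.join().unwrap();
    }
    // Drain every bucket; all senders are gone, so each iterator ends after the buffered rows.
    receivers.into_iter().map(|rx| rx.into_iter().collect()).collect()
}

fn main() {
    let buckets = hash_repartition((0..100).collect(), 2, 4);
    for (i, bucket) in buckets.iter().enumerate() {
        println!("bucket {i}: {} rows", bucket.len());
    }
}
```

Whether something like this actually wins depends on how expensive hashing is relative to the round-robin fan-out, which is exactly the trade-off mentioned above.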
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]