NGA-TRAN commented on code in PR #18352:
URL: https://github.com/apache/datafusion/pull/18352#discussion_r2478407869
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
-> Result<()> {
Review Comment:
```suggestion
// Test with `repartition_sorts` disabled, causing a full resort of the data
async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
-> Result<()> {
```
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
-> Result<()> {
+ assert_snapshot!(
+
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
+ @r#"
+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type | plan
|
+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]]
|
+ | | Union
|
+ | | TableScan: sorted projection=[id]
|
+ | | Sort: unsorted.id ASC NULLS LAST
|
+ | | TableScan: unsorted projection=[id]
|
+ | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
|
+ | | SortExec: expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
|
+ | | CoalescePartitionsExec
|
+ | | AggregateExec: mode=Partial, gby=[id@0 as id],
aggr=[]
|
+ | | UnionExec
|
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
|
+ | |
|
+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ "#);
+ Ok(())
+}
+
+#[ignore] // See https://github.com/apache/datafusion/issues/18380
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
-> Result<()> {
Review Comment:
```suggestion
// Test with `repartition_sorts` enabled to preserve pre-sorted partitions
// and avoid resorting
async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
-> Result<()> {
```
##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,87 @@ async fn test_union_inputs_different_sorted2() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
-> Result<()> {
Review Comment:
```suggestion
// Test with `repartition_sorts` enabled to preserve pre-sorted partitions
// and avoid resorting
async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
-> Result<()> {
```
##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,87 @@ async fn test_union_inputs_different_sorted2() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
-> Result<()> {
+ assert_snapshot!(
+
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
+ @r"
+ Input Plan:
+ OutputRequirementExec: order_by=[(nullable_col@0, asc)],
dist_by=SinglePartition
+ CoalescePartitionsExec
+ UnionExec
+ SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC], file_type=parquet
+
+ Optimized Plan:
+ OutputRequirementExec: order_by=[(nullable_col@0, asc)],
dist_by=SinglePartition
+ SortPreservingMergeExec: [nullable_col@0 ASC]
+ UnionExec
+ SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC], file_type=parquet
+ ");
+ Ok(())
+}
+
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
-> Result<()> {
Review Comment:
```suggestion
// Test with `repartition_sorts` disabled, causing a full resort of the data
async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
-> Result<()> {
```
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,152 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false()
-> Result<()> {
+ assert_snapshot!(
+
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
+ @r#"
+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type | plan
|
+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]]
|
+ | | Union
|
+ | | TableScan: sorted projection=[id]
|
+ | | Sort: unsorted.id ASC NULLS LAST
|
+ | | TableScan: unsorted projection=[id]
|
+ | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
|
+ | | SortExec: expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
|
+ | | CoalescePartitionsExec
|
+ | | AggregateExec: mode=Partial, gby=[id@0 as id],
aggr=[]
|
+ | | UnionExec
|
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
|
+ | |
|
+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ "#);
+ Ok(())
+}
+
+#[ignore] // See https://github.com/apache/datafusion/issues/18380
+#[tokio::test]
+async fn
union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true()
-> Result<()> {
+ assert_snapshot!(
+
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
+ @r#"
+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type | plan
|
+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]]
|
+ | | Union
|
+ | | TableScan: sorted projection=[id]
|
+ | | Sort: unsorted.id ASC NULLS LAST
|
+ | | TableScan: unsorted projection=[id]
|
+ | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
|
+ | | SortPreservingMergeExec: [id@0 ASC NULLS LAST]
|
+ | | AggregateExec: mode=Partial, gby=[id@0 as id],
aggr=[], ordering_mode=Sorted
|
+ | | UnionExec
|
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+ | | SortExec: expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
|
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
|
+ | |
|
+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ "#);
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+ //
+ //
+ // === Excerpt from the verbose explain ===
+ //
+ // Physical_plan after EnforceDistribution:
+ //
+ // OutputRequirementExec: order_by=[], dist_by=Unspecified
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // SortExec: expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+ //
+ // Physical_plan after EnforceSorting:
Review Comment:
You might want to keep the exact output, including the no-op optimizer step
in between—it helps others see exactly what’s happening.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]