NGA-TRAN commented on code in PR #18352:
URL: https://github.com/apache/datafusion/pull/18352#discussion_r2474561646
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+ let config = SessionConfig::default()
+ .with_target_partitions(1)
+ .with_repartition_sorts(repartition_sorts);
Review Comment:
👍 This is the key here: the setting that forces DF to execute sorts in a
per-partition fashion and merge afterwards
https://github.com/apache/datafusion/blob/e432d5561e5ef16348a3d0ab7cfb553795e31683/datafusion/common/src/config.rs#L919
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+ let config = SessionConfig::default()
+ .with_target_partitions(1)
+ .with_repartition_sorts(repartition_sorts);
+ let ctx = SessionContext::new_with_config(config);
+
+ let testdata = parquet_test_data();
+
+ // Register "sorted" table, that is sorted
+ ctx.register_parquet(
+ "sorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ .file_sort_order(vec![vec![col("id").sort(true, false)]]),
Review Comment:
Do you know whether the file is actually sorted, or did you just add this
call to trick the planner into treating the file as if it were sorted?
##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,79 @@ async fn test_union_inputs_different_sorted2() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_with_repartition_sorts_true() -> Result<()> {
+ reproducer_impl(true).await?; // ✅ Passes
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_with_repartition_sorts_false() -> Result<()> {
+ reproducer_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // OutputRequirementExec: order_by=[(nullable_col@0, asc)],
dist_by=SinglePartition
+ // SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // UnionExec
+ // DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet
+ // DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_impl(repartition_sorts: bool) -> Result<()> {
Review Comment:
Similarly, can you have this function accept 2 inputs so that both of its
tests pass?
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+ let config = SessionConfig::default()
+ .with_target_partitions(1)
+ .with_repartition_sorts(repartition_sorts);
+ let ctx = SessionContext::new_with_config(config);
+
+ let testdata = parquet_test_data();
+
+ // Register "sorted" table, that is sorted
+ ctx.register_parquet(
+ "sorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ .file_sort_order(vec![vec![col("id").sort(true, false)]]),
+ )
+ .await?;
+
+ // Register "unsorted" table
+ ctx.register_parquet(
+ "unsorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ )
+ .await?;
+
+ let source_sorted = ctx
+ .table("sorted")
+ .await
+ .unwrap()
+ .select(vec![col("id")])
+ .unwrap();
+
+ let source_unsorted = ctx
+ .table("unsorted")
+ .await
+ .unwrap()
+ .select(vec![col("id")])
+ .unwrap();
+
+ let source_unsorted_resorted = source_unsorted
+ .sort(vec![col("id").sort(true, false)])?;
+
+ let union = source_sorted.union(source_unsorted_resorted)?;
Review Comment:
Can you try to replace this with:
`let union = source_unsorted_resorted.union(source_sorted)?;`
to see if it works correctly?
##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,79 @@ async fn test_union_inputs_different_sorted2() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_with_repartition_sorts_true() -> Result<()> {
+ reproducer_impl(true).await?; // ✅ Passes
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_with_repartition_sorts_false() -> Result<()> {
+ reproducer_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
Review Comment:
It is expected that this test fails here. See
https://github.com/apache/datafusion/blob/e432d5561e5ef16348a3d0ab7cfb553795e31683/datafusion/common/src/config.rs#L919
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+ let config = SessionConfig::default()
+ .with_target_partitions(1)
+ .with_repartition_sorts(repartition_sorts);
+ let ctx = SessionContext::new_with_config(config);
+
+ let testdata = parquet_test_data();
+
+ // Register "sorted" table, that is sorted
+ ctx.register_parquet(
+ "sorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ .file_sort_order(vec![vec![col("id").sort(true, false)]]),
+ )
+ .await?;
+
+ // Register "unsorted" table
+ ctx.register_parquet(
+ "unsorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ )
+ .await?;
+
+ let source_sorted = ctx
+ .table("sorted")
+ .await
+ .unwrap()
+ .select(vec![col("id")])
+ .unwrap();
+
+ let source_unsorted = ctx
+ .table("unsorted")
+ .await
+ .unwrap()
+ .select(vec![col("id")])
+ .unwrap();
+
+ let source_unsorted_resorted = source_unsorted
+ .sort(vec![col("id").sort(true, false)])?;
+
+ let union = source_sorted.union(source_unsorted_resorted)?;
+
+ let agg = union.aggregate(vec![col("id")], vec![])?;
+
+ let df = agg;
+
+ // To be able to remove user specific paths from the plan, for stable
assertions
+ let testdata_clean =
Path::new(&testdata).canonicalize()?.display().to_string();
+ let testdata_clean =
testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean);
+
+ let plan = df.explain(false, false)?.collect().await?;
+ assert_snapshot!(
+ pretty_format_batches(&plan)?.to_string().replace(&testdata_clean,
"{testdata}"),
+ @r#"
+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | plan_type | plan
|
+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+ | logical_plan | Aggregate: groupBy=[[id]], aggr=[[]]
|
+ | | Union
|
+ | | TableScan: sorted projection=[id]
|
+ | | Sort: unsorted.id ASC NULLS LAST
|
+ | | TableScan: unsorted projection=[id]
|
+ | physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
|
+ | | SortPreservingMergeExec: [id@0 ASC NULLS LAST]
|
+ | | AggregateExec: mode=Partial, gby=[id@0 as id],
aggr=[], ordering_mode=Sorted
|
+ | | UnionExec
|
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
+ | | SortExec: expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
|
+ | | DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
|
+ | |
|
+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Review Comment:
👍 Since the 2 streams coming out of the union are sorted, the 2 streams
coming out of the Partial Aggregate are also sorted. Thus, we should only need
to do the merge.
And this only happens with `repartition_sorts` = true
https://github.com/apache/datafusion/blob/e432d5561e5ef16348a3d0ab7cfb553795e31683/datafusion/common/src/config.rs#L919
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
Review Comment:
This is unexpected and is the reason for your reproducer 👍
See my comment on `reproducer_e2e_impl` below about modifying the test a bit
to make it easier for reviewers to understand.
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
Review Comment:
Can you modify this function to accept 2 parameters: `repartition_sorts` and
`expected_plan`? Then, at the comparison step, you compare with `expected_plan`.
This will help make the tests clearer. You have 4 tests in this PR and only
one should fail. The other 3 will pass.
##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -359,6 +359,79 @@ async fn test_union_inputs_different_sorted2() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_with_repartition_sorts_true() -> Result<()> {
+ reproducer_impl(true).await?; // ✅ Passes
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_with_repartition_sorts_false() -> Result<()> {
+ reproducer_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // OutputRequirementExec: order_by=[(nullable_col@0, asc)],
dist_by=SinglePartition
+ // SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // UnionExec
+ // DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet
+ // DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_impl(repartition_sorts: bool) -> Result<()> {
+ let schema = create_test_schema()?;
+
+ // Source 1, will be sorted explicitly (on `nullable_col`)
+ let source1 = parquet_exec(schema.clone());
+ let ordering1 = [sort_expr("nullable_col", &schema)].into();
+ let sort1 = sort_exec(ordering1, source1.clone());
+
+ // Source 2, pre-sorted (on `nullable_col`)
+ let parquet_ordering: LexOrdering = [sort_expr("nullable_col",
&schema)].into();
+ let source2 = parquet_exec_with_sort(schema.clone(),
vec![parquet_ordering.clone()]);
+
+ let union = union_exec(vec![sort1, source2]);
+
+ let coalesced = coalesce_partitions_exec(union);
+
+ // Required sorted / single partitioned output
+ let requirement = [PhysicalSortRequirement::new(
+ col("nullable_col", &schema)?,
+ Some(SortOptions::new(false, true)),
+ )]
+ .into();
+ let physical_plan = Arc::new(OutputRequirementExec::new(
+ coalesced,
+ Some(OrderingRequirements::new(requirement)),
+ Distribution::SinglePartition,
+ None,
+ ));
+
+ let test =
EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts);
+
+ assert_snapshot!(test.run(), @r"
+ Input Plan:
+ OutputRequirementExec: order_by=[(nullable_col@0, asc)],
dist_by=SinglePartition
+ CoalescePartitionsExec
+ UnionExec
+ SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC], file_type=parquet
+
+ Optimized Plan:
+ OutputRequirementExec: order_by=[(nullable_col@0, asc)],
dist_by=SinglePartition
+ SortPreservingMergeExec: [nullable_col@0 ASC]
+ UnionExec
+ SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC], file_type=parquet
+ ");
Review Comment:
So this test is to let us know we need
[repartition_sorts](https://github.com/apache/datafusion/blob/e432d5561e5ef16348a3d0ab7cfb553795e31683/datafusion/common/src/config.rs#L919)
= true for it to work. This works as expected: DF understands that the 2 inputs
of the Union are sorted and only does the merge after that. It will fail if
repartition_sorts = false.
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+ reproducer_e2e_impl(true).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+ Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+ let config = SessionConfig::default()
+ .with_target_partitions(1)
+ .with_repartition_sorts(repartition_sorts);
+ let ctx = SessionContext::new_with_config(config);
+
+ let testdata = parquet_test_data();
+
+ // Register "sorted" table, that is sorted
+ ctx.register_parquet(
+ "sorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ .file_sort_order(vec![vec![col("id").sort(true, false)]]),
+ )
+ .await?;
+
+ // Register "unsorted" table
+ ctx.register_parquet(
+ "unsorted",
+ &format!("{testdata}/alltypes_tiny_pages.parquet"),
+ ParquetReadOptions::default()
+ )
+ .await?;
+
+ let source_sorted = ctx
+ .table("sorted")
+ .await
+ .unwrap()
+ .select(vec![col("id")])
+ .unwrap();
+
+ let source_unsorted = ctx
+ .table("unsorted")
+ .await
+ .unwrap()
+ .select(vec![col("id")])
+ .unwrap();
+
+ let source_unsorted_resorted = source_unsorted
+ .sort(vec![col("id").sort(true, false)])?;
+
+ let union = source_sorted.union(source_unsorted_resorted)?;
+
+ let agg = union.aggregate(vec![col("id")], vec![])?;
+
+ let df = agg;
Review Comment:
Now let us first verify the plan without optimization, to see that it is the
plan we expect. Can you add this?
Note: you may hit a compile error because I have not run it.
```
let input_plan = displayable(df.as_ref()).indent(true).to_string();
assert_snapshot!( input_plan, @r#"your_expected_input_plan");
```
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+ reproducer_e2e_impl(false).await?;
+
+ // 💥 Doesn't pass, and generates this plan:
+ //
+ // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[],
ordering_mode=Sorted
+ // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+ // CoalescePartitionsExec
+ // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+ // UnionExec
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id],
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+ // DataSourceExec: file_groups={1 group:
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
Review Comment:
Since `repartition_sorts` = false, this test fails as expected. I would
modify the test so that it passes instead of failing. See my comment on
`reproducer_e2e_impl` below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]