wiedld commented on code in PR #15409: URL: https://github.com/apache/datafusion/pull/15409#discussion_r2011766884
########## datafusion/core/tests/physical_optimizer/enforce_distribution.rs: ########## @@ -3461,3 +3467,102 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_distribute_sort_parquet() -> Result<()> { + let test_config: TestConfig = + TestConfig::default().with_prefer_repartition_file_scans(1000); + assert!( + test_config.config.optimizer.repartition_file_scans, + "should enable scans to be repartitioned" + ); + + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("c", &schema).unwrap(), + options: SortOptions::default(), + }]); + let physical_plan = sort_exec(sort_key, parquet_exec_with_stats(10000 * 8192), false); + + // prior to optimization, this is the starting plan + let starting = &[ + "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + plans_matches_expected!(starting, physical_plan.clone()); + + // what the enforce distribution run does. 
+ let expected = &[ + "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " CoalescePartitionsExec", + " DataSourceExec: file_groups={10 groups: [[x:0..8192000], [x:8192000..16384000], [x:16384000..24576000], [x:24576000..32768000], [x:32768000..40960000], [x:40960000..49152000], [x:49152000..57344000], [x:57344000..65536000], [x:65536000..73728000], [x:73728000..81920000]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + test_config.run(expected, physical_plan.clone(), &[Run::Distribution])?; + + // what the sort parallelization (in enforce sorting), does after the enforce distribution changes + let expected = &[ + "SortPreservingMergeExec: [c@2 ASC]", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", + " DataSourceExec: file_groups={10 groups: [[x:0..8192000], [x:8192000..16384000], [x:16384000..24576000], [x:24576000..32768000], [x:32768000..40960000], [x:40960000..49152000], [x:49152000..57344000], [x:57344000..65536000], [x:65536000..73728000], [x:73728000..81920000]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + test_config.run(expected, physical_plan, &[Run::Distribution, Run::Sorting])?; + Ok(()) +} + +#[tokio::test] +async fn test_distribute_sort_memtable() -> Result<()> { Review Comment: Reproducer from the issue. This compares with the test above (`test_distribute_sort_parquet`), which demonstrates how it (already) works in parquet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org