wiedld commented on code in PR #15409:
URL: https://github.com/apache/datafusion/pull/15409#discussion_r2011766884


##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3461,3 +3467,102 @@ fn optimize_away_unnecessary_repartition2() -> 
Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_distribute_sort_parquet() -> Result<()> {
+    let test_config: TestConfig =
+        TestConfig::default().with_prefer_repartition_file_scans(1000);
+    assert!(
+        test_config.config.optimizer.repartition_file_scans,
+        "should enable scans to be repartitioned"
+    );
+
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("c", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let physical_plan = sort_exec(sort_key, parquet_exec_with_stats(10000 * 
8192), false);
+
+    // prior to optimization, this is the starting plan
+    let starting = &[
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
+    ];
+    plans_matches_expected!(starting, physical_plan.clone());
+
+    // what the enforce distribution run does.
+    let expected = &[
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        "  CoalescePartitionsExec",
+        "    DataSourceExec: file_groups={10 groups: [[x:0..8192000], 
[x:8192000..16384000], [x:16384000..24576000], [x:24576000..32768000], 
[x:32768000..40960000], [x:40960000..49152000], [x:49152000..57344000], 
[x:57344000..65536000], [x:65536000..73728000], [x:73728000..81920000]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    test_config.run(expected, physical_plan.clone(), &[Run::Distribution])?;
+
+    // what the sort parallelization (in enforce sorting), does after the 
enforce distribution changes
+    let expected = &[
+        "SortPreservingMergeExec: [c@2 ASC]",
+        "  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+        "    DataSourceExec: file_groups={10 groups: [[x:0..8192000], 
[x:8192000..16384000], [x:16384000..24576000], [x:24576000..32768000], 
[x:32768000..40960000], [x:40960000..49152000], [x:49152000..57344000], 
[x:57344000..65536000], [x:65536000..73728000], [x:73728000..81920000]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    test_config.run(expected, physical_plan, &[Run::Distribution, 
Run::Sorting])?;
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_distribute_sort_memtable() -> Result<()> {

Review Comment:
   Reproducer from the issue.
   
   Compare this with the test above (`test_distribute_sort_parquet`), which 
demonstrates how it (already) works in parquet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to