This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 80cb0afd29 Refactor EnforceDistribution test cases to demonstrate dependencies across optimizer runs. (#15074)
80cb0afd29 is described below

commit 80cb0afd29290126b5f4f8b854da34d2d49ac223
Author: wiedld <[email protected]>
AuthorDate: Mon Mar 10 10:10:24 2025 -0700

    Refactor EnforceDistribution test cases to demonstrate dependencies across optimizer runs. (#15074)
    
    * refactor(15003): permit any combination of runs desired
    
    * refactor(15003): convert macro to a function call on the TestConfig, and highlight when the same testing setup, but a different ordering of optimizer runs, affects the outcome.
    
    * chore: remove unneeded comments
    
    * test: update test harness to use passed ref
---
 .../physical_optimizer/enforce_distribution.rs     | 949 ++++++++++-----------
 1 file changed, 460 insertions(+), 489 deletions(-)

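The diff below replaces the assert_optimized! macro with a TestConfig::run method that takes the optimizer passes to apply as an explicit slice. A minimal sketch of the resulting test pattern (not part of the commit; `expected`, `expected_first_sort_enforcement`, and `plan` stand in for test-specific values, while the other names come from the diff below):

    let test_config = TestConfig::default().with_prefer_existing_sort();
    // Same input plan, two orderings of the optimizer passes:
    // EnforceDistribution twice, then EnforceSorting ...
    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
    // ... versus EnforceSorting first, which may yield a different plan.
    test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?;

Asserting different expected plans for the two orderings is how the tests highlight that the outcome can depend on which rule runs first; run() also returns the optimized plan so later runs can build on it.
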
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 080a10c7b0..b71724b8f7 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -394,29 +394,32 @@ fn test_suite_default_config_options() -> ConfigOptions {
     config
 }
 
-/// How the optimizers are run.
 #[derive(PartialEq, Clone)]
-enum DoFirst {
-    /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting)
+enum Run {
     Distribution,
-    /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution)
     Sorting,
 }
 
+/// Standard sequences of optimizer runs:
+const DISTRIB_DISTRIB_SORT: [Run; 3] =
+    [Run::Distribution, Run::Distribution, Run::Sorting];
+const SORT_DISTRIB_DISTRIB: [Run; 3] =
+    [Run::Sorting, Run::Distribution, Run::Distribution];
+
 #[derive(Clone)]
 struct TestConfig {
     config: ConfigOptions,
-    optimizers_to_run: DoFirst,
 }
 
-impl TestConfig {
-    fn new(optimizers_to_run: DoFirst) -> Self {
+impl Default for TestConfig {
+    fn default() -> Self {
         Self {
             config: test_suite_default_config_options(),
-            optimizers_to_run,
         }
     }
+}
 
+impl TestConfig {
+    /// If preferred, will not repartition / resort data if it is already sorted.
     fn with_prefer_existing_sort(mut self) -> Self {
         self.config.optimizer.prefer_existing_sort = true;
@@ -442,40 +445,30 @@ impl TestConfig {
         self.config.execution.target_partitions = target_partitions;
         self
     }
-}
 
-/// Runs the repartition optimizer and asserts the plan against the expected
-/// Arguments
-/// * `EXPECTED_LINES` - Expected output plan
-/// * `PLAN` - Input plan
-/// * `CONFIG` - [`TestConfig`]
-macro_rules! assert_optimized {
-    ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => {
-        let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| 
*s).collect();
-
-        let TestConfig {
-            config,
-            optimizers_to_run,
-        } = $CONFIG;
-
-        // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
-        //       because they were written prior to the separation of `BasicEnforcement` into
-        //       `EnforceSorting` and `EnforceDistribution`.
-        // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create
-        //       new tests for the cascade.
+    /// Perform a series of runs using the current [`TestConfig`],
+    /// assert the expected plan result,
+    /// and return the result plan (for potential subsequent runs).
+    fn run(
+        &self,
+        expected_lines: &[&str],
+        plan: Arc<dyn ExecutionPlan>,
+        optimizers_to_run: &[Run],
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let expected_lines: Vec<&str> = expected_lines.to_vec();
 
         // Add the ancillary output requirements operator at the start:
         let optimizer = OutputRequirements::new_add_mode();
-        let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+        let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
 
         // This file has 2 rules that use tree nodes; apply these rules to the original plan consecutively.
         // After these operations tree nodes should be in a consistent state.
         // This code block makes sure that these rules don't violate tree node integrity.
         {
-            let adjusted = if config.optimizer.top_down_join_key_reordering {
+            let adjusted = if 
self.config.optimizer.top_down_join_key_reordering {
                 // Run adjust_input_keys_ordering rule
                 let plan_requirements =
-                    PlanWithKeyRequirements::new_default($PLAN.clone());
+                    PlanWithKeyRequirements::new_default(plan.clone());
                 let adjusted = plan_requirements
                     .transform_down(adjust_input_keys_ordering)
                     .data()
@@ -484,51 +477,39 @@ macro_rules! assert_optimized {
                 adjusted.plan
             } else {
                 // Run reorder_join_keys_to_inputs rule
-                $PLAN.clone().transform_up(|plan| {
-                    Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
-                })
-                .data()?
+                plan.clone()
+                    .transform_up(|plan| {
+                        
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
+                    })
+                    .data()?
             };
 
             // Then run ensure_distribution rule
             DistributionContext::new_default(adjusted)
                 .transform_up(|distribution_context| {
-                    ensure_distribution(distribution_context, &config)
+                    ensure_distribution(distribution_context, &self.config)
                 })
                 .data()
                 .and_then(check_integrity)?;
             // TODO: End state payloads will be checked here.
         }
 
-        let optimized = if *optimizers_to_run == DoFirst::Distribution {
-            // Run enforce distribution rule first:
-            let optimizer = EnforceDistribution::new();
-            let optimized = optimizer.optimize(optimized, &config)?;
-            // The rule should be idempotent.
-            // Re-running this rule shouldn't introduce unnecessary operators.
-            let optimizer = EnforceDistribution::new();
-            let optimized = optimizer.optimize(optimized, &config)?;
-            // Run the enforce sorting rule:
-            let optimizer = EnforceSorting::new();
-            let optimized = optimizer.optimize(optimized, &config)?;
-            optimized
-        } else {
-            // Run the enforce sorting rule first:
-            let optimizer = EnforceSorting::new();
-            let optimized = optimizer.optimize(optimized, &config)?;
-            // Run enforce distribution rule:
-            let optimizer = EnforceDistribution::new();
-            let optimized = optimizer.optimize(optimized, &config)?;
-            // The rule should be idempotent.
-            // Re-running this rule shouldn't introduce unnecessary operators.
-            let optimizer = EnforceDistribution::new();
-            let optimized = optimizer.optimize(optimized, &config)?;
-            optimized
-        };
+        for run in optimizers_to_run {
+            optimized = match run {
+                Run::Distribution => {
+                    let optimizer = EnforceDistribution::new();
+                    optimizer.optimize(optimized, &self.config)?
+                }
+                Run::Sorting => {
+                    let optimizer = EnforceSorting::new();
+                    optimizer.optimize(optimized, &self.config)?
+                }
+            };
+        }
 
         // Remove the ancillary output requirements operator when done:
         let optimizer = OutputRequirements::new_remove_mode();
-        let optimized = optimizer.optimize(optimized, &config)?;
+        let optimized = optimizer.optimize(optimized, &self.config)?;
 
         // Now format correctly
         let actual_lines = get_plan_string(&optimized);
@@ -538,7 +519,9 @@ macro_rules! assert_optimized {
             "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
             expected_lines, actual_lines
         );
-    };
+
+        Ok(optimized)
+    }
 }
 
 macro_rules! assert_plan_txt {
@@ -647,12 +630,10 @@ fn multi_hash_joins() -> Result<()> {
                         "      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                 };
-                assert_optimized!(
-                    expected,
-                    top_join.clone(),
-                    &TestConfig::new(DoFirst::Distribution)
-                );
-                assert_optimized!(expected, top_join, 
&TestConfig::new(DoFirst::Sorting));
+
+                let test_config = TestConfig::default();
+                test_config.run(&expected, top_join.clone(), 
&DISTRIB_DISTRIB_SORT)?;
+                test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?;
             }
             JoinType::RightSemi | JoinType::RightAnti => {}
         }
@@ -715,12 +696,10 @@ fn multi_hash_joins() -> Result<()> {
                             "      DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
                         ],
                 };
-                assert_optimized!(
-                    expected,
-                    top_join.clone(),
-                    &TestConfig::new(DoFirst::Distribution)
-                );
-                assert_optimized!(expected, top_join, 
&TestConfig::new(DoFirst::Sorting));
+
+                let test_config = TestConfig::default();
+                test_config.run(&expected, top_join.clone(), 
&DISTRIB_DISTRIB_SORT)?;
+                test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?;
             }
             JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {}
         }
@@ -776,12 +755,9 @@ fn multi_joins_after_alias() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        top_join.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?;
 
     // Join on (a2 == c)
     let top_join_on = vec![(
@@ -806,12 +782,9 @@ fn multi_joins_after_alias() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        top_join.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -864,12 +837,9 @@ fn multi_joins_after_multi_alias() -> Result<()> {
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(
-        expected,
-        top_join.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -908,12 +878,9 @@ fn join_after_agg_alias() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        join.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -965,12 +932,9 @@ fn hash_join_key_ordering() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        join.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1095,16 +1059,9 @@ fn multi_hash_join_key_ordering() -> Result<()> {
         "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
         "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        filter_top_join.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected,
-        filter_top_join,
-        &TestConfig::new(DoFirst::Sorting)
-    );
+    let test_config = TestConfig::default();
+    test_config.run(expected, filter_top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, filter_top_join, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1385,6 +1342,8 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
 /// These test cases use [`TestConfig::with_prefer_existing_sort`].
 #[test]
 fn multi_smj_joins() -> Result<()> {
+    let test_config = TestConfig::default().with_prefer_existing_sort();
+
     let left = parquet_exec();
     let alias_pairs: Vec<(String, String)> = vec![
         ("a".to_string(), "a1".to_string()),
@@ -1484,11 +1443,8 @@ fn multi_smj_joins() -> Result<()> {
                     "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
             ],
         };
-        assert_optimized!(
-            expected,
-            top_join.clone(),
-            &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
-        );
+        // TODO(wiedld): show the different test result if EnforceSorting is run first.
+        test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
 
         let expected_first_sort_enforcement = match join_type {
             // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs
@@ -1542,11 +1498,12 @@ fn multi_smj_joins() -> Result<()> {
                 "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
             ],
         };
-        assert_optimized!(
-            expected_first_sort_enforcement,
+        // TODO(wiedld): show the different test result if EnforceDistribution is run first.
+        test_config.run(
+            &expected_first_sort_enforcement,
             top_join,
-            &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
-        );
+            &SORT_DISTRIB_DISTRIB,
+        )?;
 
         match join_type {
             JoinType::Inner | JoinType::Left | JoinType::Right | 
JoinType::Full => {
@@ -1603,11 +1560,8 @@ fn multi_smj_joins() -> Result<()> {
                     // this match arm cannot be reached
                     _ => unreachable!()
                 };
-                assert_optimized!(
-                    expected,
-                    top_join.clone(),
-                    
&TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
-                );
+                // TODO(wiedld): show the different test result if EnforceSorting is run first.
+                test_config.run(&expected, top_join.clone(), 
&DISTRIB_DISTRIB_SORT)?;
 
                 let expected_first_sort_enforcement = match join_type {
                     // Should include 6 RepartitionExecs (3 of them preserve order) and 3 SortExecs
@@ -1654,11 +1608,12 @@ fn multi_smj_joins() -> Result<()> {
                     _ => unreachable!()
                 };
 
-                assert_optimized!(
-                    expected_first_sort_enforcement,
+                // TODO(wiedld): show the different test result if EnforceDistribution is run first.
+                test_config.run(
+                    &expected_first_sort_enforcement,
                     top_join,
-                    
&TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
-                );
+                    &SORT_DISTRIB_DISTRIB,
+                )?;
             }
             _ => {}
         }
@@ -1714,6 +1669,10 @@ fn smj_join_key_ordering() -> Result<()> {
     ];
     let join = sort_merge_join_exec(left, right.clone(), &join_on, 
&JoinType::Inner);
 
+    // TestConfig: Prefer existing sort.
+    let test_config = TestConfig::default().with_prefer_existing_sort();
+
+    // Test: run EnforceDistribution, then EnforceSorting.
     // Only two RepartitionExecs added
     let expected = &[
         "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
@@ -1733,12 +1692,9 @@ fn smj_join_key_ordering() -> Result<()> {
         "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        join.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
-    );
+    test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?;
 
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
     let expected_first_sort_enforcement = &[
         "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
         "  RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC",
@@ -1763,11 +1719,7 @@ fn smj_join_key_ordering() -> Result<()> {
         "                  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "                    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected_first_sort_enforcement,
-        join,
-        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
-    );
+    test_config.run(expected_first_sort_enforcement, join, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1791,6 +1743,8 @@ fn merge_does_not_need_sort() -> Result<()> {
     let exec: Arc<dyn ExecutionPlan> =
         Arc::new(SortPreservingMergeExec::new(sort_key, exec));
 
+    // Test: run EnforceDistribution, then EnforceSorting.
+    //
     // The optimizer should not add an additional SortExec as the
     // data is already sorted
     let expected = &[
@@ -1798,19 +1752,22 @@ fn merge_does_not_need_sort() -> Result<()> {
         "  CoalesceBatchesExec: target_batch_size=4096",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution));
+    let test_config = TestConfig::default();
+    test_config.run(expected, exec.clone(), &DISTRIB_DISTRIB_SORT)?;
 
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
+    //
     // In this case preserving ordering through order-preserving operators is not desirable
     // (according to flag: PREFER_EXISTING_SORT);
     // hence ordering is lost during CoalescePartitionsExec and re-introduced with
     // SortExec at the top.
     // SortExec at the top.
-    let expected = &[
+    let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
         "    CoalesceBatchesExec: target_batch_size=4096",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting));
+    test_config.run(expected_first_sort_enforcement, exec, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1851,12 +1808,10 @@ fn union_to_interleave() -> Result<()> {
         "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan.clone(), 
&TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1899,16 +1854,11 @@ fn union_not_to_interleave() -> Result<()> {
         "                DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union()
-    );
-    assert_optimized!(
-        expected,
-        plan,
-        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_union()
-    );
+    // TestConfig: Prefer existing union.
+    let test_config = TestConfig::default().with_prefer_existing_union();
+
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1925,12 +1875,10 @@ fn added_repartition_to_single_partition() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(&expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(&expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1948,12 +1896,10 @@ fn repartition_deepest_node() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1972,12 +1918,9 @@ fn repartition_unsorted_limit() -> Result<()> {
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -1998,12 +1941,10 @@ fn repartition_sorted_limit() -> Result<()> {
         "    SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2030,12 +1971,9 @@ fn repartition_sorted_limit_with_filter() -> Result<()> {
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2064,12 +2002,10 @@ fn repartition_ignores_limit() -> Result<()> {
         // Expect no repartition to happen for local limit
         "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2088,12 +2024,9 @@ fn repartition_ignores_union() -> Result<()> {
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2113,12 +2046,10 @@ fn repartition_through_sort_preserving_merge() -> 
Result<()> {
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2136,25 +2067,24 @@ fn repartition_ignores_sort_preserving_merge() -> 
Result<()> {
         parquet_exec_multiple_sorted(vec![sort_key]),
     );
 
+    // Test: run EnforceDistribution, then EnforceSorting
+    //
     // should not sort (as the data was already sorted)
     // should not repartition, since increased parallelism is not beneficial for SortPreservingMerge
     let expected = &[
         "SortPreservingMergeExec: [c@2 ASC]",
         "  DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
-    let expected = &[
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
+    let expected_first_sort_enforcement = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+    test_config.run(expected_first_sort_enforcement, plan, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2170,6 +2100,8 @@ fn repartition_ignores_sort_preserving_merge_with_union() 
-> Result<()> {
     let input = 
union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]);
     let plan = sort_preserving_merge_exec(sort_key, input);
 
+    // Test: run EnforceDistribution, then EnforceSorting.
+    //
     // should not repartition / sort (as the data was already sorted)
     let expected = &[
         "SortPreservingMergeExec: [c@2 ASC]",
@@ -2177,21 +2109,18 @@ fn 
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
-    let expected = &[
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
+    let expected_first_sort_enforcement = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
         "    UnionExec",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+    test_config.run(expected_first_sort_enforcement, plan, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2211,6 +2140,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
         sort_key,
     );
 
+    // TestConfig: Prefer existing sort.
+    let test_config = TestConfig::default().with_prefer_existing_sort();
+
     // during repartitioning ordering is preserved
     let expected = &[
         "SortRequiredExec: [d@3 ASC]",
@@ -2219,16 +2151,8 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
     ];
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
-    );
-    assert_optimized!(
-        expected,
-        plan,
-        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
-    );
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2268,12 +2192,10 @@ fn repartition_does_not_destroy_sort_more_complex() -> 
Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2297,6 +2219,7 @@ fn repartition_transitively_with_projection() -> 
Result<()> {
     }]);
     let plan = sort_preserving_merge_exec(sort_key, proj);
 
+    // Test: run EnforceDistribution, then EnforceSorting.
     let expected = &[
         "SortPreservingMergeExec: [sum@0 ASC]",
         "  SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
@@ -2305,13 +2228,10 @@ fn repartition_transitively_with_projection() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
@@ -2320,11 +2240,7 @@ fn repartition_transitively_with_projection() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected_first_sort_enforcement,
-        plan,
-        &TestConfig::new(DoFirst::Sorting)
-    );
+    test_config.run(expected_first_sort_enforcement, plan, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2356,12 +2272,10 @@ fn repartition_ignores_transitively_with_projection() 
-> Result<()> {
         "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2393,12 +2307,10 @@ fn repartition_transitively_past_sort_with_projection() 
-> Result<()> {
         "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2412,6 +2324,7 @@ fn repartition_transitively_past_sort_with_filter() -> 
Result<()> {
     }]);
     let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false);
 
+    // Test: run EnforceDistribution, then EnforceSorting.
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
         "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -2420,13 +2333,10 @@ fn repartition_transitively_past_sort_with_filter() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
@@ -2435,11 +2345,7 @@ fn repartition_transitively_past_sort_with_filter() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected_first_sort_enforcement,
-        plan,
-        &TestConfig::new(DoFirst::Sorting)
-    );
+    test_config.run(expected_first_sort_enforcement, plan, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2465,6 +2371,7 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
         false,
     );
 
+    // Test: run EnforceDistribution, then EnforceSorting.
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
         // Expect repartition on the input to the sort (as it can benefit from additional parallelism)
@@ -2475,13 +2382,10 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
@@ -2490,11 +2394,7 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected_first_sort_enforcement,
-        plan,
-        &TestConfig::new(DoFirst::Sorting)
-    );
+    test_config.run(expected_first_sort_enforcement, plan, 
&SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2505,24 +2405,33 @@ fn parallelization_single_partition() -> Result<()> {
     let plan_parquet = aggregate_exec_with_alias(parquet_exec(), 
alias.clone());
     let plan_csv = aggregate_exec_with_alias(csv_exec(), alias);
 
+    let test_config = TestConfig::default()
+        .with_prefer_repartition_file_scans(10)
+        .with_query_execution_partitions(2);
+
+    // Test: with parquet
     let expected_parquet = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
         "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
     ];
+    test_config.run(
+        &expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
         "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-
-    let test_config = TestConfig::new(DoFirst::Distribution)
-        .with_prefer_repartition_file_scans(10)
-        .with_query_execution_partitions(2);
-    assert_optimized!(expected_parquet, plan_parquet, &test_config);
-    assert_optimized!(expected_csv, plan_csv, &test_config);
+    test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2538,34 +2447,47 @@ fn parallelization_multiple_files() -> Result<()> {
     let plan = 
filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
     let plan = sort_required_exec_with_req(plan, sort_key);
 
-    let test_config = TestConfig::new(DoFirst::Distribution)
+    let test_config = TestConfig::default()
         .with_prefer_existing_sort()
         .with_prefer_repartition_file_scans(1);
 
     // The groups must have only contiguous ranges of rows from the same file
     // if any group has rows from multiple files, the data is no longer sorted; the ordering is destroyed
     // https://github.com/apache/datafusion/issues/8451
-    let expected = [
+    let expected_with_3_target_partitions = [
         "SortRequiredExec: [a@0 ASC]",
         "  FilterExec: c@2 = 0",
         "    DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], 
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], 
file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        plan,
-        &test_config.clone().with_query_execution_partitions(3)
-    );
+    let test_config_concurrency_3 =
+        test_config.clone().with_query_execution_partitions(3);
+    test_config_concurrency_3.run(
+        &expected_with_3_target_partitions,
+        plan.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config_concurrency_3.run(
+        &expected_with_3_target_partitions,
+        plan.clone(),
+        &SORT_DISTRIB_DISTRIB,
+    )?;
 
-    let expected = [
+    let expected_with_8_target_partitions = [
         "SortRequiredExec: [a@0 ASC]",
         "  FilterExec: c@2 = 0",
         "    DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], 
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
+    let test_config_concurrency_8 = 
test_config.with_query_execution_partitions(8);
+    test_config_concurrency_8.run(
+        &expected_with_8_target_partitions,
+        plan.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config_concurrency_8.run(
+        &expected_with_8_target_partitions,
         plan,
-        &test_config.with_query_execution_partitions(8)
-    );
+        &SORT_DISTRIB_DISTRIB,
+    )?;
 
     Ok(())
 }
@@ -2615,13 +2537,11 @@ fn parallelization_compressed_csv() -> Result<()> {
             .build(),
             vec![("a".to_string(), "a".to_string())],
         );
-        assert_optimized!(
-            expected,
-            plan,
-            &TestConfig::new(DoFirst::Distribution)
-                .with_query_execution_partitions(2)
-                .with_prefer_repartition_file_scans(10)
-        );
+        let test_config = TestConfig::default()
+            .with_query_execution_partitions(2)
+            .with_prefer_repartition_file_scans(10);
+        test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+        test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
     }
     Ok(())
 }
@@ -2632,6 +2552,11 @@ fn parallelization_two_partitions() -> Result<()> {
     let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(), 
alias.clone());
     let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);
 
+    let test_config = TestConfig::default()
+        .with_query_execution_partitions(2)
+        .with_prefer_repartition_file_scans(10);
+
+    // Test: with parquet
     let expected_parquet = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
@@ -2639,6 +2564,14 @@ fn parallelization_two_partitions() -> Result<()> {
         // Plan already has two partitions
         "      DataSourceExec: file_groups={2 groups: [[x:0..100], 
[y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet",
     ];
+    test_config.run(
+        &expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
@@ -2646,11 +2579,9 @@ fn parallelization_two_partitions() -> Result<()> {
         // Plan already has two partitions
         "      DataSourceExec: file_groups={2 groups: [[x:0..100], 
[y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    let test_config = TestConfig::new(DoFirst::Distribution)
-        .with_query_execution_partitions(2)
-        .with_prefer_repartition_file_scans(10);
-    assert_optimized!(expected_parquet, plan_parquet, &test_config);
-    assert_optimized!(expected_csv, plan_csv, &test_config);
+    test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
+
     Ok(())
 }
 
@@ -2660,6 +2591,11 @@ fn parallelization_two_partitions_into_four() -> 
Result<()> {
     let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(), 
alias.clone());
     let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);
 
+    let test_config = TestConfig::default()
+        .with_query_execution_partitions(4)
+        .with_prefer_repartition_file_scans(10);
+
+    // Test: with parquet
     let expected_parquet = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
@@ -2667,6 +2603,14 @@ fn parallelization_two_partitions_into_four() -> 
Result<()> {
         // Multiple source files split across partitions
         "      DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
     ];
+    test_config.run(
+        &expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = [
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
@@ -2674,11 +2618,8 @@ fn parallelization_two_partitions_into_four() -> 
Result<()> {
         // Multiple source files split across partitions
         "      DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, 
has_header=false",
     ];
-    let test_config = TestConfig::new(DoFirst::Distribution)
-        .with_query_execution_partitions(4)
-        .with_prefer_repartition_file_scans(10);
-    assert_optimized!(expected_parquet, plan_parquet, &test_config);
-    assert_optimized!(expected_csv, plan_csv, &test_config);
+    test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2693,6 +2634,9 @@ fn parallelization_sorted_limit() -> Result<()> {
     let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(), 
false));
     let plan_csv = limit_exec(sort_exec(sort_key, csv_exec(), false));
 
+    let test_config = TestConfig::default();
+
+    // Test: with parquet
     let expected_parquet = &[
         "GlobalLimitExec: skip=0, fetch=100",
         "  LocalLimitExec: fetch=100",
@@ -2701,6 +2645,14 @@ fn parallelization_sorted_limit() -> Result<()> {
         // Doesn't parallelize for SortExec without preserve_partitioning
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = &[
         "GlobalLimitExec: skip=0, fetch=100",
         "  LocalLimitExec: fetch=100",
@@ -2709,16 +2661,8 @@ fn parallelization_sorted_limit() -> Result<()> {
         // Doesn't parallelize for SortExec without preserve_partitioning
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2737,6 +2681,9 @@ fn parallelization_limit_with_filter() -> Result<()> {
     )));
     let plan_csv = limit_exec(filter_exec(sort_exec(sort_key, csv_exec(), 
false)));
 
+    let test_config = TestConfig::default();
+
+    // Test: with parquet
     let expected_parquet = &[
         "GlobalLimitExec: skip=0, fetch=100",
         "  CoalescePartitionsExec",
@@ -2749,6 +2696,14 @@ fn parallelization_limit_with_filter() -> Result<()> {
         // SortExec doesn't benefit from input partitioning
         "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = &[
         "GlobalLimitExec: skip=0, fetch=100",
         "  CoalescePartitionsExec",
@@ -2761,16 +2716,8 @@ fn parallelization_limit_with_filter() -> Result<()> {
         // SortExec doesn't benefit from input partitioning
         "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2785,6 +2732,9 @@ fn parallelization_ignores_limit() -> Result<()> {
     let plan_csv =
         
aggregate_exec_with_alias(limit_exec(filter_exec(limit_exec(csv_exec()))), 
alias);
 
+    let test_config = TestConfig::default();
+
+    // Test: with parquet
     let expected_parquet = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
@@ -2801,6 +2751,14 @@ fn parallelization_ignores_limit() -> Result<()> {
         "                    LocalLimitExec: fetch=100",
         "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
@@ -2817,16 +2775,8 @@ fn parallelization_ignores_limit() -> Result<()> {
         "                    LocalLimitExec: fetch=100",
         "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2836,6 +2786,9 @@ fn parallelization_union_inputs() -> Result<()> {
     let plan_parquet = union_exec(vec![parquet_exec(); 5]);
     let plan_csv = union_exec(vec![csv_exec(); 5]);
 
+    let test_config = TestConfig::default();
+
+    // Test: with parquet
     let expected_parquet = &[
         "UnionExec",
         // Union doesn't benefit from input partitioning - no parallelism
@@ -2845,6 +2798,14 @@ fn parallelization_union_inputs() -> Result<()> {
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = &[
         "UnionExec",
         // Union doesn't benefit from input partitioning - no parallelism
@@ -2854,16 +2815,8 @@ fn parallelization_union_inputs() -> Result<()> {
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2883,23 +2836,28 @@ fn parallelization_prior_to_sort_preserving_merge() -> 
Result<()> {
     let plan_csv =
         sort_preserving_merge_exec(sort_key.clone(), 
csv_exec_with_sort(vec![sort_key]));
 
+    let test_config = TestConfig::default();
+
+    // Expected Outcome:
     // parallelization is not beneficial for SortPreservingMerge
+
+    // Test: with parquet
     let expected_parquet = &[
         "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = &[
         "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -2918,30 +2876,61 @@ fn parallelization_sort_preserving_merge_with_union() 
-> Result<()> {
     let plan_parquet = sort_preserving_merge_exec(sort_key.clone(), 
input_parquet);
     let plan_csv = sort_preserving_merge_exec(sort_key, input_csv);
 
+    let test_config = TestConfig::default();
+
+    // Expected Outcome:
     // should not repartition (union doesn't benefit from increased parallelism)
     // should not sort (as the data was already sorted)
+
+    // Test: with parquet
     let expected_parquet = &[
         "SortPreservingMergeExec: [c@2 ASC]",
         "  UnionExec",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    let expected_parquet_first_sort_enforcement = &[
+        // no SPM
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        // has coalesce
+        "  CoalescePartitionsExec",
+        "    UnionExec",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+    ];
+    test_config.run(
+        expected_parquet_first_sort_enforcement,
+        plan_parquet,
+        &SORT_DISTRIB_DISTRIB,
+    )?;
+
+    // Test: with csv
     let expected_csv = &[
         "SortPreservingMergeExec: [c@2 ASC]",
         "  UnionExec",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    let expected_csv_first_sort_enforcement = &[
+        // no SPM
+        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+        // has coalesce
+        "  CoalescePartitionsExec",
+        "    UnionExec",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+        "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+    ];
+    test_config.run(
+        expected_csv_first_sort_enforcement,
+        plan_csv.clone(),
+        &SORT_DISTRIB_DISTRIB,
+    )?;
 
     Ok(())
 }
@@ -2962,25 +2951,30 @@ fn parallelization_does_not_benefit() -> Result<()> {
     let plan_csv =
         sort_required_exec_with_req(csv_exec_with_sort(vec![sort_key.clone()]), sort_key);
 
+    let test_config = TestConfig::default();
+
+    // Expected Outcome:
     // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism
+
+    // Test: with parquet
     let expected_parquet = &[
         "SortRequiredExec: [c@2 ASC]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
+    test_config.run(
+        expected_parquet,
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+    // Test: with csv
     let expected_csv = &[
         "SortRequiredExec: [c@2 ASC]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3014,16 +3008,19 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()>
     ];
     plans_matches_expected!(expected, &plan_parquet);
 
+    // Expected Outcome:
     // data should not be repartitioned / resorted
     let expected_parquet = &[
         "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(
+    let test_config = TestConfig::default();
+    test_config.run(
         expected_parquet,
-        plan_parquet,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+        plan_parquet.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+    test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3057,16 +3054,15 @@ fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
     ];
     plans_matches_expected!(expected, &plan_csv);
 
+    // Expected Outcome:
     // data should not be repartitioned / resorted
     let expected_csv = &[
         "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(
-        expected_csv,
-        plan_csv,
-        &TestConfig::new(DoFirst::Distribution)
-    );
+    let test_config = TestConfig::default();
+    test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3090,12 +3086,10 @@ fn remove_redundant_roundrobins() -> Result<()> {
         "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3111,6 +3105,10 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
     let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
     let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
 
+    // TestConfig: Prefer existing sort.
+    let test_config = TestConfig::default().with_prefer_existing_sort();
+
+    // Expected Outcome:
     // Original plan expects its output to be ordered by c@2 ASC.
     // This is still satisfied since, after filter that column is constant.
     let expected = &[
@@ -3119,16 +3117,9 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
-    );
-    assert_optimized!(
-        expected,
-        physical_plan,
-        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
-    );
+
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3144,22 +3135,17 @@ fn preserve_ordering_through_repartition() -> Result<()> {
     let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
     let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
 
+    // TestConfig: Prefer existing sort.
+    let test_config = TestConfig::default().with_prefer_existing_sort();
+
     let expected = &[
         "SortPreservingMergeExec: [d@3 ASC]",
         "  FilterExec: c@2 = 0",
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
-    );
-    assert_optimized!(
-        expected,
-        physical_plan,
-        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
-    );
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3174,6 +3160,9 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> {
     let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
     let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
 
+    let test_config = TestConfig::default();
+
+    // Test: run EnforceDistribution, then EnforceSort.
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
         "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -3181,21 +3170,21 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
-    let expected = &[
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
+    let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
         "    FilterExec: c@2 = 0",
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+    test_config.run(
+        expected_first_sort_enforcement,
+        physical_plan,
+        &SORT_DISTRIB_DISTRIB,
+    )?;
 
     Ok(())
 }
@@ -3218,12 +3207,9 @@ fn no_need_for_sort_after_filter() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3243,6 +3229,9 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> {
     }]);
     let physical_plan = sort_preserving_merge_exec(sort_req, filter_exec(input));
 
+    let test_config = TestConfig::default();
+
+    // Test: run EnforceDistribution, then EnforceSort.
     let expected = &[
         "SortPreservingMergeExec: [a@0 ASC]",
         "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -3250,14 +3239,10 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
 
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-
-    let expected = &[
+    // Test: result IS DIFFERENT, if EnforceSorting is run first:
+    let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
         "    SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -3265,7 +3250,11 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "          DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+    test_config.run(
+        expected_first_sort_enforcement,
+        physical_plan,
+        &SORT_DISTRIB_DISTRIB,
+    )?;
 
     Ok(())
 }
@@ -3285,12 +3274,9 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> {
         "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3380,22 +3366,17 @@ fn do_not_add_unnecessary_hash() -> Result<()> {
     let input = parquet_exec_with_sort(vec![sort_key]);
     let physical_plan = aggregate_exec_with_alias(input, alias);
 
+    // TestConfig:
+    // Make sure target partition number is 1. In this case hash repartition is unnecessary.
+    let test_config = TestConfig::default().with_query_execution_partitions(1);
+
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         "  AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    // Make sure target partition number is 1. In this case hash repartition is unnecessary
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1)
-    );
-    assert_optimized!(
-        expected,
-        physical_plan,
-        &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1)
-    );
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3412,6 +3393,10 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
     let aggregate = aggregate_exec_with_alias(input, alias.clone());
     let physical_plan = aggregate_exec_with_alias(aggregate, alias);
 
+    // TestConfig:
+    // Make sure target partition number is larger than 2 (e.g partition number at the source).
+    let test_config = TestConfig::default().with_query_execution_partitions(4);
+
     let expected = &[
         "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
         // Since hash requirements of this operator is satisfied. There shouldn't be
@@ -3423,17 +3408,8 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
         "          RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=2",
         "            DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    // Make sure target partition number is larger than 2 (e.g partition number at the source).
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4)
-    );
-    assert_optimized!(
-        expected,
-        physical_plan,
-        &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4)
-    );
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3450,12 +3426,10 @@ fn optimize_away_unnecessary_repartition() -> Result<()> {
 
     let expected =
         &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet"];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+
+    let test_config = TestConfig::default();
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }
@@ -3481,12 +3455,9 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(
-        expected,
-        physical_plan.clone(),
-        &TestConfig::new(DoFirst::Distribution)
-    );
-    assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting));
+    let test_config = TestConfig::default();
+    test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
 
     Ok(())
 }


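Note for readers skimming these hunks: the `TestConfig::run` helper that replaces the old `assert_optimized!` macro is defined in an earlier hunk of this patch and is not shown above. Below is a rough, purely illustrative sketch of the shape implied by the call sites (module paths, helper internals, and the plan-rendering step are assumptions, not the committed code): the helper presumably applies the requested optimizer passes in the given order, then compares the rendered plan against the expected lines.

// Illustrative sketch only -- not the committed implementation.
// Module paths below are assumptions based on current DataFusion crate layout.
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::{displayable, ExecutionPlan};

impl TestConfig {
    /// Apply the requested optimizer passes in order, then assert that the
    /// rendered plan matches `expected_lines`.
    fn run(
        &self,
        expected_lines: &[&str],
        plan: Arc<dyn ExecutionPlan>,
        optimizers_to_run: &[Run],
    ) -> Result<()> {
        let mut optimized = plan;
        for run in optimizers_to_run {
            optimized = match run {
                Run::Distribution => {
                    EnforceDistribution::new().optimize(optimized, &self.config)?
                }
                Run::Sorting => {
                    EnforceSorting::new().optimize(optimized, &self.config)?
                }
            };
        }
        // Render the optimized plan and compare it line-by-line.
        let actual = displayable(optimized.as_ref()).indent(true).to_string();
        let actual_lines: Vec<&str> = actual.trim().lines().collect();
        assert_eq!(actual_lines, expected_lines, "plan mismatch");
        Ok(())
    }
}

The payoff of this shape is visible above: the same plan run under DISTRIB_DISTRIB_SORT versus SORT_DISTRIB_DISTRIB sometimes yields different expected plans, which is exactly the cross-run dependency this refactor makes explicit.
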
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
