This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 80cb0afd29 Refactor EnforceDistribution test cases to demonstrate
dependencies across optimizer runs. (#15074)
80cb0afd29 is described below
commit 80cb0afd29290126b5f4f8b854da34d2d49ac223
Author: wiedld <[email protected]>
AuthorDate: Mon Mar 10 10:10:24 2025 -0700
Refactor EnforceDistribution test cases to demonstrate dependencies across
optimizer runs. (#15074)
* refactor(15003): permit any combination of runs desired
* refactor(15003): convert macro to a function call on the TestConfig, and
highlight when the same testing setup, but different ordering of optimizer
runs, affect the outcome.
* chore: remove unneeded comments
* test: update test harness to use passed ref
---
.../physical_optimizer/enforce_distribution.rs | 949 ++++++++++-----------
1 file changed, 460 insertions(+), 489 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 080a10c7b0..b71724b8f7 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -394,29 +394,32 @@ fn test_suite_default_config_options() -> ConfigOptions {
config
}
-/// How the optimizers are run.
#[derive(PartialEq, Clone)]
-enum DoFirst {
- /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting)
+enum Run {
Distribution,
- /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution)
Sorting,
}
+/// Standard sets of the series of optimizer runs:
+const DISTRIB_DISTRIB_SORT: [Run; 3] =
+ [Run::Distribution, Run::Distribution, Run::Sorting];
+const SORT_DISTRIB_DISTRIB: [Run; 3] =
+ [Run::Sorting, Run::Distribution, Run::Distribution];
+
#[derive(Clone)]
struct TestConfig {
config: ConfigOptions,
- optimizers_to_run: DoFirst,
}
-impl TestConfig {
- fn new(optimizers_to_run: DoFirst) -> Self {
+impl Default for TestConfig {
+ fn default() -> Self {
Self {
config: test_suite_default_config_options(),
- optimizers_to_run,
}
}
+}
+impl TestConfig {
/// If preferred, will not repartition / resort data if it is already
sorted.
fn with_prefer_existing_sort(mut self) -> Self {
self.config.optimizer.prefer_existing_sort = true;
@@ -442,40 +445,30 @@ impl TestConfig {
self.config.execution.target_partitions = target_partitions;
self
}
-}
-/// Runs the repartition optimizer and asserts the plan against the expected
-/// Arguments
-/// * `EXPECTED_LINES` - Expected output plan
-/// * `PLAN` - Input plan
-/// * `CONFIG` - [`TestConfig`]
-macro_rules! assert_optimized {
- ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => {
- let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
-
- let TestConfig {
- config,
- optimizers_to_run,
- } = $CONFIG;
-
- // NOTE: These tests verify the joint `EnforceDistribution` +
`EnforceSorting` cascade
- // because they were written prior to the separation of
`BasicEnforcement` into
- // `EnforceSorting` and `EnforceDistribution`.
- // TODO: Orthogonalize the tests here just to verify
`EnforceDistribution` and create
- // new tests for the cascade.
+ /// Perform a series of runs using the current [`TestConfig`],
+ /// assert the expected plan result,
+    /// and return the result plan (for potential subsequent runs).
+ fn run(
+ &self,
+ expected_lines: &[&str],
+ plan: Arc<dyn ExecutionPlan>,
+ optimizers_to_run: &[Run],
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let expected_lines: Vec<&str> = expected_lines.to_vec();
// Add the ancillary output requirements operator at the start:
let optimizer = OutputRequirements::new_add_mode();
- let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+ let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
// This file has 2 rules that use tree node, apply these rules to
original plan consecutively
// After these operations tree nodes should be in a consistent state.
// This code block makes sure that these rules doesn't violate tree
node integrity.
{
- let adjusted = if config.optimizer.top_down_join_key_reordering {
+ let adjusted = if
self.config.optimizer.top_down_join_key_reordering {
// Run adjust_input_keys_ordering rule
let plan_requirements =
- PlanWithKeyRequirements::new_default($PLAN.clone());
+ PlanWithKeyRequirements::new_default(plan.clone());
let adjusted = plan_requirements
.transform_down(adjust_input_keys_ordering)
.data()
@@ -484,51 +477,39 @@ macro_rules! assert_optimized {
adjusted.plan
} else {
// Run reorder_join_keys_to_inputs rule
- $PLAN.clone().transform_up(|plan| {
- Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
- })
- .data()?
+ plan.clone()
+ .transform_up(|plan| {
+
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
+ })
+ .data()?
};
// Then run ensure_distribution rule
DistributionContext::new_default(adjusted)
.transform_up(|distribution_context| {
- ensure_distribution(distribution_context, &config)
+ ensure_distribution(distribution_context, &self.config)
})
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
}
- let optimized = if *optimizers_to_run == DoFirst::Distribution {
- // Run enforce distribution rule first:
- let optimizer = EnforceDistribution::new();
- let optimized = optimizer.optimize(optimized, &config)?;
- // The rule should be idempotent.
- // Re-running this rule shouldn't introduce unnecessary operators.
- let optimizer = EnforceDistribution::new();
- let optimized = optimizer.optimize(optimized, &config)?;
- // Run the enforce sorting rule:
- let optimizer = EnforceSorting::new();
- let optimized = optimizer.optimize(optimized, &config)?;
- optimized
- } else {
- // Run the enforce sorting rule first:
- let optimizer = EnforceSorting::new();
- let optimized = optimizer.optimize(optimized, &config)?;
- // Run enforce distribution rule:
- let optimizer = EnforceDistribution::new();
- let optimized = optimizer.optimize(optimized, &config)?;
- // The rule should be idempotent.
- // Re-running this rule shouldn't introduce unnecessary operators.
- let optimizer = EnforceDistribution::new();
- let optimized = optimizer.optimize(optimized, &config)?;
- optimized
- };
+ for run in optimizers_to_run {
+ optimized = match run {
+ Run::Distribution => {
+ let optimizer = EnforceDistribution::new();
+ optimizer.optimize(optimized, &self.config)?
+ }
+ Run::Sorting => {
+ let optimizer = EnforceSorting::new();
+ optimizer.optimize(optimized, &self.config)?
+ }
+ };
+ }
// Remove the ancillary output requirements operator when done:
let optimizer = OutputRequirements::new_remove_mode();
- let optimized = optimizer.optimize(optimized, &config)?;
+ let optimized = optimizer.optimize(optimized, &self.config)?;
// Now format correctly
let actual_lines = get_plan_string(&optimized);
@@ -538,7 +519,9 @@ macro_rules! assert_optimized {
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
- };
+
+ Ok(optimized)
+ }
}
macro_rules! assert_plan_txt {
@@ -647,12 +630,10 @@ fn multi_hash_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(
- expected,
- top_join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, top_join,
&TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(&expected, top_join.clone(),
&DISTRIB_DISTRIB_SORT)?;
+ test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?;
}
JoinType::RightSemi | JoinType::RightAnti => {}
}
@@ -715,12 +696,10 @@ fn multi_hash_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(
- expected,
- top_join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, top_join,
&TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(&expected, top_join.clone(),
&DISTRIB_DISTRIB_SORT)?;
+ test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?;
}
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {}
}
@@ -776,12 +755,9 @@ fn multi_joins_after_alias() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- top_join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?;
// Join on (a2 == c)
let top_join_on = vec![(
@@ -806,12 +782,9 @@ fn multi_joins_after_alias() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- top_join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -864,12 +837,9 @@ fn multi_joins_after_multi_alias() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- top_join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -908,12 +878,9 @@ fn join_after_agg_alias() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -965,12 +932,9 @@ fn hash_join_key_ordering() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1095,16 +1059,9 @@ fn multi_hash_join_key_ordering() -> Result<()> {
" ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- filter_top_join.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected,
- filter_top_join,
- &TestConfig::new(DoFirst::Sorting)
- );
+ let test_config = TestConfig::default();
+ test_config.run(expected, filter_top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, filter_top_join, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1385,6 +1342,8 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
/// These test cases use [`TestConfig::with_prefer_existing_sort`].
#[test]
fn multi_smj_joins() -> Result<()> {
+ let test_config = TestConfig::default().with_prefer_existing_sort();
+
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
@@ -1484,11 +1443,8 @@ fn multi_smj_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(
- expected,
- top_join.clone(),
- &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
- );
+ // TODO(wiedld): show different test result if enforce sorting first.
+ test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
let expected_first_sort_enforcement = match join_type {
// Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3
SortExecs
@@ -1542,11 +1498,12 @@ fn multi_smj_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(
- expected_first_sort_enforcement,
+ // TODO(wiedld): show different test result if enforce distribution
first.
+ test_config.run(
+ &expected_first_sort_enforcement,
top_join,
- &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
- );
+ &SORT_DISTRIB_DISTRIB,
+ )?;
match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right |
JoinType::Full => {
@@ -1603,11 +1560,8 @@ fn multi_smj_joins() -> Result<()> {
// this match arm cannot be reached
_ => unreachable!()
};
- assert_optimized!(
- expected,
- top_join.clone(),
-
&TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
- );
+ // TODO(wiedld): show different test result if enforce sorting
first.
+ test_config.run(&expected, top_join.clone(),
&DISTRIB_DISTRIB_SORT)?;
let expected_first_sort_enforcement = match join_type {
// Should include 6 RepartitionExecs (3 of them preserves
order) and 3 SortExecs
@@ -1654,11 +1608,12 @@ fn multi_smj_joins() -> Result<()> {
_ => unreachable!()
};
- assert_optimized!(
- expected_first_sort_enforcement,
+ // TODO(wiedld): show different test result if enforce
distribution first.
+ test_config.run(
+ &expected_first_sort_enforcement,
top_join,
-
&TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
- );
+ &SORT_DISTRIB_DISTRIB,
+ )?;
}
_ => {}
}
@@ -1714,6 +1669,10 @@ fn smj_join_key_ordering() -> Result<()> {
];
let join = sort_merge_join_exec(left, right.clone(), &join_on,
&JoinType::Inner);
+ // TestConfig: Prefer existing sort.
+ let test_config = TestConfig::default().with_prefer_existing_sort();
+
+ // Test: run EnforceDistribution, then EnforceSort.
// Only two RepartitionExecs added
let expected = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
@@ -1733,12 +1692,9 @@ fn smj_join_key_ordering() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- join.clone(),
- &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
- );
+ test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?;
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
let expected_first_sort_enforcement = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
" RepartitionExec: partitioning=Hash([b3@1, a3@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC",
@@ -1763,11 +1719,7 @@ fn smj_join_key_ordering() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected_first_sort_enforcement,
- join,
- &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
- );
+ test_config.run(expected_first_sort_enforcement, join,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1791,6 +1743,8 @@ fn merge_does_not_need_sort() -> Result<()> {
let exec: Arc<dyn ExecutionPlan> =
Arc::new(SortPreservingMergeExec::new(sort_key, exec));
+ // Test: run EnforceDistribution, then EnforceSort.
+ //
// The optimizer should not add an additional SortExec as the
// data is already sorted
let expected = &[
@@ -1798,19 +1752,22 @@ fn merge_does_not_need_sort() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution));
+ let test_config = TestConfig::default();
+ test_config.run(expected, exec.clone(), &DISTRIB_DISTRIB_SORT)?;
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
+ //
// In this case preserving ordering through order preserving operators is
not desirable
// (according to flag: PREFER_EXISTING_SORT)
// hence in this case ordering lost during CoalescePartitionsExec and
re-introduced with
// SortExec at the top.
- let expected = &[
+ let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=4096",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting));
+ test_config.run(expected_first_sort_enforcement, exec,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1851,12 +1808,10 @@ fn union_to_interleave() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan.clone(),
&TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1899,16 +1854,11 @@ fn union_not_to_interleave() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union()
- );
- assert_optimized!(
- expected,
- plan,
- &TestConfig::new(DoFirst::Sorting).with_prefer_existing_union()
- );
+ // TestConfig: Prefer existing union.
+ let test_config = TestConfig::default().with_prefer_existing_union();
+
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1925,12 +1875,10 @@ fn added_repartition_to_single_partition() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(&expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(&expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1948,12 +1896,10 @@ fn repartition_deepest_node() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1972,12 +1918,9 @@ fn repartition_unsorted_limit() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -1998,12 +1941,10 @@ fn repartition_sorted_limit() -> Result<()> {
" SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2030,12 +1971,9 @@ fn repartition_sorted_limit_with_filter() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2064,12 +2002,10 @@ fn repartition_ignores_limit() -> Result<()> {
// Expect no repartition to happen for local limit
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2088,12 +2024,9 @@ fn repartition_ignores_union() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2113,12 +2046,10 @@ fn repartition_through_sort_preserving_merge() ->
Result<()> {
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2136,25 +2067,24 @@ fn repartition_ignores_sort_preserving_merge() ->
Result<()> {
parquet_exec_multiple_sorted(vec![sort_key]),
);
+ // Test: run EnforceDistribution, then EnforceSort
+ //
// should not sort (as the data was already sorted)
// should not repartition, since increased parallelism is not beneficial
for SortPreservingMerge
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
" DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
- let expected = &[
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
+ let expected_first_sort_enforcement = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+ test_config.run(expected_first_sort_enforcement, plan,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2170,6 +2100,8 @@ fn repartition_ignores_sort_preserving_merge_with_union()
-> Result<()> {
let input =
union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]);
let plan = sort_preserving_merge_exec(sort_key, input);
+ // Test: run EnforceDistribution, then EnforceSort.
+ //
// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
@@ -2177,21 +2109,18 @@ fn
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
- let expected = &[
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
+ let expected_first_sort_enforcement = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+ test_config.run(expected_first_sort_enforcement, plan,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2211,6 +2140,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
sort_key,
);
+ // TestConfig: Prefer existing sort.
+ let test_config = TestConfig::default().with_prefer_existing_sort();
+
// during repartitioning ordering is preserved
let expected = &[
"SortRequiredExec: [d@3 ASC]",
@@ -2219,16 +2151,8 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
- );
- assert_optimized!(
- expected,
- plan,
- &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
- );
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2268,12 +2192,10 @@ fn repartition_does_not_destroy_sort_more_complex() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2297,6 +2219,7 @@ fn repartition_transitively_with_projection() ->
Result<()> {
}]);
let plan = sort_preserving_merge_exec(sort_key, proj);
+ // Test: run EnforceDistribution, then EnforceSort.
let expected = &[
"SortPreservingMergeExec: [sum@0 ASC]",
" SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
@@ -2305,13 +2228,10 @@ fn repartition_transitively_with_projection() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
let expected_first_sort_enforcement = &[
"SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
@@ -2320,11 +2240,7 @@ fn repartition_transitively_with_projection() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected_first_sort_enforcement,
- plan,
- &TestConfig::new(DoFirst::Sorting)
- );
+ test_config.run(expected_first_sort_enforcement, plan,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2356,12 +2272,10 @@ fn repartition_ignores_transitively_with_projection()
-> Result<()> {
" ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2393,12 +2307,10 @@ fn repartition_transitively_past_sort_with_projection()
-> Result<()> {
" ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2412,6 +2324,7 @@ fn repartition_transitively_past_sort_with_filter() ->
Result<()> {
}]);
let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false);
+ // Test: run EnforceDistribution, then EnforceSort.
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -2420,13 +2333,10 @@ fn repartition_transitively_past_sort_with_filter() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
@@ -2435,11 +2345,7 @@ fn repartition_transitively_past_sort_with_filter() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected_first_sort_enforcement,
- plan,
- &TestConfig::new(DoFirst::Sorting)
- );
+ test_config.run(expected_first_sort_enforcement, plan,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2465,6 +2371,7 @@ fn
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
false,
);
+ // Test: run EnforceDistribution, then EnforceSort.
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from
additional parallelism)
@@ -2475,13 +2382,10 @@ fn
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
+ let test_config = TestConfig::default();
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
@@ -2490,11 +2394,7 @@ fn
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected_first_sort_enforcement,
- plan,
- &TestConfig::new(DoFirst::Sorting)
- );
+ test_config.run(expected_first_sort_enforcement, plan,
&SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2505,24 +2405,33 @@ fn parallelization_single_partition() -> Result<()> {
let plan_parquet = aggregate_exec_with_alias(parquet_exec(),
alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec(), alias);
+ let test_config = TestConfig::default()
+ .with_prefer_repartition_file_scans(10)
+ .with_query_execution_partitions(2);
+
+ // Test: with parquet
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
" AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
" DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
];
+ test_config.run(
+ &expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
" AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
" DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
-
- let test_config = TestConfig::new(DoFirst::Distribution)
- .with_prefer_repartition_file_scans(10)
- .with_query_execution_partitions(2);
- assert_optimized!(expected_parquet, plan_parquet, &test_config);
- assert_optimized!(expected_csv, plan_csv, &test_config);
+ test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2538,34 +2447,47 @@ fn parallelization_multiple_files() -> Result<()> {
let plan =
filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
let plan = sort_required_exec_with_req(plan, sort_key);
- let test_config = TestConfig::new(DoFirst::Distribution)
+ let test_config = TestConfig::default()
.with_prefer_existing_sort()
.with_prefer_repartition_file_scans(1);
// The groups must have only contiguous ranges of rows from the same file
// if any group has rows from multiple files, the data is no longer sorted
destroyed
// https://github.com/apache/datafusion/issues/8451
- let expected = [
+ let expected_with_3_target_partitions = [
"SortRequiredExec: [a@0 ASC]",
" FilterExec: c@2 = 0",
" DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100],
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC],
file_type=parquet",
];
- assert_optimized!(
- expected,
- plan,
- &test_config.clone().with_query_execution_partitions(3)
- );
+ let test_config_concurrency_3 =
+ test_config.clone().with_query_execution_partitions(3);
+ test_config_concurrency_3.run(
+ &expected_with_3_target_partitions,
+ plan.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config_concurrency_3.run(
+ &expected_with_3_target_partitions,
+ plan.clone(),
+ &SORT_DISTRIB_DISTRIB,
+ )?;
- let expected = [
+ let expected_with_8_target_partitions = [
"SortRequiredExec: [a@0 ASC]",
" FilterExec: c@2 = 0",
" DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25],
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
+ let test_config_concurrency_8 =
test_config.with_query_execution_partitions(8);
+ test_config_concurrency_8.run(
+ &expected_with_8_target_partitions,
+ plan.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config_concurrency_8.run(
+ &expected_with_8_target_partitions,
plan,
- &test_config.with_query_execution_partitions(8)
- );
+ &SORT_DISTRIB_DISTRIB,
+ )?;
Ok(())
}
@@ -2615,13 +2537,11 @@ fn parallelization_compressed_csv() -> Result<()> {
.build(),
vec![("a".to_string(), "a".to_string())],
);
- assert_optimized!(
- expected,
- plan,
- &TestConfig::new(DoFirst::Distribution)
- .with_query_execution_partitions(2)
- .with_prefer_repartition_file_scans(10)
- );
+ let test_config = TestConfig::default()
+ .with_query_execution_partitions(2)
+ .with_prefer_repartition_file_scans(10);
+ test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
}
Ok(())
}
@@ -2632,6 +2552,11 @@ fn parallelization_two_partitions() -> Result<()> {
let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(),
alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);
+ let test_config = TestConfig::default()
+ .with_query_execution_partitions(2)
+ .with_prefer_repartition_file_scans(10);
+
+ // Test: with parquet
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
@@ -2639,6 +2564,14 @@ fn parallelization_two_partitions() -> Result<()> {
// Plan already has two partitions
" DataSourceExec: file_groups={2 groups: [[x:0..100],
[y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet",
];
+ test_config.run(
+ &expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
@@ -2646,11 +2579,9 @@ fn parallelization_two_partitions() -> Result<()> {
// Plan already has two partitions
" DataSourceExec: file_groups={2 groups: [[x:0..100],
[y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- let test_config = TestConfig::new(DoFirst::Distribution)
- .with_query_execution_partitions(2)
- .with_prefer_repartition_file_scans(10);
- assert_optimized!(expected_parquet, plan_parquet, &test_config);
- assert_optimized!(expected_csv, plan_csv, &test_config);
+ test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
+
Ok(())
}
@@ -2660,6 +2591,11 @@ fn parallelization_two_partitions_into_four() ->
Result<()> {
let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(),
alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);
+ let test_config = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .with_prefer_repartition_file_scans(10);
+
+ // Test: with parquet
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
@@ -2667,6 +2603,14 @@ fn parallelization_two_partitions_into_four() ->
Result<()> {
// Multiple source files splitted across partitions
" DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
];
+ test_config.run(
+ &expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
@@ -2674,11 +2618,8 @@ fn parallelization_two_partitions_into_four() ->
Result<()> {
// Multiple source files splitted across partitions
" DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv,
has_header=false",
];
- let test_config = TestConfig::new(DoFirst::Distribution)
- .with_query_execution_partitions(4)
- .with_prefer_repartition_file_scans(10);
- assert_optimized!(expected_parquet, plan_parquet, &test_config);
- assert_optimized!(expected_csv, plan_csv, &test_config);
+ test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2693,6 +2634,9 @@ fn parallelization_sorted_limit() -> Result<()> {
let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(),
false));
let plan_csv = limit_exec(sort_exec(sort_key, csv_exec(), false));
+ let test_config = TestConfig::default();
+
+ // Test: with parquet
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
@@ -2701,6 +2645,14 @@ fn parallelization_sorted_limit() -> Result<()> {
// Doesn't parallelize for SortExec without preserve_partitioning
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
@@ -2709,16 +2661,8 @@ fn parallelization_sorted_limit() -> Result<()> {
// Doesn't parallelize for SortExec without preserve_partitioning
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2737,6 +2681,9 @@ fn parallelization_limit_with_filter() -> Result<()> {
)));
let plan_csv = limit_exec(filter_exec(sort_exec(sort_key, csv_exec(),
false)));
+ let test_config = TestConfig::default();
+
+ // Test: with parquet
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
" CoalescePartitionsExec",
@@ -2749,6 +2696,14 @@ fn parallelization_limit_with_filter() -> Result<()> {
// SortExec doesn't benefit from input partitioning
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
" CoalescePartitionsExec",
@@ -2761,16 +2716,8 @@ fn parallelization_limit_with_filter() -> Result<()> {
// SortExec doesn't benefit from input partitioning
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2785,6 +2732,9 @@ fn parallelization_ignores_limit() -> Result<()> {
let plan_csv =
aggregate_exec_with_alias(limit_exec(filter_exec(limit_exec(csv_exec()))),
alias);
+ let test_config = TestConfig::default();
+
+ // Test: with parquet
let expected_parquet = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
@@ -2801,6 +2751,14 @@ fn parallelization_ignores_limit() -> Result<()> {
" LocalLimitExec: fetch=100",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
@@ -2817,16 +2775,8 @@ fn parallelization_ignores_limit() -> Result<()> {
" LocalLimitExec: fetch=100",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2836,6 +2786,9 @@ fn parallelization_union_inputs() -> Result<()> {
let plan_parquet = union_exec(vec![parquet_exec(); 5]);
let plan_csv = union_exec(vec![csv_exec(); 5]);
+ let test_config = TestConfig::default();
+
+ // Test: with parquet
let expected_parquet = &[
"UnionExec",
// Union doesn't benefit from input partitioning - no parallelism
@@ -2845,6 +2798,14 @@ fn parallelization_union_inputs() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = &[
"UnionExec",
// Union doesn't benefit from input partitioning - no parallelism
@@ -2854,16 +2815,8 @@ fn parallelization_union_inputs() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2883,23 +2836,28 @@ fn parallelization_prior_to_sort_preserving_merge() ->
Result<()> {
let plan_csv =
sort_preserving_merge_exec(sort_key.clone(),
csv_exec_with_sort(vec![sort_key]));
+ let test_config = TestConfig::default();
+
+ // Expected Outcome:
// parallelization is not beneficial for SortPreservingMerge
+
+ // Test: with parquet
let expected_parquet = &[
"DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = &[
"DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -2918,30 +2876,61 @@ fn parallelization_sort_preserving_merge_with_union()
-> Result<()> {
let plan_parquet = sort_preserving_merge_exec(sort_key.clone(),
input_parquet);
let plan_csv = sort_preserving_merge_exec(sort_key, input_csv);
+ let test_config = TestConfig::default();
+
+ // Expected Outcome:
// should not repartition (union doesn't benefit from increased
parallelism)
// should not sort (as the data was already sorted)
+
+ // Test: with parquet
let expected_parquet = &[
"SortPreservingMergeExec: [c@2 ASC]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ let expected_parquet_first_sort_enforcement = &[
+ // no SPM
+ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ // has coalesce
+ " CoalescePartitionsExec",
+ " UnionExec",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ ];
+ test_config.run(
+ expected_parquet_first_sort_enforcement,
+ plan_parquet,
+ &SORT_DISTRIB_DISTRIB,
+ )?;
+
+ // Test: with csv
let expected_csv = &[
"SortPreservingMergeExec: [c@2 ASC]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ let expected_csv_first_sort_enforcement = &[
+ // no SPM
+ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ // has coalesce
+ " CoalescePartitionsExec",
+ " UnionExec",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+ ];
+ test_config.run(
+ expected_csv_first_sort_enforcement,
+ plan_csv.clone(),
+ &SORT_DISTRIB_DISTRIB,
+ )?;
Ok(())
}
@@ -2962,25 +2951,30 @@ fn parallelization_does_not_benefit() -> Result<()> {
let plan_csv =
sort_required_exec_with_req(csv_exec_with_sort(vec![sort_key.clone()]),
sort_key);
+ let test_config = TestConfig::default();
+
+ // Expected Outcome:
// no parallelization, because SortRequiredExec doesn't benefit from
increased parallelism
+
+ // Test: with parquet
let expected_parquet = &[
"SortRequiredExec: [c@2 ASC]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
+ test_config.run(
+ expected_parquet,
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
+
+ // Test: with csv
let expected_csv = &[
"SortRequiredExec: [c@2 ASC]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3014,16 +3008,19 @@ fn
parallelization_ignores_transitively_with_projection_parquet() -> Result<()>
];
plans_matches_expected!(expected, &plan_parquet);
+ // Expected Outcome:
// data should not be repartitioned / resorted
let expected_parquet = &[
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(
+ let test_config = TestConfig::default();
+ test_config.run(
expected_parquet,
- plan_parquet,
- &TestConfig::new(DoFirst::Distribution)
- );
+ plan_parquet.clone(),
+ &DISTRIB_DISTRIB_SORT,
+ )?;
+ test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3057,16 +3054,15 @@ fn
parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
];
plans_matches_expected!(expected, &plan_csv);
+ // Expected Outcome:
// data should not be repartitioned / resorted
let expected_csv = &[
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(
- expected_csv,
- plan_csv,
- &TestConfig::new(DoFirst::Distribution)
- );
+ let test_config = TestConfig::default();
+ test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3090,12 +3086,10 @@ fn remove_redundant_roundrobins() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3111,6 +3105,10 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key,
filter_exec(input));
+ // TestConfig: Prefer existing sort.
+ let test_config = TestConfig::default().with_prefer_existing_sort();
+
+ // Expected Outcome:
// Original plan expects its output to be ordered by c@2 ASC.
// This is still satisfied since, after filter that column is constant.
let expected = &[
@@ -3119,16 +3117,9 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
- );
- assert_optimized!(
- expected,
- physical_plan,
- &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
- );
+
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3144,22 +3135,17 @@ fn preserve_ordering_through_repartition() ->
Result<()> {
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key,
filter_exec(input));
+ // TestConfig: Prefer existing sort.
+ let test_config = TestConfig::default().with_prefer_existing_sort();
+
let expected = &[
"SortPreservingMergeExec: [d@3 ASC]",
" FilterExec: c@2 = 0",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
- );
- assert_optimized!(
- expected,
- physical_plan,
- &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
- );
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3174,6 +3160,9 @@ fn do_not_preserve_ordering_through_repartition() ->
Result<()> {
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key,
filter_exec(input));
+ let test_config = TestConfig::default();
+
+ // Test: run EnforceDistribution, then EnforceSort.
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -3181,21 +3170,21 @@ fn do_not_preserve_ordering_through_repartition() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
- let expected = &[
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
+ let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" FilterExec: c@2 = 0",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+ test_config.run(
+ expected_first_sort_enforcement,
+ physical_plan,
+ &SORT_DISTRIB_DISTRIB,
+ )?;
Ok(())
}
@@ -3218,12 +3207,9 @@ fn no_need_for_sort_after_filter() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3243,6 +3229,9 @@ fn do_not_preserve_ordering_through_repartition2() ->
Result<()> {
}]);
let physical_plan = sort_preserving_merge_exec(sort_req,
filter_exec(input));
+ let test_config = TestConfig::default();
+
+ // Test: run EnforceDistribution, then EnforceSort.
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -3250,14 +3239,10 @@ fn do_not_preserve_ordering_through_repartition2() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
-
- let expected = &[
+ // Test: result IS DIFFERENT, if EnforceSorting is run first:
+ let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
@@ -3265,7 +3250,11 @@ fn do_not_preserve_ordering_through_repartition2() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+ test_config.run(
+ expected_first_sort_enforcement,
+ physical_plan,
+ &SORT_DISTRIB_DISTRIB,
+ )?;
Ok(())
}
@@ -3285,12 +3274,9 @@ fn do_not_preserve_ordering_through_repartition3() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3380,22 +3366,17 @@ fn do_not_add_unnecessary_hash() -> Result<()> {
let input = parquet_exec_with_sort(vec![sort_key]);
let physical_plan = aggregate_exec_with_alias(input, alias);
+ // TestConfig:
+ // Make sure target partition number is 1. In this case hash repartition
is unnecessary.
+ let test_config = TestConfig::default().with_query_execution_partitions(1);
+
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
" AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- // Make sure target partition number is 1. In this case hash repartition
is unnecessary
- assert_optimized!(
- expected,
- physical_plan.clone(),
-
&TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1)
- );
- assert_optimized!(
- expected,
- physical_plan,
- &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1)
- );
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3412,6 +3393,10 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
let aggregate = aggregate_exec_with_alias(input, alias.clone());
let physical_plan = aggregate_exec_with_alias(aggregate, alias);
+ // TestConfig:
+ // Make sure target partition number is larger than 2 (e.g partition
number at the source).
+ let test_config = TestConfig::default().with_query_execution_partitions(4);
+
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
// Since hash requirements of this operator is satisfied. There
shouldn't be
@@ -3423,17 +3408,8 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- // Make sure target partition number is larger than 2 (e.g partition
number at the source).
- assert_optimized!(
- expected,
- physical_plan.clone(),
-
&TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4)
- );
- assert_optimized!(
- expected,
- physical_plan,
- &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4)
- );
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3450,12 +3426,10 @@ fn optimize_away_unnecessary_repartition() ->
Result<()> {
let expected =
&["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet"];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+
+ let test_config = TestConfig::default();
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
@@ -3481,12 +3455,9 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(
- expected,
- physical_plan.clone(),
- &TestConfig::new(DoFirst::Distribution)
- );
- assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
+ let test_config = TestConfig::default();
+ test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+ test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?;
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]