This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 297af9533b refactor(15003): refactor test suite in
EnforceDistribution, to use test config builder (#15010)
297af9533b is described below
commit 297af9533b7779b7d4bf847b8cea48e739bc16cc
Author: wiedld <[email protected]>
AuthorDate: Fri Mar 7 05:57:47 2025 -0800
refactor(15003): refactor test suite in EnforceDistribution, to use test
config builder (#15010)
---
.../physical_optimizer/enforce_distribution.rs | 657 +++++++++++++++------
1 file changed, 488 insertions(+), 169 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 85d826109f..f22a896c18 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -372,46 +372,91 @@ macro_rules! plans_matches_expected {
}
}
+fn test_suite_default_config_options() -> ConfigOptions {
+ let mut config = ConfigOptions::new();
+
+ // By default, prefer_existing_sort is disabled: data may be repartitioned / resorted even if it is already sorted.
+ config.optimizer.prefer_existing_sort = false;
+
+ // By default, will attempt to convert Union to Interleave.
+ config.optimizer.prefer_existing_union = false;
+
+ // By default, will not repartition file scans.
+ config.optimizer.repartition_file_scans = false;
+ config.optimizer.repartition_file_min_size = 1024;
+
+ // By default, set query execution concurrency to 10.
+ config.execution.target_partitions = 10;
+
+ // Use a small batch size, to trigger RoundRobin in tests
+ config.execution.batch_size = 1;
+
+ config
+}
+
+/// How the optimizers are run.
+#[derive(PartialEq, Clone)]
+enum DoFirst {
+ /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting)
+ Distribution,
+ /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution)
+ Sorting,
+}
+
+#[derive(Clone)]
+struct TestConfig {
+ config: ConfigOptions,
+ optimizers_to_run: DoFirst,
+}
+
+impl TestConfig {
+ fn new(optimizers_to_run: DoFirst) -> Self {
+ Self {
+ config: test_suite_default_config_options(),
+ optimizers_to_run,
+ }
+ }
+
+ /// If preferred, will not repartition / resort data if it is already
sorted.
+ fn with_prefer_existing_sort(mut self) -> Self {
+ self.config.optimizer.prefer_existing_sort = true;
+ self
+ }
+
+ /// If preferred, will not attempt to convert Union to Interleave.
+ fn with_prefer_existing_union(mut self) -> Self {
+ self.config.optimizer.prefer_existing_union = true;
+ self
+ }
+
+ /// If preferred, will repartition file scans.
+ /// Accepts a minimum file size to repartition.
+ fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) ->
Self {
+ self.config.optimizer.repartition_file_scans = true;
+ self.config.optimizer.repartition_file_min_size = file_min_size;
+ self
+ }
+
+ /// Set the preferred target partitions for query execution concurrency.
+ fn with_query_execution_partitions(mut self, target_partitions: usize) ->
Self {
+ self.config.execution.target_partitions = target_partitions;
+ self
+ }
+}
+
/// Runs the repartition optimizer and asserts the plan against the expected
/// Arguments
/// * `EXPECTED_LINES` - Expected output plan
/// * `PLAN` - Input plan
-/// * `FIRST_ENFORCE_DIST` -
-/// true: (EnforceDistribution, EnforceDistribution, EnforceSorting)
-/// false: else runs (EnforceSorting, EnforceDistribution,
EnforceDistribution)
-/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition /
resort data if it is already sorted
-/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
-/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file
scans
-/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
-/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to
convert Union to Interleave
+/// * `CONFIG` - [`TestConfig`]
macro_rules! assert_optimized {
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
- assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false,
10, false, 1024, false);
- };
-
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$PREFER_EXISTING_SORT: expr) => {
- assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST,
$PREFER_EXISTING_SORT, 10, false, 1024, false);
- };
-
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
- assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST,
$PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
- };
-
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS:
expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
- assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST,
$PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS,
$REPARTITION_FILE_MIN_SIZE, false);
- };
-
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS:
expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
+ ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
- let mut config = ConfigOptions::new();
- config.execution.target_partitions = $TARGET_PARTITIONS;
- config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
- config.optimizer.repartition_file_min_size =
$REPARTITION_FILE_MIN_SIZE;
- config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
- config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
- // Use a small batch size, to trigger RoundRobin in tests
- config.execution.batch_size = 1;
+ let TestConfig {
+ config,
+ optimizers_to_run,
+ } = $CONFIG;
// NOTE: These tests verify the joint `EnforceDistribution` +
`EnforceSorting` cascade
// because they were written prior to the separation of
`BasicEnforcement` into
@@ -455,7 +500,7 @@ macro_rules! assert_optimized {
// TODO: End state payloads will be checked here.
}
- let optimized = if $FIRST_ENFORCE_DIST {
+ let optimized = if *optimizers_to_run == DoFirst::Distribution {
// Run enforce distribution rule first:
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
@@ -602,8 +647,12 @@ fn multi_hash_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(expected, top_join.clone(), true);
- assert_optimized!(expected, top_join, false);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, top_join,
&TestConfig::new(DoFirst::Sorting));
}
JoinType::RightSemi | JoinType::RightAnti => {}
}
@@ -666,8 +715,12 @@ fn multi_hash_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(expected, top_join.clone(), true);
- assert_optimized!(expected, top_join, false);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, top_join,
&TestConfig::new(DoFirst::Sorting));
}
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {}
}
@@ -723,8 +776,12 @@ fn multi_joins_after_alias() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, top_join.clone(), true);
- assert_optimized!(expected, top_join, false);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
// Join on (a2 == c)
let top_join_on = vec![(
@@ -749,8 +806,12 @@ fn multi_joins_after_alias() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, top_join.clone(), true);
- assert_optimized!(expected, top_join, false);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -803,8 +864,12 @@ fn multi_joins_after_multi_alias() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, top_join.clone(), true);
- assert_optimized!(expected, top_join, false);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -843,8 +908,12 @@ fn join_after_agg_alias() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, join.clone(), true);
- assert_optimized!(expected, join, false);
+ assert_optimized!(
+ expected,
+ join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -896,8 +965,12 @@ fn hash_join_key_ordering() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, join.clone(), true);
- assert_optimized!(expected, join, false);
+ assert_optimized!(
+ expected,
+ join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1022,8 +1095,16 @@ fn multi_hash_join_key_ordering() -> Result<()> {
" ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, filter_top_join.clone(), true);
- assert_optimized!(expected, filter_top_join, false);
+ assert_optimized!(
+ expected,
+ filter_top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected,
+ filter_top_join,
+ &TestConfig::new(DoFirst::Sorting)
+ );
Ok(())
}
@@ -1301,6 +1382,7 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
Ok(())
}
+/// These test cases use [`TestConfig::with_prefer_existing_sort`].
#[test]
fn multi_smj_joins() -> Result<()> {
let left = parquet_exec();
@@ -1402,7 +1484,11 @@ fn multi_smj_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(expected, top_join.clone(), true, true);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+ &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+ );
let expected_first_sort_enforcement = match join_type {
// Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3
SortExecs
@@ -1456,7 +1542,11 @@ fn multi_smj_joins() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
- assert_optimized!(expected_first_sort_enforcement, top_join, false,
true);
+ assert_optimized!(
+ expected_first_sort_enforcement,
+ top_join,
+ &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+ );
match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right |
JoinType::Full => {
@@ -1513,7 +1603,11 @@ fn multi_smj_joins() -> Result<()> {
// this match arm cannot be reached
_ => unreachable!()
};
- assert_optimized!(expected, top_join.clone(), true, true);
+ assert_optimized!(
+ expected,
+ top_join.clone(),
+
&TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+ );
let expected_first_sort_enforcement = match join_type {
// Should include 6 RepartitionExecs (3 of them preserves
order) and 3 SortExecs
@@ -1559,7 +1653,12 @@ fn multi_smj_joins() -> Result<()> {
// this match arm cannot be reached
_ => unreachable!()
};
- assert_optimized!(expected_first_sort_enforcement, top_join,
false, true);
+
+ assert_optimized!(
+ expected_first_sort_enforcement,
+ top_join,
+
&TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+ );
}
_ => {}
}
@@ -1568,6 +1667,7 @@ fn multi_smj_joins() -> Result<()> {
Ok(())
}
+/// These test cases use [`TestConfig::with_prefer_existing_sort`].
#[test]
fn smj_join_key_ordering() -> Result<()> {
// group by (a as a1, b as b1)
@@ -1633,7 +1733,11 @@ fn smj_join_key_ordering() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, join.clone(), true, true);
+ assert_optimized!(
+ expected,
+ join.clone(),
+ &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+ );
let expected_first_sort_enforcement = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
@@ -1659,7 +1763,11 @@ fn smj_join_key_ordering() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected_first_sort_enforcement, join, false, true);
+ assert_optimized!(
+ expected_first_sort_enforcement,
+ join,
+ &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+ );
Ok(())
}
@@ -1690,7 +1798,7 @@ fn merge_does_not_need_sort() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, exec, true);
+ assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution));
// In this case preserving ordering through order preserving operators is
not desirable
// (according to flag: PREFER_EXISTING_SORT)
@@ -1702,7 +1810,7 @@ fn merge_does_not_need_sort() -> Result<()> {
" CoalesceBatchesExec: target_batch_size=4096",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, exec, false);
+ assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1743,8 +1851,12 @@ fn union_to_interleave() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan.clone(), false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan.clone(),
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1786,24 +1898,16 @@ fn union_not_to_interleave() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- // no sort in the plan but since we need it as a parameter, make it
default false
- let prefer_existing_sort = false;
- let first_enforce_distribution = true;
- let prefer_existing_union = true;
assert_optimized!(
expected,
plan.clone(),
- first_enforce_distribution,
- prefer_existing_sort,
- prefer_existing_union
+ &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union()
);
assert_optimized!(
expected,
plan,
- !first_enforce_distribution,
- prefer_existing_sort,
- prefer_existing_union
+ &TestConfig::new(DoFirst::Sorting).with_prefer_existing_union()
);
Ok(())
@@ -1821,8 +1925,12 @@ fn added_repartition_to_single_partition() -> Result<()>
{
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1840,14 +1948,17 @@ fn repartition_deepest_node() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
#[test]
-
fn repartition_unsorted_limit() -> Result<()> {
let plan = limit_exec(filter_exec(parquet_exec()));
@@ -1861,8 +1972,12 @@ fn repartition_unsorted_limit() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1883,8 +1998,12 @@ fn repartition_sorted_limit() -> Result<()> {
" SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1911,8 +2030,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1941,8 +2064,12 @@ fn repartition_ignores_limit() -> Result<()> {
// Expect no repartition to happen for local limit
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1961,8 +2088,12 @@ fn repartition_ignores_union() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -1982,8 +2113,12 @@ fn repartition_through_sort_preserving_merge() ->
Result<()> {
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2008,14 +2143,18 @@ fn repartition_ignores_sort_preserving_merge() ->
Result<()> {
" DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan, false);
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2039,7 +2178,11 @@ fn
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
@@ -2048,11 +2191,12 @@ fn
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan, false);
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
+/// These test cases use [`TestConfig::with_prefer_existing_sort`].
#[test]
fn repartition_does_not_destroy_sort() -> Result<()> {
// SortRequired
@@ -2075,8 +2219,16 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true, true);
- assert_optimized!(expected, plan, false, true);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+ );
+ assert_optimized!(
+ expected,
+ plan,
+ &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+ );
Ok(())
}
@@ -2116,8 +2268,12 @@ fn repartition_does_not_destroy_sort_more_complex() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2150,7 +2306,11 @@ fn repartition_transitively_with_projection() ->
Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected_first_sort_enforcement = &[
"SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
@@ -2160,7 +2320,11 @@ fn repartition_transitively_with_projection() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected_first_sort_enforcement, plan, false);
+ assert_optimized!(
+ expected_first_sort_enforcement,
+ plan,
+ &TestConfig::new(DoFirst::Sorting)
+ );
Ok(())
}
@@ -2192,8 +2356,12 @@ fn repartition_ignores_transitively_with_projection() ->
Result<()> {
" ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2225,8 +2393,12 @@ fn repartition_transitively_past_sort_with_projection()
-> Result<()> {
" ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
- assert_optimized!(expected, plan, false);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2249,7 +2421,11 @@ fn repartition_transitively_past_sort_with_filter() ->
Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2259,7 +2435,11 @@ fn repartition_transitively_past_sort_with_filter() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected_first_sort_enforcement, plan, false);
+ assert_optimized!(
+ expected_first_sort_enforcement,
+ plan,
+ &TestConfig::new(DoFirst::Sorting)
+ );
Ok(())
}
@@ -2296,7 +2476,11 @@ fn
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected, plan.clone(), true);
+ assert_optimized!(
+ expected,
+ plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2306,7 +2490,11 @@ fn
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
- assert_optimized!(expected_first_sort_enforcement, plan, false);
+ assert_optimized!(
+ expected_first_sort_enforcement,
+ plan,
+ &TestConfig::new(DoFirst::Sorting)
+ );
Ok(())
}
@@ -2329,8 +2517,12 @@ fn parallelization_single_partition() -> Result<()> {
" AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
" DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true,
10);
- assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
+
+ let test_config = TestConfig::new(DoFirst::Distribution)
+ .with_prefer_repartition_file_scans(10)
+ .with_query_execution_partitions(2);
+ assert_optimized!(expected_parquet, plan_parquet, &test_config);
+ assert_optimized!(expected_csv, plan_csv, &test_config);
Ok(())
}
@@ -2346,24 +2538,22 @@ fn parallelization_multiple_files() -> Result<()> {
let plan =
filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
let plan = sort_required_exec_with_req(plan, sort_key);
+ let test_config = TestConfig::new(DoFirst::Distribution)
+ .with_prefer_existing_sort()
+ .with_prefer_repartition_file_scans(1);
+
// The groups must have only contiguous ranges of rows from the same file
// if any group has rows from multiple files, the data is no longer sorted (the ordering is destroyed)
// https://github.com/apache/datafusion/issues/8451
let expected = [
"SortRequiredExec: [a@0 ASC]",
" FilterExec: c@2 = 0",
- " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100],
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC],
file_type=parquet", ];
- let target_partitions = 3;
- let repartition_size = 1;
+ " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100],
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC],
file_type=parquet",
+ ];
assert_optimized!(
expected,
plan,
- true,
- true,
- target_partitions,
- true,
- repartition_size,
- false
+ &test_config.clone().with_query_execution_partitions(3)
);
let expected = [
@@ -2371,17 +2561,10 @@ fn parallelization_multiple_files() -> Result<()> {
" FilterExec: c@2 = 0",
" DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25],
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- let target_partitions = 8;
- let repartition_size = 1;
assert_optimized!(
expected,
plan,
- true,
- true,
- target_partitions,
- true,
- repartition_size,
- false
+ &test_config.with_query_execution_partitions(8)
);
Ok(())
@@ -2432,7 +2615,13 @@ fn parallelization_compressed_csv() -> Result<()> {
.build(),
vec![("a".to_string(), "a".to_string())],
);
- assert_optimized!(expected, plan, true, false, 2, true, 10, false);
+ assert_optimized!(
+ expected,
+ plan,
+ &TestConfig::new(DoFirst::Distribution)
+ .with_query_execution_partitions(2)
+ .with_prefer_repartition_file_scans(10)
+ );
}
Ok(())
}
@@ -2457,8 +2646,11 @@ fn parallelization_two_partitions() -> Result<()> {
// Plan already has two partitions
" DataSourceExec: file_groups={2 groups: [[x:0..100],
[y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true,
10);
- assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
+ let test_config = TestConfig::new(DoFirst::Distribution)
+ .with_query_execution_partitions(2)
+ .with_prefer_repartition_file_scans(10);
+ assert_optimized!(expected_parquet, plan_parquet, &test_config);
+ assert_optimized!(expected_csv, plan_csv, &test_config);
Ok(())
}
@@ -2482,8 +2674,11 @@ fn parallelization_two_partitions_into_four() ->
Result<()> {
// Multiple source files split across partitions
" DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv,
has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true,
10);
- assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10);
+ let test_config = TestConfig::new(DoFirst::Distribution)
+ .with_query_execution_partitions(4)
+ .with_prefer_repartition_file_scans(10);
+ assert_optimized!(expected_parquet, plan_parquet, &test_config);
+ assert_optimized!(expected_csv, plan_csv, &test_config);
Ok(())
}
@@ -2514,8 +2709,16 @@ fn parallelization_sorted_limit() -> Result<()> {
// Doesn't parallelize for SortExec without preserve_partitioning
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2558,8 +2761,16 @@ fn parallelization_limit_with_filter() -> Result<()> {
// SortExec doesn't benefit from input partitioning
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2606,8 +2817,16 @@ fn parallelization_ignores_limit() -> Result<()> {
" LocalLimitExec: fetch=100",
" DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2635,8 +2854,16 @@ fn parallelization_union_inputs() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2663,8 +2890,16 @@ fn parallelization_prior_to_sort_preserving_merge() ->
Result<()> {
let expected_csv = &[
"DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2697,8 +2932,16 @@ fn parallelization_sort_preserving_merge_with_union() ->
Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2728,8 +2971,16 @@ fn parallelization_does_not_benefit() -> Result<()> {
"SortRequiredExec: [c@2 ASC]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2768,7 +3019,11 @@ fn
parallelization_ignores_transitively_with_projection_parquet() -> Result<()>
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected_parquet, plan_parquet, true);
+ assert_optimized!(
+ expected_parquet,
+ plan_parquet,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2807,7 +3062,11 @@ fn
parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
- assert_optimized!(expected_csv, plan_csv, true);
+ assert_optimized!(
+ expected_csv,
+ plan_csv,
+ &TestConfig::new(DoFirst::Distribution)
+ );
Ok(())
}
@@ -2831,12 +3090,17 @@ fn remove_redundant_roundrobins() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, physical_plan.clone(), true);
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
+/// This test case uses [`TestConfig::with_prefer_existing_sort`].
#[test]
fn remove_unnecessary_spm_after_filter() -> Result<()> {
let schema = schema();
@@ -2855,13 +3119,21 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- // last flag sets config.optimizer.PREFER_EXISTING_SORT
- assert_optimized!(expected, physical_plan.clone(), true, true);
- assert_optimized!(expected, physical_plan, false, true);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+ );
+ assert_optimized!(
+ expected,
+ physical_plan,
+ &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+ );
Ok(())
}
+/// This test case uses [`TestConfig::with_prefer_existing_sort`].
#[test]
fn preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
@@ -2878,9 +3150,16 @@ fn preserve_ordering_through_repartition() -> Result<()>
{
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
];
- // last flag sets config.optimizer.PREFER_EXISTING_SORT
- assert_optimized!(expected, physical_plan.clone(), true, true);
- assert_optimized!(expected, physical_plan, false, true);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+ );
+ assert_optimized!(
+ expected,
+ physical_plan,
+ &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+ );
Ok(())
}
@@ -2903,7 +3182,11 @@ fn do_not_preserve_ordering_through_repartition() ->
Result<()> {
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan.clone(), true);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2912,7 +3195,7 @@ fn do_not_preserve_ordering_through_repartition() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2935,8 +3218,12 @@ fn no_need_for_sort_after_filter() -> Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan.clone(), true);
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2964,7 +3251,11 @@ fn do_not_preserve_ordering_through_repartition2() ->
Result<()> {
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan.clone(), true);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2974,7 +3265,7 @@ fn do_not_preserve_ordering_through_repartition2() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -2994,8 +3285,12 @@ fn do_not_preserve_ordering_through_repartition3() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
- assert_optimized!(expected, physical_plan.clone(), true);
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -3091,8 +3386,16 @@ fn do_not_add_unnecessary_hash() -> Result<()> {
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
// Make sure target partition number is 1. In this case hash repartition
is unnecessary
- assert_optimized!(expected, physical_plan.clone(), true, false, 1, false,
1024);
- assert_optimized!(expected, physical_plan, false, false, 1, false, 1024);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+
&TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1)
+ );
+ assert_optimized!(
+ expected,
+ physical_plan,
+ &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1)
+ );
Ok(())
}
@@ -3121,8 +3424,16 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
" DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
// Make sure target partition number is larger than 2 (e.g partition
number at the source).
- assert_optimized!(expected, physical_plan.clone(), true, false, 4, false,
1024);
- assert_optimized!(expected, physical_plan, false, false, 4, false, 1024);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+
&TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4)
+ );
+ assert_optimized!(
+ expected,
+ physical_plan,
+ &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4)
+ );
Ok(())
}
@@ -3139,8 +3450,12 @@ fn optimize_away_unnecessary_repartition() -> Result<()>
{
let expected =
&["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet"];
- assert_optimized!(expected, physical_plan.clone(), true);
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
@@ -3166,8 +3481,12 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
- assert_optimized!(expected, physical_plan.clone(), true);
- assert_optimized!(expected, physical_plan, false);
+ assert_optimized!(
+ expected,
+ physical_plan.clone(),
+ &TestConfig::new(DoFirst::Distribution)
+ );
+ assert_optimized!(expected, physical_plan,
&TestConfig::new(DoFirst::Sorting));
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]