This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1861c3d42b Minor: Improve comments in EnforceDistribution tests (#8474)
1861c3d42b is described below
commit 1861c3d42bbb0c4caa9c5a61c65065b87b32aa35
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Dec 11 10:10:50 2023 -0500
Minor: Improve comments in EnforceDistribution tests (#8474)
---
.../src/physical_optimizer/enforce_distribution.rs | 34 ++++++++++++++--------
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 4befea741c..3aed6555f3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check
whether the requirements can be satisfied by adjusting join keys ordering:
/// Requirements can not be satisfied, clear the current requirements,
generate new requirements(to pushdown) based on the current join keys, return
the unchanged plan.
/// Requirements is already satisfied, clear the current requirements,
generate new requirements(to pushdown) based on the current join keys, return
the unchanged plan.
-/// Requirements can be satisfied by adjusting keys ordering, clear the
current requiements, generate new requirements(to pushdown) based on the
adjusted join keys, return the changed plan.
+/// Requirements can be satisfied by adjusting keys ordering, clear the
current requirements, generate new requirements(to pushdown) based on the
adjusted join keys, return the changed plan.
///
/// 2) If the current plan is Aggregation, check whether the requirements can
be satisfied by adjusting group by keys ordering:
/// Requirements can not be satisfied, clear all the requirements, return
the unchanged plan.
@@ -928,7 +928,7 @@ fn add_roundrobin_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying
ordering requirements
// - Usage of order preserving variants is not desirable
- // (determined by flag
`config.optimizer.bounded_order_preserving_variants`)
+ // (determined by flag `config.optimizer.prefer_existing_sort`)
let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition =
RepartitionExec::try_new(input,
partitioning)?.with_preserve_order();
@@ -996,7 +996,7 @@ fn add_hash_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
- // `config.optimizer.bounded_order_preserving_variants`).
+ // `config.optimizer.prefer_existing_sort`).
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin
repartition
// before it:
@@ -1045,7 +1045,7 @@ fn add_spm_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying
ordering requirements
// - Usage of order preserving variants is not desirable
- // (determined by flag
`config.optimizer.bounded_order_preserving_variants`)
+ // (determined by flag `config.optimizer.prefer_existing_sort`)
let should_preserve_ordering = input.output_ordering().is_some();
let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
let existing_ordering = input.output_ordering().unwrap_or(&[]);
@@ -2026,7 +2026,7 @@ pub(crate) mod tests {
fn ensure_distribution_helper(
plan: Arc<dyn ExecutionPlan>,
target_partitions: usize,
- bounded_order_preserving_variants: bool,
+ prefer_existing_sort: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
@@ -2034,7 +2034,7 @@ pub(crate) mod tests {
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
- config.optimizer.prefer_existing_sort =
bounded_order_preserving_variants;
+ config.optimizer.prefer_existing_sort = prefer_existing_sort;
ensure_distribution(distribution_context, &config).map(|item|
item.into().plan)
}
@@ -2056,23 +2056,33 @@ pub(crate) mod tests {
}
/// Runs the repartition optimizer and asserts the plan against the
expected
+ /// # Arguments
+ /// * `EXPECTED_LINES` - Expected output plan
+ /// * `PLAN` - Input plan
+ /// * `FIRST_ENFORCE_DIST` - selects the order in which the rules are run:
+ /// true: runs (EnforceDistribution, EnforceDistribution, EnforceSorting)
+ /// false: runs (EnforceSorting, EnforceDistribution, EnforceDistribution)
+ /// * `PREFER_EXISTING_SORT` (optional, default `false`) - if true, will not repartition / re-sort data that is already sorted
+ /// * `TARGET_PARTITIONS` (optional, default `10`) - number of partitions to repartition to
+ /// * `REPARTITION_FILE_SCANS` (optional, default `false`) - if true, will repartition file scans
+ /// * `REPARTITION_FILE_MIN_SIZE` (optional, default `1024`) - minimum file size to repartition
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST,
false, 10, false, 1024);
};
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => {
- assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST,
$BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024);
+ ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$PREFER_EXISTING_SORT: expr) => {
+ assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST,
$PREFER_EXISTING_SORT, 10, false, 1024);
};
- ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr,
$REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
+ ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr,
$PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS:
expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size =
$REPARTITION_FILE_MIN_SIZE;
- config.optimizer.prefer_existing_sort =
$BOUNDED_ORDER_PRESERVING_VARIANTS;
+ config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
// NOTE: These tests verify the joint `EnforceDistribution` +
`EnforceSorting` cascade
// because they were written prior to the separation of
`BasicEnforcement` into
@@ -3294,7 +3304,7 @@ pub(crate) mod tests {
];
assert_optimized!(expected, exec, true);
// In this case preserving ordering through order preserving operators
is not desirable
- // (according to flag: bounded_order_preserving_variants)
+ // (according to flag: `config.optimizer.prefer_existing_sort`)
// hence in this case ordering lost during CoalescePartitionsExec and
re-introduced with
// SortExec at the top.
let expected = &[
@@ -4341,7 +4351,7 @@ pub(crate) mod tests {
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC]",
];
- // last flag sets config.optimizer.bounded_order_preserving_variants
+ // last flag sets config.optimizer.prefer_existing_sort
assert_optimized!(expected, physical_plan.clone(), true, true);
assert_optimized!(expected, physical_plan, false, true);