This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ceff6cb44 Orthogonalize distribution and sort enforcement rules into
`EnforceDistribution` and `EnforceSorting` (#4839)
ceff6cb44 is described below
commit ceff6cb44ad621f89c9c4e9c0bd34cb204246910
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Jan 10 01:10:43 2023 +0300
Orthogonalize distribution and sort enforcement rules into
`EnforceDistribution` and `EnforceSorting` (#4839)
* Separate sort rule
* Migrate to clearer file name, tidy up comments
* Add a note about tests verifying EnforceDistribution/EnforceSorting
jointly
* Address review, fix the stale comment
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/core/src/execution/context.rs | 39 ++++++------
.../{enforcement.rs => dist_enforcement.rs} | 73 +++++++++-------------
datafusion/core/src/physical_optimizer/mod.rs | 4 +-
.../core/src/physical_optimizer/repartition.rs | 10 ++-
.../{optimize_sorts.rs => sort_enforcement.rs} | 41 ++++++------
datafusion/core/src/physical_plan/limit.rs | 4 ++
datafusion/expr/src/logical_plan/builder.rs | 3 +-
7 files changed, 87 insertions(+), 87 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 2687902a3..98fe6ff79 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::physical_optimizer::repartition::Repartition;
use crate::config::ConfigOptions;
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
-use crate::physical_optimizer::enforcement::BasicEnforcement;
+use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json,
plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
@@ -91,9 +91,9 @@ use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
-use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
+use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion_optimizer::OptimizerConfig;
use datafusion_sql::planner::object_name_to_table_reference;
use uuid::Uuid;
@@ -1448,37 +1448,36 @@ impl SessionState {
// output partitioning of some operators in the plan tree, which
will influence
// other rules. Therefore, it should run as soon as possible. It
is optional because:
// - It's not used for the distributed engine, Ballista.
- // - It's conflicted with some parts of the BasicEnforcement,
since it will
- // introduce additional repartitioning while the
BasicEnforcement aims at
- // reducing unnecessary repartitioning.
+ // - It conflicts with some parts of the EnforceDistribution rule,
since it will
+ // introduce additional repartitioning while EnforceDistribution
aims to
+ // reduce unnecessary repartitioning.
Arc::new(Repartition::new()),
// - Currently it will depend on the partition number to decide
whether to change the
// single node sort to parallel local sort and merge. Therefore,
GlobalSortSelection
// should run after the Repartition.
// - Since it will change the output ordering of some operators,
it should run
- // before JoinSelection and BasicEnforcement, which may depend on
that.
+ // before JoinSelection and EnforceSorting, which may depend on
that.
Arc::new(GlobalSortSelection::new()),
// Statistics-based join selection will change the Auto mode to a
real join implementation,
- // like collect left, or hash join, or future sort merge join,
which will
- // influence the BasicEnforcement to decide whether to add
additional repartition
- // and local sort to meet the distribution and ordering
requirements.
- // Therefore, it should run before BasicEnforcement.
+ // like collect left, or hash join, or future sort merge join,
which will influence the
+ // EnforceDistribution and EnforceSorting rules as they decide
whether to add additional
+ // repartitioning and local sorting steps to meet distribution and
ordering requirements.
+ // Therefore, it should run before EnforceDistribution and
EnforceSorting.
Arc::new(JoinSelection::new()),
// If the query is processing infinite inputs, the PipelineFixer
rule applies the
// necessary transformations to make the query runnable (if it is
not already runnable).
// If the query can not be made runnable, the rule emits an error
with a diagnostic message.
// Since the transformations it applies may alter output
partitioning properties of operators
- // (e.g. by swapping hash join sides), this rule runs before
BasicEnforcement.
+ // (e.g. by swapping hash join sides), this rule runs before
EnforceDistribution.
Arc::new(PipelineFixer::new()),
- // BasicEnforcement is for adding essential repartition and local
sorting operators
- // to satisfy the required distribution and local sort
requirements.
- // Please make sure that the whole plan tree is determined.
- Arc::new(BasicEnforcement::new()),
- // The BasicEnforcement stage conservatively inserts sorts to
satisfy ordering requirements.
- // However, a deeper analysis may sometimes reveal that such a
sort is actually unnecessary.
- // These cases typically arise when we have reversible window
expressions or deep subqueries.
- // The rule below performs this analysis and removes unnecessary
sorts.
- Arc::new(OptimizeSorts::new()),
+ // The EnforceDistribution rule is for adding essential
repartition to satisfy the required
+ // distribution. Please make sure that the whole plan tree is
determined before this rule.
+ Arc::new(EnforceDistribution::new()),
+ // The EnforceSorting rule is for adding essential local sorting
to satisfy the required
+ // ordering. Please make sure that the whole plan tree is
determined before this rule.
+ // Note that one should always run this rule after running the
EnforceDistribution rule
+ // as the latter may break local sorting requirements.
+ Arc::new(EnforceSorting::new()),
// The CoalesceBatches rule will not influence the distribution
and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules,
it should run last.
Arc::new(CoalesceBatches::new()),
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
similarity index 97%
rename from datafusion/core/src/physical_optimizer/enforcement.rs
rename to datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 0b8206a78..aa8b07569 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-//! Enforcement optimizer rules are used to make sure the plan's Distribution
and Ordering
-//! requirements are met by inserting necessary [[RepartitionExec]] and
[[SortExec]].
-//!
+//! EnforceDistribution optimizer rule inspects the physical plan with respect
+//! to distribution requirements and adds [RepartitionExec]s to satisfy them
+//! when necessary.
use crate::config::ConfigOptions;
use crate::error::Result;
-use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -46,8 +45,8 @@ use datafusion_physical_expr::{
use std::collections::HashMap;
use std::sync::Arc;
-/// BasicEnforcement rule, it ensures the Distribution and Ordering
requirements are met
-/// in the strictest way. It might add additional [[RepartitionExec]] to the
plan tree
+/// The EnforceDistribution rule ensures that distribution requirements are met
+/// in the strictest way. It might add additional [RepartitionExec] to the
plan tree
/// and give a non-optimal plan, but it can avoid the possible data skew in
joins.
///
/// For example for a HashJoin with keys(a, b, c), the required
Distribution(a, b, c) can be satisfied by
@@ -55,16 +54,16 @@ use std::sync::Arc;
///
/// This rule only chooses the exactly match and satisfies the Distribution(a,
b, c) by a HashPartition(a, b, c).
#[derive(Default)]
-pub struct BasicEnforcement {}
+pub struct EnforceDistribution {}
-impl BasicEnforcement {
+impl EnforceDistribution {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
-impl PhysicalOptimizerRule for BasicEnforcement {
+impl PhysicalOptimizerRule for EnforceDistribution {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
@@ -81,7 +80,7 @@ impl PhysicalOptimizerRule for BasicEnforcement {
} else {
plan
};
- // Distribution and Ordering enforcement need to be applied bottom-up.
+ // Distribution enforcement needs to be applied bottom-up.
new_plan.transform_up(&{
|plan| {
let adjusted = if !top_down_join_key_reordering {
@@ -89,16 +88,13 @@ impl PhysicalOptimizerRule for BasicEnforcement {
} else {
plan
};
- Ok(Some(ensure_distribution_and_ordering(
- adjusted,
- target_partitions,
- )?))
+ Ok(Some(ensure_distribution(adjusted, target_partitions)?))
}
})
}
fn name(&self) -> &str {
- "BasicEnforcement"
+ "EnforceDistribution"
}
fn schema_check(&self) -> bool {
@@ -829,10 +825,11 @@ fn new_join_conditions(
new_join_on
}
-/// Within this function, it checks whether we need to add additional plan
operators
-/// of data exchanging and data ordering to satisfy the required distribution
and ordering.
-/// And we should avoid to manually add plan operators of data exchanging and
data ordering in other places
-fn ensure_distribution_and_ordering(
+/// This function checks whether we need to add additional data exchange
+/// operators to satisfy distribution requirements. Since this function
+/// takes care of such requirements, we should avoid manually adding data
+/// exchange operators in other places.
+fn ensure_distribution(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
@@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering(
}
let required_input_distributions = plan.required_input_distribution();
- let required_input_orderings = plan.required_input_ordering();
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
assert_eq!(children.len(), required_input_distributions.len());
- assert_eq!(children.len(), required_input_orderings.len());
// Add RepartitionExec to guarantee output partitioning
- let children = children
+ let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.into_iter()
.zip(required_input_distributions.into_iter())
.map(|(child, required)| {
@@ -870,24 +865,8 @@ fn ensure_distribution_and_ordering(
};
new_child
}
- });
-
- // Add local SortExec to guarantee output ordering within each partition
- let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
- .zip(required_input_orderings.into_iter())
- .map(|(child_result, required)| {
- let child = child_result?;
- if ordering_satisfy(child.output_ordering(), required, || {
- child.equivalence_properties()
- }) {
- Ok(child)
- } else {
- let sort_expr = required.unwrap().to_vec();
- add_sort_above_child(&child, sort_expr)
- }
})
.collect();
-
with_new_children_if_necessary(plan, new_children?)
}
@@ -979,6 +958,7 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -1136,8 +1116,15 @@ mod tests {
config.execution.target_partitions = 10;
// run optimizer
- let optimizer = BasicEnforcement {};
+ let optimizer = EnforceDistribution {};
let optimized = optimizer.optimize($PLAN, &config)?;
+ // NOTE: These tests verify the joint `EnforceDistribution` +
`EnforceSorting` cascade
+ // because they were written prior to the separation of
`BasicEnforcement` into
+ // `EnforceSorting` and `EnforceDistribution`.
+ // TODO: Orthogonalize the tests here just to verify
`EnforceDistribution` and create
+ // new tests for the cascade.
+ let optimizer = EnforceSorting {};
+ let optimized = optimizer.optimize(optimized, &config)?;
// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -1656,7 +1643,7 @@ mod tests {
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
];
- let bottom_left_join = ensure_distribution_and_ordering(
+ let bottom_left_join = ensure_distribution(
hash_join_exec(left.clone(), right.clone(), &join_on,
&JoinType::Inner),
10,
)?;
@@ -1686,7 +1673,7 @@ mod tests {
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
- let bottom_right_join = ensure_distribution_and_ordering(
+ let bottom_right_join = ensure_distribution(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
)?;
@@ -1775,7 +1762,7 @@ mod tests {
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
];
- let bottom_left_join = ensure_distribution_and_ordering(
+ let bottom_left_join = ensure_distribution(
hash_join_exec(left.clone(), right.clone(), &join_on,
&JoinType::Inner),
10,
)?;
@@ -1805,7 +1792,7 @@ mod tests {
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
- let bottom_right_join = ensure_distribution_and_ordering(
+ let bottom_right_join = ensure_distribution(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
)?;
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index fb07d54b9..3958a546a 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -20,14 +20,14 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
-pub mod enforcement;
+pub mod dist_enforcement;
pub mod global_sort_selection;
pub mod join_selection;
-pub mod optimize_sorts;
pub mod optimizer;
pub mod pipeline_checker;
pub mod pruning;
pub mod repartition;
+pub mod sort_enforcement;
mod utils;
pub mod pipeline_fixer;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 2044b2aaa..98ca12a9e 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,7 +241,8 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
- use crate::physical_optimizer::enforcement::BasicEnforcement;
+ use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
+ use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -370,9 +371,12 @@ mod tests {
// run optimizer
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>>
= vec![
Arc::new(Repartition::new()),
- // The `BasicEnforcement` is an essential rule to be applied.
+ // EnforceDistribution is an essential rule to be applied.
// Otherwise, the correctness of the generated optimized plan
cannot be guaranteed
- Arc::new(BasicEnforcement::new()),
+ Arc::new(EnforceDistribution::new()),
+ // EnforceSorting is an essential rule to be applied.
+ // Otherwise, the correctness of the generated optimized plan
cannot be guaranteed
+ Arc::new(EnforceSorting::new()),
];
let optimized = optimizers.into_iter().fold($PLAN, |plan,
optimizer| {
optimizer.optimize(plan, &config).unwrap()
diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
similarity index 96%
rename from datafusion/core/src/physical_optimizer/optimize_sorts.rs
rename to datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 17b27bfa7..52463b4bd 100644
--- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -15,12 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-//! OptimizeSorts optimizer rule inspects [SortExec]s in the given physical
-//! plan and removes the ones it can prove unnecessary. The rule can work on
-//! valid *and* invalid physical plans with respect to sorting requirements,
-//! but always produces a valid physical plan in this sense.
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//! that this sort is unnecessary
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this
sense.
//!
-//! A non-realistic but easy to follow example: Assume that we somehow get the
fragment
+//! A non-realistic but easy to follow example for sort removals: Assume that
we
+//! somehow get the fragment
//! "SortExec: [nullable_col@0 ASC]",
//! " SortExec: [non_nullable_col@1 ASC]",
//! in the physical plan. The first sort is unnecessary since its result is
overwritten
@@ -46,16 +50,16 @@ use std::sync::Arc;
/// This rule inspects SortExec's in the given physical plan and removes the
/// ones it can prove unnecessary.
#[derive(Default)]
-pub struct OptimizeSorts {}
+pub struct EnforceSorting {}
-impl OptimizeSorts {
+impl EnforceSorting {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
-/// This is a "data class" we use within the [OptimizeSorts] rule that
+/// This is a "data class" we use within the [EnforceSorting] rule that
/// tracks the closest `SortExec` descendant for every child of a plan.
#[derive(Debug, Clone)]
struct PlanWithCorrespondingSort {
@@ -119,7 +123,7 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
}
}
-impl PhysicalOptimizerRule for OptimizeSorts {
+impl PhysicalOptimizerRule for EnforceSorting {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
@@ -127,12 +131,12 @@ impl PhysicalOptimizerRule for OptimizeSorts {
) -> Result<Arc<dyn ExecutionPlan>> {
// Execute a post-order traversal to adjust input key ordering:
let plan_requirements = PlanWithCorrespondingSort::new(plan);
- let adjusted = plan_requirements.transform_up(&optimize_sorts)?;
+ let adjusted = plan_requirements.transform_up(&ensure_sorting)?;
Ok(adjusted.plan)
}
fn name(&self) -> &str {
- "OptimizeSorts"
+ "EnforceSorting"
}
fn schema_check(&self) -> bool {
@@ -140,7 +144,7 @@ impl PhysicalOptimizerRule for OptimizeSorts {
}
}
-fn optimize_sorts(
+fn ensure_sorting(
requirements: PlanWithCorrespondingSort,
) -> Result<Option<PlanWithCorrespondingSort>> {
// Perform naive analysis at the beginning -- remove already-satisfied
sorts:
@@ -171,7 +175,8 @@ fn optimize_sorts(
let sort_expr = required_ordering.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
sort_onwards.push((idx, child.clone()))
- } else if let [first, ..] = sort_onwards.as_slice() {
+ }
+ if let [first, ..] = sort_onwards.as_slice() {
// The ordering requirement is met, we can analyze if
there is an unnecessary sort:
let sort_any = first.1.clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
@@ -618,7 +623,7 @@ mod tests {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan,
state.config_options())?;
+ EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -717,7 +722,7 @@ mod tests {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan,
state.config_options())?;
+ EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -761,7 +766,7 @@ mod tests {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan,
state.config_options())?;
+ EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -826,7 +831,7 @@ mod tests {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan,
state.config_options())?;
+ EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
@@ -886,7 +891,7 @@ mod tests {
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
let optimized_physical_plan =
- OptimizeSorts::new().optimize(physical_plan,
state.config_options())?;
+ EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
diff --git a/datafusion/core/src/physical_plan/limit.rs
b/datafusion/core/src/physical_plan/limit.rs
index 3a3a4a20b..776fefd8b 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -287,6 +287,10 @@ impl ExecutionPlan for LocalLimitExec {
self.input.output_ordering()
}
+ fn maintains_input_order(&self) -> bool {
+ true
+ }
+
fn equivalence_properties(&self) -> EquivalenceProperties {
self.input.equivalence_properties()
}
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 428a31f14..d5e5257ea 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -273,7 +273,8 @@ impl LogicalPlanBuilder {
});
for (_, exprs) in groups {
let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
- // the partition and sort itself is done at physical level, see
the BasicEnforcement rule
+ // Partitioning and sorting are done at the physical level, see the
EnforceDistribution
+ // and EnforceSorting rules.
plan = LogicalPlanBuilder::from(plan)
.window(window_exprs)?
.build()?;