zhuqi-lucas commented on code in PR #21976: URL: https://github.com/apache/datafusion/pull/21976#discussion_r3279072061
########## datafusion/physical-optimizer/src/ensure_requirements/mod.rs: ########## @@ -0,0 +1,1814 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`EnsureRequirements`] optimizer rule that enforces both distribution and +//! sorting requirements in a **single bottom-up pass**. +//! +//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting` +//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements` +//! and Presto/Trino's `AddExchanges`. +//! +//! # Motivation +//! +//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`) +//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts` +//! can break distribution invariants established by `EnforceDistribution`, +//! because `SortExec.preserve_partitioning` couples sorting and distribution +//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details. +//! +//! # Architecture +//! +//! ```text +//! EnsureRequirements::optimize(plan) +//! │ +//! ├─ Phase 1 (optional): reorder_join_keys (top-down) +//! │ └─ Same as existing adjust_input_keys_ordering +//! │ +//! └─ Phase 2: ensure_requirements (single bottom-up pass) +//! 
└─ For each node (bottom-up), for each child: +//! Step 1: Ensure distribution requirement +//! └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec +//! Step 2: Ensure ordering requirement (distribution-aware) +//! └─ Add SortExec with correct preserve_partitioning + SPM if needed +//! ``` +//! +//! # Key Properties +//! +//! - **Idempotent**: Running the rule twice produces the same plan. +//! - **Distribution before sorting**: For each child, distribution is resolved +//! before ordering, so sorting decisions always have full distribution context. +//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up +//! pass only adds `SortExec` where the child doesn't already satisfy the +//! ordering requirement, naturally placing sorts at the deepest valid position. + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; + +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_plan::ExecutionPlan; + +// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting +// as opaque boxes. This gives us control over the pass ordering and enables +// future merging into a true single-pass architecture. + +// For the no-pushdown variant (Phase 3) +use crate::enforce_sorting::replace_with_order_preserving_variants::{ + OrderPreservationContext, replace_with_order_preserving_variants, +}; +use crate::enforce_sorting::{ + PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting, + parallelize_sorts, replace_with_partial_sort, +}; + +/// Optimizer rule that enforces both distribution and sorting requirements. 
+/// +/// This rule combines the functionality of `EnforceDistribution` and +/// `EnforceSorting` into a coordinated sequence where distribution is +/// always settled before sorting for each operator, preventing the +/// non-idempotent interactions between the two separate rules. +/// +/// See [module level documentation](self) for more details. +#[derive(Default, Debug)] +pub struct EnsureRequirements {} + +impl EnsureRequirements { + /// Create a new `EnsureRequirements` optimizer rule. + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for EnsureRequirements { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Phase 1: Join key reordering (top-down, from EnforceDistribution) + use crate::enforce_distribution::{ + PlanWithKeyRequirements, adjust_input_keys_ordering, + }; + let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering; + let plan = if top_down_join_key_reordering { + let ctx = PlanWithKeyRequirements::new_default(plan); + ctx.transform_down(adjust_input_keys_ordering).data()?.plan + } else { + use crate::enforce_distribution::reorder_join_keys_to_inputs; + plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?))) + .data()? + }; + + // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass) + // For each node: distribution first, then sorting. 
+ use crate::enforce_distribution::{DistributionContext, ensure_distribution}; + use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting}; + + // Step 2a: Distribution enforcement (bottom-up) + let dist_ctx = DistributionContext::new_default(plan); + let dist_ctx = dist_ctx + .transform_up(|ctx| ensure_distribution(ctx, config)) + .data()?; + + // Step 2b: Sorting enforcement (bottom-up) — runs on distribution-fixed plan + let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan); + let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data; + + // Phase 3: Optimization passes + // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort) + use crate::enforce_sorting::{ + PlanWithCorrespondingCoalescePartitions, parallelize_sorts, + replace_with_partial_sort, + }; + let plan = if config.optimizer.repartition_sorts { + let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan); + ctx.transform_up(parallelize_sorts).data()?.plan + } else { + sort_ctx.plan + }; + + // 3b: Order-preserving variants + use crate::enforce_sorting::replace_with_order_preserving_variants::{ + OrderPreservationContext, replace_with_order_preserving_variants, + }; + let ctx = OrderPreservationContext::new_default(plan); + let plan = ctx + .transform_up(|c| { + replace_with_order_preserving_variants(c, false, true, config) + }) + .data()? 
+ .plan; + + // 3c: Sort pushdown (distribution-aware) + use crate::enforce_sorting::sort_pushdown::{ + SortPushDown, assign_initial_requirements, pushdown_sorts, + }; + let mut sort_pushdown = SortPushDown::new_default(plan); + assign_initial_requirements(&mut sort_pushdown); + let adjusted = pushdown_sorts(sort_pushdown)?; + + // 3d: Partial sort + adjusted + .plan + .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?))) + .data() + } + + fn name(&self) -> &str { + "EnsureRequirements" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely +/// by bottom-up passes. Currently experimental — some plan shapes differ +/// from the `pushdown_sorts` variant (less optimal but still correct). +#[derive(Default, Debug)] +pub struct EnsureRequirementsNoPushdown {} + +impl EnsureRequirementsNoPushdown { + /// Create a new rule. + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Step 1: Distribution enforcement + use crate::enforce_distribution::{ + DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs, + adjust_input_keys_ordering as adj_keys, ensure_distribution as ensure_dist, + }; + let top_down = config.optimizer.top_down_join_key_reordering; + let plan = if top_down { + KeyReqs::new_default(plan) + .transform_down(adj_keys) + .data()? + .plan + } else { + use crate::enforce_distribution::reorder_join_keys_to_inputs; + plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?))) + .data()? + }; + let dist_ctx = DistCtx::new_default(plan); + let plan = dist_ctx + .transform_up(|ctx| ensure_dist(ctx, config)) + .data()? 
+ .plan; + + // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts) + let plan_requirements = PlanWithCorrespondingSort::new_default(plan); + let adjusted = plan_requirements.transform_up(ensure_sorting)?.data; + + // Step 3: parallelize_sorts (optional) + let plan = if config.optimizer.repartition_sorts { + let ctx = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); + ctx.transform_up(parallelize_sorts).data()?.plan + } else { + adjusted.plan + }; + + // Step 4: order-preserving variants + let ctx = OrderPreservationContext::new_default(plan); + let plan = ctx + .transform_up(|c| { + replace_with_order_preserving_variants(c, false, true, config) + }) + .data()? + .plan; + + // Step 5: partial sort + let plan = plan + .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?))) + .data()?; + + // NO pushdown_sorts — sort placement is purely bottom-up. + // Step 6: Final distribution enforcement + let dist_ctx2 = DistCtx::new_default(plan); + let plan = dist_ctx2 + .transform_up(|ctx| ensure_dist(ctx, config)) + .data()? + .plan; + + // Step 7: Fix any sorting violations the final distribution pass introduced. + let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan); + let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data; + + Ok(adjusted2.plan) + } + + fn name(&self) -> &str { + "EnsureRequirementsNoPushdown" + } + + fn schema_check(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { Review Comment: Thanks @alamb, this is a good point, and it's addressed in the latest PR: the tests moved to core/tests/physical_optimizer/ensure_requirements.rs, Tier 1 (4 tests) was deleted, Tier 2 was folded into one combined test, and Tier 3 was trimmed from a 10x sweep to assert_idempotent. I kept the pairs you flagged (#14150, projection vs OutputRequirement). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
