zhuqi-lucas commented on code in PR #21976:
URL: https://github.com/apache/datafusion/pull/21976#discussion_r3279522574


##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1814 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+
+// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting
+// as opaque boxes. This gives us control over the pass ordering and enables
+// future merging into a true single-pass architecture.
+
+// For the no-pushdown variant (Phase 3)
+use crate::enforce_sorting::replace_with_order_preserving_variants::{
+    OrderPreservationContext, replace_with_order_preserving_variants,
+};
+use crate::enforce_sorting::{
+    PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting,
+    parallelize_sorts, replace_with_partial_sort,
+};
+
+/// Optimizer rule that enforces both distribution and sorting requirements.
+///
+/// This rule combines the functionality of `EnforceDistribution` and
+/// `EnforceSorting` into a coordinated sequence where distribution is
+/// always settled before sorting for each operator, preventing the
+/// non-idempotent interactions between the two separate rules.
+///
+/// See [module level documentation](self) for more details.
+#[derive(Default, Debug)]
+pub struct EnsureRequirements {}
+
+impl EnsureRequirements {
+    /// Create a new `EnsureRequirements` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Phase 1: Join key reordering (top-down, from EnforceDistribution)
+        use crate::enforce_distribution::{
+            PlanWithKeyRequirements, adjust_input_keys_ordering,
+        };
+        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
+        let plan = if top_down_join_key_reordering {
+            let ctx = PlanWithKeyRequirements::new_default(plan);
+            ctx.transform_down(adjust_input_keys_ordering).data()?.plan
+        } else {
+            use crate::enforce_distribution::reorder_join_keys_to_inputs;
+            plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?)))
+                .data()?
+        };
+
+        // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass)

Review Comment:
   Fair point. I tightened the module doc in 97728adb7 to drop the "single pass"
framing and break down what each phase actually does. Phase 2 is the one
combined distribution + sorting bottom-up pass; the join-key reordering,
`parallelize_sorts`, `replace_with_order_preserving_variants`, `pushdown_sorts`,
and `replace_with_partial_sort` are still separate traversals. I agree that
consolidating them is a good follow-up, but I tried something similar before and
it's pretty hard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

