mustafasrepo commented on code in PR #5290:
URL: https://github.com/apache/arrow-datafusion/pull/5290#discussion_r1112652213


##########
datafusion/core/src/physical_optimizer/sort_enforcement2.rs:
##########
@@ -0,0 +1,2872 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
//! The [`TopDownEnforceSorting`] optimizer rule inspects the physical plan with
//! respect to local sorting requirements and does the following:
//! - Adds a [SortExec] when a requirement is not met,
//! - Removes an already-existing [SortExec] if it is possible to prove
//!   that this sort is unnecessary.
//!
//! The rule can work on valid *and* invalid physical plans with respect to
//! sorting requirements, but always produces a valid physical plan in this sense.
//!
//! A non-realistic but easy-to-follow example for sort removals: assume that we
//! somehow get the fragment
//!
//! ```text
//! SortExec: expr=[nullable_col@0 ASC]
//!   SortExec: expr=[non_nullable_col@1 ASC]
//! ```
//!
//! in the physical plan. The child sort is unnecessary since its result is overwritten
//! by the parent SortExec. Therefore, this rule removes it from the physical plan.
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::execution::context::TaskContext;
+use crate::physical_optimizer::utils::add_sort_above;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use crate::physical_plan::filter::FilterExec;
+use crate::physical_plan::joins::utils::JoinSide;
+use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::union::UnionExec;
+use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
+use crate::physical_plan::{
+    with_new_children_if_necessary, DisplayFormatType, Distribution, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream,
+};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError, Statistics};
+use datafusion_expr::JoinType;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::{
+    create_sort_expr_from_requirement, map_requirement_before_projection,
+    ordering_satisfy, ordering_satisfy_requirement, requirements_compatible,
+};
+use datafusion_physical_expr::window::WindowExpr;
+use datafusion_physical_expr::{
+    new_sort_requirements, EquivalenceProperties, PhysicalExpr, 
PhysicalSortExpr,
+    PhysicalSortRequirements,
+};
+use itertools::izip;
+use std::any::Any;
+use std::ops::Deref;
+use std::sync::Arc;
+
/// Physical optimizer rule that walks the plan top-down (pre-order), enforcing the
/// sort requirements of each operator and removing the [`SortExec`]s it can prove
/// unnecessary.
#[derive(Default)]
pub struct TopDownEnforceSorting {}

impl TopDownEnforceSorting {
    /// Creates a new `TopDownEnforceSorting` optimizer rule.
    pub fn new() -> Self {
        Self {}
    }
}
+
+/// This is a "data class" we use within the [TopDownEnforceSorting] rule
+#[derive(Debug, Clone)]
+struct PlanWithSortRequirements {
+    /// Current plan
+    plan: Arc<dyn ExecutionPlan>,
+    /// Whether the plan could impact the final result ordering
+    impact_result_ordering: bool,
+    /// Parent has the SinglePartition requirement to children
+    satisfy_single_distribution: bool,
+    /// Parent required sort ordering
+    required_ordering: Option<Vec<PhysicalSortRequirements>>,
+    /// The adjusted request sort ordering to children.
+    /// By default they are the same as the plan's required input ordering, 
but can be adjusted based on parent required sort ordering properties.
+    adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirements>>>,
+}
+
+impl PlanWithSortRequirements {
+    pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let impact_result_ordering = plan.output_ordering().is_some()
+            || plan.output_partitioning().partition_count() <= 1
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn new_without_impact_result_ordering(plan: Arc<dyn ExecutionPlan>) -> 
Self {
+        let request_ordering = plan.required_input_ordering();
+        PlanWithSortRequirements {
+            plan,
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: request_ordering,
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithSortRequirements> {
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.adjusted_request_ordering.len());
+
+        izip!(
+            plan_children.into_iter(),
+            self.adjusted_request_ordering.clone().into_iter(),
+            self.plan.maintains_input_order().into_iter(),
+            self.plan.required_input_distribution().into_iter(),
+        )
+        .map(
+            |(child, from_parent, maintains_input_order, required_dist)| {
+                let child_satisfy_single_distribution =
+                    matches!(required_dist, Distribution::SinglePartition)
+                        || (self.satisfy_single_distribution
+                            && self
+                                .plan
+                                .as_any()
+                                .downcast_ref::<CoalescePartitionsExec>()
+                                .is_none());
+                let child_impact_result_ordering = if self
+                    .plan
+                    .as_any()
+                    .downcast_ref::<GlobalLimitExec>()
+                    .is_some()
+                    || self
+                        .plan
+                        .as_any()
+                        .downcast_ref::<LocalLimitExec>()
+                        .is_some()
+                {
+                    true
+                } else {
+                    maintains_input_order && self.impact_result_ordering
+                };
+                let child_request_ordering = child.required_input_ordering();
+                PlanWithSortRequirements {
+                    plan: child,
+                    impact_result_ordering: child_impact_result_ordering,
+                    satisfy_single_distribution: 
child_satisfy_single_distribution,
+                    required_ordering: from_parent,
+                    adjusted_request_ordering: child_request_ordering,
+                }
+            },
+        )
+        .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithSortRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let new_children = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+
+            let children_plans = new_children
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
+            Ok(PlanWithSortRequirements {
+                plan,
+                impact_result_ordering: self.impact_result_ordering,
+                satisfy_single_distribution: self.satisfy_single_distribution,
+                required_ordering: self.required_ordering,
+                adjusted_request_ordering: self.adjusted_request_ordering,
+            })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for TopDownEnforceSorting {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a Top-Down process(Preorder Traversal) to ensure the sort 
requirements:
+        let plan_requirements = PlanWithSortRequirements::init(plan);
+        let adjusted = plan_requirements.transform_down(&ensure_sorting)?;
+        // Execute a Top-Down process(Preorder Traversal) to remove all the 
unnecessary Sort
+        let adjusted_plan = adjusted.plan.transform_down(&|plan| {
+            if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                if ordering_satisfy(
+                    sort_exec.input().output_ordering(),
+                    sort_exec.output_ordering(),
+                    || sort_exec.input().equivalence_properties(),
+                ) {
+                    Ok(Some(Arc::new(TombStoneExec::new(
+                        sort_exec.input().clone(),
+                    ))))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                Ok(None)
+            }
+        })?;
+        // Remove the TombStoneExec
+        let final_plan = adjusted_plan.transform_up(&|plan| {
+            if let Some(tombstone_exec) = 
plan.as_any().downcast_ref::<TombStoneExec>() {
+                Ok(Some(tombstone_exec.input.clone()))
+            } else {
+                Ok(None)
+            }
+        })?;
+        Ok(final_plan)
+    }
+
+    fn name(&self) -> &str {
+        "TopDownEnforceSorting"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn ensure_sorting(
+    requirements: PlanWithSortRequirements,
+) -> Result<Option<PlanWithSortRequirements>> {
+    if let Some(sort_exec) = 
requirements.plan.as_any().downcast_ref::<SortExec>() {
+        // Remove unnecessary SortExec(local/global)
+        if let Some(result) = analyze_immediate_sort_removal(&requirements, 
sort_exec) {
+            return Ok(Some(result));
+        }
+    } else if let Some(sort_pres_exec) = requirements
+        .plan
+        .as_any()
+        .downcast_ref::<SortPreservingMergeExec>()
+    {
+        // SortPreservingMergeExec + SortExec(local/global) is the same as the 
global SortExec
+        // Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
+        if let Some(child_sort_exec) =
+            sort_pres_exec.input().as_any().downcast_ref::<SortExec>()
+        {
+            if sort_pres_exec.expr() == child_sort_exec.expr() {
+                if let Some(result) =
+                    analyze_immediate_sort_removal(&requirements, 
child_sort_exec)
+                {
+                    return Ok(Some(result));
+                }
+            }
+        } else if !requirements.satisfy_single_distribution
+            || sort_pres_exec
+                .input()
+                .output_partitioning()
+                .partition_count()
+                <= 1
+        {
+            if let Some(result) =
+                analyze_immediate_spm_removal(&requirements, sort_pres_exec)
+            {
+                return Ok(Some(result));
+            }
+        }
+    }
+    let plan = &requirements.plan;
+    let parent_required = requirements.required_ordering.as_deref();
+    if ordering_satisfy_requirement(plan.output_ordering(), parent_required, 
|| {
+        plan.equivalence_properties()
+    }) {
+        // Can satisfy the parent requirements, change the 
adjusted_request_ordering for UnionExec and WindowAggExec(BoundedWindowAggExec)
+        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+            // UnionExec does not have real sort requirements for its input. 
Here we change the adjusted_request_ordering to UnionExec's output ordering and
+            // propagate the sort requirements down to correct the unnecessary 
descendant SortExec under the UnionExec
+            let adjusted = new_sort_requirements(union_exec.output_ordering());
+            return Ok(Some(PlanWithSortRequirements {
+                required_ordering: None,
+                adjusted_request_ordering: vec![
+                    adjusted;
+                    requirements
+                        .adjusted_request_ordering
+                        .len()
+                ],
+                ..requirements
+            }));
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            // WindowAggExec(BoundedWindowAggExec) might reverse their sort 
requirements
+            let request_child = 
requirements.adjusted_request_ordering[0].as_deref();
+            let reversed_request_child = 
reverse_window_sort_requirements(request_child);
+
+            if should_reverse_window_sort_requirements(
+                plan.clone(),
+                request_child,
+                reversed_request_child.as_deref(),
+            ) {
+                let WindowExecInfo {
+                    window_expr,
+                    input_schema,
+                    partition_keys,
+                } = extract_window_info_from_plan(plan).unwrap();
+
+                let new_window_expr = window_expr
+                    .iter()
+                    .map(|e| e.get_reverse_expr())
+                    .collect::<Option<Vec<_>>>();
+                let new_physical_ordering = create_sort_expr_from_requirement(
+                    reversed_request_child.clone().unwrap().as_ref(),
+                );
+                if let Some(window_expr) = new_window_expr {
+                    let uses_bounded_memory =
+                        window_expr.iter().all(|e| e.uses_bounded_memory());
+                    // If all window expressions can run with bounded memory, 
choose the
+                    // bounded window variant:
+                    let new_plan = if uses_bounded_memory {
+                        Arc::new(BoundedWindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    } else {
+                        Arc::new(WindowAggExec::try_new(
+                            window_expr,
+                            plan.children()[0].clone(),
+                            input_schema,
+                            partition_keys,
+                            Some(new_physical_ordering),
+                        )?) as Arc<dyn ExecutionPlan>
+                    };
+                    return Ok(Some(PlanWithSortRequirements {
+                        plan: new_plan,
+                        impact_result_ordering: false,
+                        satisfy_single_distribution: requirements
+                            .satisfy_single_distribution,
+                        required_ordering: None,
+                        adjusted_request_ordering: 
vec![reversed_request_child],
+                    }));
+                }
+            }
+        }
+        Ok(Some(PlanWithSortRequirements {
+            required_ordering: None,
+            ..requirements
+        }))
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        // If the current plan is a SortExec, modify current SortExec to 
satisfy the parent requirements
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let mut new_plan = sort_exec.input.clone();
+        add_sort_above(&mut new_plan, parent_required_expr)?;
+        Ok(Some(
+            
PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+        ))
+    } else {
+        // Can not satisfy the parent requirements, check whether the 
requirements can be pushed down. If not, add new SortExec.
+        let parent_required_expr =
+            create_sort_expr_from_requirement(parent_required.unwrap());
+        let maintains_input_order = plan.maintains_input_order();
+        // If the current plan is a leaf node or can not maintain any of the 
input ordering, can not pushed down requirements.
+        // For RepartitionExec, we always choose to not push down the sort 
requirements even the RepartitionExec(input_partition=1) could maintain input 
ordering.
+        // For UnionExec, we can always push down
+        if (maintains_input_order.is_empty()
+            || !maintains_input_order.iter().any(|o| *o)
+            || plan.as_any().downcast_ref::<RepartitionExec>().is_some()
+            || plan.as_any().downcast_ref::<FilterExec>().is_some()
+            || plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
+            || plan.as_any().downcast_ref::<LocalLimitExec>().is_some())
+            && plan.as_any().downcast_ref::<UnionExec>().is_none()
+        {
+            let mut new_plan = plan.clone();
+            add_sort_above(&mut new_plan, parent_required_expr)?;
+            Ok(Some(
+                
PlanWithSortRequirements::new_without_impact_result_ordering(new_plan),
+            ))
+        } else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
+            || plan
+                .as_any()
+                .downcast_ref::<BoundedWindowAggExec>()
+                .is_some()
+        {
+            let request_child = 
requirements.adjusted_request_ordering[0].as_deref();
+            let child_plan = plan.children()[0].clone();
+            match determine_children_requirement(
+                parent_required,
+                request_child,
+                child_plan,
+            ) {
+                RequirementsCompatibility::Satisfy => Ok(None),
+                RequirementsCompatibility::Compatible(adjusted) => {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![adjusted],
+                        ..requirements
+                    }))
+                }
+                RequirementsCompatibility::NonCompatible => {
+                    let WindowExecInfo {
+                        window_expr,
+                        input_schema,
+                        partition_keys,
+                    } = extract_window_info_from_plan(plan).unwrap();
+                    if should_reverse_window_exec(
+                        parent_required,
+                        request_child,
+                        &input_schema,
+                    ) {
+                        let new_physical_ordering = 
parent_required_expr.to_vec();
+                        let new_window_expr = window_expr
+                            .iter()
+                            .map(|e| e.get_reverse_expr())
+                            .collect::<Option<Vec<_>>>();
+                        if let Some(window_expr) = new_window_expr {
+                            let uses_bounded_memory =
+                                window_expr.iter().all(|e| 
e.uses_bounded_memory());
+                            let new_plan = if uses_bounded_memory {
+                                Arc::new(BoundedWindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            } else {
+                                Arc::new(WindowAggExec::try_new(
+                                    window_expr,
+                                    plan.children()[0].clone(),
+                                    input_schema,
+                                    partition_keys,
+                                    Some(new_physical_ordering),
+                                )?)
+                                    as Arc<dyn ExecutionPlan>
+                            };
+                            let adjusted_request_ordering =
+                                new_plan.required_input_ordering();
+                            return Ok(Some(PlanWithSortRequirements {
+                                plan: new_plan,
+                                impact_result_ordering: false,
+                                satisfy_single_distribution: requirements
+                                    .satisfy_single_distribution,
+                                required_ordering: None,
+                                adjusted_request_ordering,
+                            }));
+                        }
+                    }
+                    // Can not push down requirements, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        
PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if let Some(smj) = 
plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+            // If the current plan is SortMergeJoinExec
+            let left_columns_len = smj.left.schema().fields().len();
+            let expr_source_side =
+                expr_source_sides(&parent_required_expr, smj.join_type, 
left_columns_len);
+            match expr_source_side {
+                Some(JoinSide::Left) if maintains_input_order[0] => {
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        parent_required,
+                        parent_required_expr,
+                        JoinSide::Left,
+                    )
+                }
+                Some(JoinSide::Right) if maintains_input_order[1] => {
+                    let new_right_required = match smj.join_type {
+                        JoinType::Inner | JoinType::Right => 
shift_right_required(
+                            parent_required.unwrap(),
+                            left_columns_len,
+                        )?,
+                        JoinType::RightSemi | JoinType::RightAnti => {
+                            parent_required.unwrap().to_vec()
+                        }
+                        _ => Err(DataFusionError::Plan(
+                            "Unexpected SortMergeJoin type here".to_string(),
+                        ))?,
+                    };
+                    try_pushdown_requirements_to_join(
+                        &requirements,
+                        Some(new_right_required.deref()),
+                        parent_required_expr,
+                        JoinSide::Right,
+                    )
+                }
+                _ => {
+                    // Can not decide the expr side for SortMergeJoinExec, can 
not push down, add SortExec;
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        
PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            }
+        } else if plan.required_input_ordering().iter().any(Option::is_some) {
+            let plan_children = plan.children();
+            let compatible_with_children = izip!(
+                maintains_input_order.iter(),
+                plan.required_input_ordering().into_iter(),
+                plan_children.iter()
+            )
+            .map(|(can_push_down, request_child, child)| {
+                if *can_push_down {
+                    determine_children_requirement(
+                        parent_required,
+                        request_child.as_deref(),
+                        child.clone(),
+                    )
+                } else {
+                    RequirementsCompatibility::NonCompatible
+                }
+            })
+            .collect::<Vec<_>>();
+            if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Satisfy))
+            {
+                // Requirements are satisfied, not need to push down.
+                Ok(None)
+            } else if compatible_with_children
+                .iter()
+                .all(|a| matches!(a, RequirementsCompatibility::Compatible(_)))
+            {
+                // Adjust child requirements and push down the requirements
+                let adjusted = parent_required.map(|r| r.to_vec());
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![adjusted; 
plan_children.len()],
+                    ..requirements
+                }))
+            } else {
+                // Can not push down, add new SortExec
+                let mut new_plan = plan.clone();
+                add_sort_above(&mut new_plan, parent_required_expr)?;
+                Ok(Some(
+                    
PlanWithSortRequirements::new_without_impact_result_ordering(
+                        new_plan,
+                    ),
+                ))
+            }
+        } else {
+            // The current plan does not have its own ordering requirements to 
its children, consider push down the requirements
+            if let Some(ProjectionExec { expr, .. }) =
+                plan.as_any().downcast_ref::<ProjectionExec>()
+            {
+                // For Projection, we need to transform the requirements to 
the columns before the Projection
+                // And then to push down the requirements
+                let new_adjusted =
+                    map_requirement_before_projection(parent_required, expr);
+                if new_adjusted.is_some() {
+                    Ok(Some(PlanWithSortRequirements {
+                        required_ordering: None,
+                        adjusted_request_ordering: vec![new_adjusted],
+                        ..requirements
+                    }))
+                } else {
+                    // Can not push down, add new SortExec
+                    let mut new_plan = plan.clone();
+                    add_sort_above(&mut new_plan, parent_required_expr)?;
+                    Ok(Some(
+                        
PlanWithSortRequirements::new_without_impact_result_ordering(
+                            new_plan,
+                        ),
+                    ))
+                }
+            } else {
+                Ok(Some(PlanWithSortRequirements {
+                    required_ordering: None,
+                    adjusted_request_ordering: vec![
+                        requirements.required_ordering;
+                        requirements
+                            .adjusted_request_ordering
+                            .len()
+                    ],
+                    ..requirements
+                }))
+            }
+        }
+    }
+}
+
+/// Analyzes a given `Sort` (`plan`) to determine whether the Sort can be 
removed:
+/// 1) The input already has a finer ordering than this `Sort` enforces.
+/// 2) The `Sort` does not impact the final result ordering.
+fn analyze_immediate_sort_removal(
+    requirements: &PlanWithSortRequirements,
+    sort_exec: &SortExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        sort_exec.input().output_ordering(),
+        sort_exec.output_ordering(),
+        || sort_exec.input().equivalence_properties(),
+    ) {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+            impact_result_ordering: requirements.impact_result_ordering,
+            satisfy_single_distribution: 
requirements.satisfy_single_distribution,
+            required_ordering: None,
+            adjusted_request_ordering: 
vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortExec
+    else if !requirements.impact_result_ordering {
+        if requirements.satisfy_single_distribution
+            && sort_exec.input().output_partitioning().partition_count() > 1
+        {
+            Some(PlanWithSortRequirements {
+                plan: 
Arc::new(CoalescePartitionsExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: 
vec![requirements.required_ordering.clone()],
+            })
+        } else {
+            Some(PlanWithSortRequirements {
+                plan: Arc::new(TombStoneExec::new(sort_exec.input().clone())),
+                impact_result_ordering: false,
+                satisfy_single_distribution: false,
+                required_ordering: None,
+                adjusted_request_ordering: 
vec![requirements.required_ordering.clone()],
+            })
+        }
+    } else {
+        None
+    }
+}
+
+/// Analyzes a given `SortPreservingMergeExec` (`plan`) to determine whether 
the SortPreservingMergeExec can be removed:
+/// 1) The input already has a finer ordering than this 
`SortPreservingMergeExec` enforces.
+/// 2) The `SortPreservingMergeExec` does not impact the final result ordering.
+fn analyze_immediate_spm_removal(
+    requirements: &PlanWithSortRequirements,
+    spm_exec: &SortPreservingMergeExec,
+) -> Option<PlanWithSortRequirements> {
+    if ordering_satisfy(
+        spm_exec.input().output_ordering(),
+        Some(spm_exec.expr()),
+        || spm_exec.input().equivalence_properties(),
+    ) && spm_exec.input().output_partitioning().partition_count() <= 1
+    {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: true,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: 
vec![requirements.required_ordering.clone()],
+        })
+    }
+    // Remove unnecessary SortPreservingMergeExec only
+    else if !requirements.impact_result_ordering {
+        Some(PlanWithSortRequirements {
+            plan: Arc::new(TombStoneExec::new(spm_exec.input().clone())),
+            impact_result_ordering: false,
+            satisfy_single_distribution: false,
+            required_ordering: None,
+            adjusted_request_ordering: 
vec![requirements.required_ordering.clone()],
+        })
+    } else {
+        None
+    }
+}
+
+/// Decides how a parent's ordering requirement relates to the ordering requested from a
+/// child, using the child's (`child_plan`) equivalence properties:
+/// - `Satisfy`: the child request is more specific than the parent requirement, so the
+///   parent requirement does not need to be pushed down.
+/// - `Compatible(parent)`: the parent requirement is more specific, so it replaces the
+///   child request and is pushed down instead.
+/// - `NonCompatible`: the two requirements are incompatible and a sort must be added.
+fn determine_children_requirement(
+    parent_required: Option<&[PhysicalSortRequirements]>,
+    request_child: Option<&[PhysicalSortRequirements]>,
+    child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+    // Only borrows `child_plan`, so the closure can be reused for both checks.
+    let equivalence_properties = || child_plan.equivalence_properties();
+
+    if requirements_compatible(request_child, parent_required, equivalence_properties) {
+        return RequirementsCompatibility::Satisfy;
+    }
+    if requirements_compatible(parent_required, request_child, equivalence_properties) {
+        let pushed_down = parent_required.map(|reqs| reqs.to_vec());
+        return RequirementsCompatibility::Compatible(pushed_down);
+    }
+    RequirementsCompatibility::NonCompatible
+}
+
+/// Compares window expression's `window_request` and `parent_required_expr` 
ordering, returns
+/// whether we should reverse the window expression's ordering in order to 
meet parent's requirements.
+fn check_alignment(
+    input_schema: &SchemaRef,
+    window_request: &PhysicalSortRequirements,
+    parent_required_expr: &PhysicalSortRequirements,
+) -> bool {
+    if parent_required_expr.expr.eq(&window_request.expr)
+        && window_request.sort_options.is_some()
+        && parent_required_expr.sort_options.is_some()
+    {
+        let nullable = 
parent_required_expr.expr.nullable(input_schema).unwrap();
+        let window_request_opts = window_request.sort_options.unwrap();
+        let parent_required_opts = parent_required_expr.sort_options.unwrap();
+        if nullable {
+            window_request_opts == reverse_sort_options(parent_required_opts)
+        } else {
+            // If the column is not nullable, NULLS FIRST/LAST is not 
important.
+            window_request_opts.descending != parent_required_opts.descending
+        }
+    } else {
+        false
+    }
+}
+
+/// Reverses every sort direction in an optional window sort request.
+///
+/// Each requirement keeps its expression. Its `sort_options`, when present, are replaced
+/// by `reverse_sort_options(..)`. Requirements without sort options are copied as-is.
+/// A `None` request yields `None`.
+fn reverse_window_sort_requirements(
+    request_child: Option<&[PhysicalSortRequirements]>,
+) -> Option<Vec<PhysicalSortRequirements>> {
+    let request = request_child?;
+    let reversed: Vec<_> = request
+        .iter()
+        .map(|requirement| PhysicalSortRequirements {
+            expr: requirement.expr.clone(),
+            sort_options: requirement.sort_options.map(reverse_sort_options),
+        })
+        .collect();
+    Some(reversed)
+}
+
+/// Whether to reverse the top WindowExec's sort requirements.
+/// Considering the requirements of the descendants WindowExecs and leaf 
nodes' output ordering.
+/// TODO!considering all the cases
+fn should_reverse_window_sort_requirements(
+    window_plan: Arc<dyn ExecutionPlan>,
+    top_requirement: Option<&[PhysicalSortRequirements]>,
+    top_reversed_requirement: Option<&[PhysicalSortRequirements]>,
+) -> bool {
+    if top_requirement.is_none() {
+        return false;
+    }
+    let WindowExecInfo { window_expr, .. } =
+        extract_window_info_from_plan(&window_plan).unwrap();
+    let reverse_window_expr = window_expr
+        .iter()
+        .map(|e| e.get_reverse_expr())
+        .collect::<Option<Vec<_>>>();
+    if reverse_window_expr.is_none() {
+        return false;
+    }
+    let flags = window_plan
+        .children()
+        .into_iter()
+        .map(|child| {
+            // If the child is leaf node, check the output ordering
+            if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                false
+            } else if child.children().is_empty()
+                && ordering_satisfy_requirement(
+                    child.output_ordering(),
+                    top_reversed_requirement,
+                    || child.equivalence_properties(),
+                )
+            {
+                true
+            } else if child.as_any().downcast_ref::<WindowAggExec>().is_some()
+                || child
+                    .as_any()
+                    .downcast_ref::<BoundedWindowAggExec>()
+                    .is_some()
+            {
+                // If the child is WindowExec, check the child requirements
+                if requirements_compatible(
+                    top_requirement,
+                    child.required_input_ordering()[0].as_deref(),
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(
+                    child.required_input_ordering()[0].as_deref(),
+                    top_requirement,
+                    || child.equivalence_properties(),
+                ) || requirements_compatible(

Review Comment:
   This pattern is used a lot. I think you could write a utility function that
checks whether either of the two requirements satisfies the other, so this
check is performed in one place. That would be better, I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to