ozankabak commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1056439006


##########
datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs:
##########
@@ -0,0 +1,887 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! RemoveUnnecessarySorts optimizer rule inspects SortExec's in the given
+//! physical plan and removes the ones it can prove unnecessary. The rule can
+//! work on valid *and* invalid physical plans with respect to sorting
+//! requirements, but always produces a valid physical plan in this sense.
+//!
+//! A non-realistic but easy to follow example: Assume that we somehow get the fragment
+//! "SortExec: [nullable_col@0 ASC]",
+//! "  SortExec: [non_nullable_col@1 ASC]",
+//! in the physical plan. The first sort is unnecessary since its result is overwritten
+//! by another SortExec. Therefore, this rule removes it from the physical plan.
+use crate::error::Result;
+use crate::physical_optimizer::utils::{
+    add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
+};
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::windows::WindowAggExec;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::prelude::SessionConfig;
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{reverse_sort_options, DataFusionError};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use itertools::izip;
+use std::iter::zip;
+use std::sync::Arc;
+
+/// This rule inspects SortExec's in the given physical plan and removes the
+/// ones it can prove unnecessary.
+#[derive(Default)]
+pub struct RemoveUnnecessarySorts {}
+
+impl RemoveUnnecessarySorts {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// This is a "data class" we use within the [RemoveUnnecessarySorts] rule
+/// that tracks the closest `SortExec` descendant for every child of a plan.
+#[derive(Debug, Clone)]
+struct PlanWithCorrespondingSort {
+    plan: Arc<dyn ExecutionPlan>,
+    // For every child, keep a vector of `ExecutionPlan`s starting from the
+    // closest `SortExec` till the current plan. The first index of the tuple is
+    // the child index of the plan -- we need this information as we make updates.
+    sort_onwards: Vec<Vec<(usize, Arc<dyn ExecutionPlan>)>>,
+}
+
+impl PlanWithCorrespondingSort {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PlanWithCorrespondingSort {
+            plan,
+            sort_onwards: vec![vec![]; length],
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithCorrespondingSort> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| PlanWithCorrespondingSort::new(child))
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithCorrespondingSort {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_requirements = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_plans = children_requirements
+                .iter()
+                .map(|elem| elem.plan.clone())
+                .collect::<Vec<_>>();
+            let sort_onwards = children_requirements
+                .iter()
+                .map(|item| {
+                    if item.sort_onwards.is_empty() {
+                        vec![]
+                    } else {
+                        // TODO: When `maintains_input_order` returns Vec<bool>,
+                        //       pass the order-enforcing sort upwards.
+                        item.sort_onwards[0].clone()
+                    }
+                })
+                .collect::<Vec<_>>();
+            let plan = with_new_children_if_necessary(self.plan, 
children_plans)?;
+            Ok(PlanWithCorrespondingSort { plan, sort_onwards })
+        }
+    }
+}
+
+impl PhysicalOptimizerRule for RemoveUnnecessarySorts {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &SessionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Execute a post-order traversal to adjust input key ordering:
+        let plan_requirements = PlanWithCorrespondingSort::new(plan);
+        let adjusted = 
plan_requirements.transform_up(&remove_unnecessary_sorts)?;
+        Ok(adjusted.plan)
+    }
+
+    fn name(&self) -> &str {
+        "RemoveUnnecessarySorts"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn remove_unnecessary_sorts(
+    requirements: PlanWithCorrespondingSort,
+) -> Result<Option<PlanWithCorrespondingSort>> {
+    // Perform naive analysis at the beginning -- remove already-satisfied sorts:
+    if let Some(result) = analyze_immediate_sort_removal(&requirements)? {
+        return Ok(Some(result));
+    }
+    let plan = &requirements.plan;
+    let mut new_children = plan.children().clone();
+    let mut new_onwards = requirements.sort_onwards.clone();
+    for (idx, (child, sort_onwards, required_ordering)) in izip!(
+        new_children.iter_mut(),
+        new_onwards.iter_mut(),
+        plan.required_input_ordering()
+    )
+    .enumerate()
+    {
+        let physical_ordering = child.output_ordering();
+        match (required_ordering, physical_ordering) {
+            (Some(required_ordering), Some(physical_ordering)) => {
+                let is_ordering_satisfied = ordering_satisfy_concrete(
+                    physical_ordering,
+                    required_ordering,
+                    || child.equivalence_properties(),
+                );
+                if !is_ordering_satisfied {
+                    // Make sure we preserve the ordering requirements:
+                    update_child_to_remove_unnecessary_sort(child, 
sort_onwards)?;
+                    let sort_expr = required_ordering.to_vec();
+                    *child = add_sort_above_child(child, sort_expr)?;

Review Comment:
   One thing we can do is rename the rule to `OptimizeSorts` instead of
   `RemoveUnnecessarySorts`, so that others are not thrown off by the
   sort-adding code (which runs rarely, but needs to be there to cover the
   corner cases @mustafasrepo explains).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to