This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ceff6cb44 Orthogonalize distribution and sort enforcement rules into 
`EnforceDistribution` and `EnforceSorting` (#4839)
ceff6cb44 is described below

commit ceff6cb44ad621f89c9c4e9c0bd34cb204246910
Author: Mustafa akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Tue Jan 10 01:10:43 2023 +0300

    Orthogonalize distribution and sort enforcement rules into 
`EnforceDistribution` and `EnforceSorting` (#4839)
    
    * Separate sort rule
    
    * Migrate to clearer file name, tidy up comments
    
    * Add a note about tests verifying EnforceDistribution/EnforceSorting 
jointly
    
    * Address review, fix the stale comment
    
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
---
 datafusion/core/src/execution/context.rs           | 39 ++++++------
 .../{enforcement.rs => dist_enforcement.rs}        | 73 +++++++++-------------
 datafusion/core/src/physical_optimizer/mod.rs      |  4 +-
 .../core/src/physical_optimizer/repartition.rs     | 10 ++-
 .../{optimize_sorts.rs => sort_enforcement.rs}     | 41 ++++++------
 datafusion/core/src/physical_plan/limit.rs         |  4 ++
 datafusion/expr/src/logical_plan/builder.rs        |  3 +-
 7 files changed, 87 insertions(+), 87 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 2687902a3..98fe6ff79 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::config::ConfigOptions;
 use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
-use crate::physical_optimizer::enforcement::BasicEnforcement;
+use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, 
plan_to_parquet};
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udaf::AggregateUDF;
@@ -91,9 +91,9 @@ use crate::catalog::listing_schema::ListingSchemaProvider;
 use crate::datasource::object_store::ObjectStoreUrl;
 use crate::execution::memory_pool::MemoryPool;
 use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
-use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
 use crate::physical_optimizer::pipeline_checker::PipelineChecker;
 use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
+use crate::physical_optimizer::sort_enforcement::EnforceSorting;
 use datafusion_optimizer::OptimizerConfig;
 use datafusion_sql::planner::object_name_to_table_reference;
 use uuid::Uuid;
@@ -1448,37 +1448,36 @@ impl SessionState {
             // output partitioning of some operators in the plan tree, which 
will influence
             // other rules. Therefore, it should run as soon as possible. It 
is optional because:
             // - It's not used for the distributed engine, Ballista.
-            // - It's conflicted with some parts of the BasicEnforcement, 
since it will
-            //   introduce additional repartitioning while the 
BasicEnforcement aims at
-            //   reducing unnecessary repartitioning.
+            // - It's conflicted with some parts of the EnforceDistribution, 
since it will
+            //   introduce additional repartitioning while EnforceDistribution 
aims to
+            //   reduce unnecessary repartitioning.
             Arc::new(Repartition::new()),
             // - Currently it will depend on the partition number to decide 
whether to change the
             // single node sort to parallel local sort and merge. Therefore, 
GlobalSortSelection
             // should run after the Repartition.
             // - Since it will change the output ordering of some operators, 
it should run
-            // before JoinSelection and BasicEnforcement, which may depend on 
that.
+            // before JoinSelection and EnforceSorting, which may depend on 
that.
             Arc::new(GlobalSortSelection::new()),
             // Statistics-based join selection will change the Auto mode to a 
real join implementation,
-            // like collect left, or hash join, or future sort merge join, 
which will
-            // influence the BasicEnforcement to decide whether to add 
additional repartition
-            // and local sort to meet the distribution and ordering 
requirements.
-            // Therefore, it should run before BasicEnforcement.
+            // like collect left, or hash join, or future sort merge join, 
which will influence the
+            // EnforceDistribution and EnforceSorting rules as they decide 
whether to add additional
+            // repartitioning and local sorting steps to meet distribution and 
ordering requirements.
+            // Therefore, it should run before EnforceDistribution and 
EnforceSorting.
             Arc::new(JoinSelection::new()),
             // If the query is processing infinite inputs, the PipelineFixer 
rule applies the
             // necessary transformations to make the query runnable (if it is 
not already runnable).
             // If the query can not be made runnable, the rule emits an error 
with a diagnostic message.
             // Since the transformations it applies may alter output 
partitioning properties of operators
-            // (e.g. by swapping hash join sides), this rule runs before 
BasicEnforcement.
+            // (e.g. by swapping hash join sides), this rule runs before 
EnforceDistribution.
             Arc::new(PipelineFixer::new()),
-            // BasicEnforcement is for adding essential repartition and local 
sorting operators
-            // to satisfy the required distribution and local sort 
requirements.
-            // Please make sure that the whole plan tree is determined.
-            Arc::new(BasicEnforcement::new()),
-            // The BasicEnforcement stage conservatively inserts sorts to 
satisfy ordering requirements.
-            // However, a deeper analysis may sometimes reveal that such a 
sort is actually unnecessary.
-            // These cases typically arise when we have reversible window 
expressions or deep subqueries.
-            // The rule below performs this analysis and removes unnecessary 
sorts.
-            Arc::new(OptimizeSorts::new()),
+            // The EnforceDistribution rule is for adding essential 
repartition to satisfy the required
+            // distribution. Please make sure that the whole plan tree is 
determined before this rule.
+            Arc::new(EnforceDistribution::new()),
+            // The EnforceSorting rule is for adding essential local sorting 
to satisfy the required
+            // ordering. Please make sure that the whole plan tree is 
determined before this rule.
+            // Note that one should always run this rule after running the 
EnforceDistribution rule
+            // as the latter may break local sorting requirements.
+            Arc::new(EnforceSorting::new()),
             // The CoalesceBatches rule will not influence the distribution 
and ordering of the
             // whole plan tree. Therefore, to avoid influencing other rules, 
it should run last.
             Arc::new(CoalesceBatches::new()),
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs 
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
similarity index 97%
rename from datafusion/core/src/physical_optimizer/enforcement.rs
rename to datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 0b8206a78..aa8b07569 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Enforcement optimizer rules are used to make sure the plan's Distribution 
and Ordering
-//! requirements are met by inserting necessary [[RepartitionExec]] and 
[[SortExec]].
-//!
+//! EnforceDistribution optimizer rule inspects the physical plan with respect
+//! to distribution requirements and adds [RepartitionExec]s to satisfy them
+//! when necessary.
 use crate::config::ConfigOptions;
 use crate::error::Result;
-use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -46,8 +45,8 @@ use datafusion_physical_expr::{
 use std::collections::HashMap;
 use std::sync::Arc;
 
-/// BasicEnforcement rule, it ensures the Distribution and Ordering 
requirements are met
-/// in the strictest way. It might add additional [[RepartitionExec]] to the 
plan tree
+/// The EnforceDistribution rule ensures that distribution requirements are met
+/// in the strictest way. It might add additional [RepartitionExec] to the 
plan tree
 /// and give a non-optimal plan, but it can avoid the possible data skew in 
joins.
 ///
 /// For example for a HashJoin with keys(a, b, c), the required 
Distribution(a, b, c) can be satisfied by
@@ -55,16 +54,16 @@ use std::sync::Arc;
 ///
 /// This rule only chooses the exactly match and satisfies the Distribution(a, 
b, c) by a HashPartition(a, b, c).
 #[derive(Default)]
-pub struct BasicEnforcement {}
+pub struct EnforceDistribution {}
 
-impl BasicEnforcement {
+impl EnforceDistribution {
     #[allow(missing_docs)]
     pub fn new() -> Self {
         Self {}
     }
 }
 
-impl PhysicalOptimizerRule for BasicEnforcement {
+impl PhysicalOptimizerRule for EnforceDistribution {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
@@ -81,7 +80,7 @@ impl PhysicalOptimizerRule for BasicEnforcement {
         } else {
             plan
         };
-        // Distribution and Ordering enforcement need to be applied bottom-up.
+        // Distribution enforcement needs to be applied bottom-up.
         new_plan.transform_up(&{
             |plan| {
                 let adjusted = if !top_down_join_key_reordering {
@@ -89,16 +88,13 @@ impl PhysicalOptimizerRule for BasicEnforcement {
                 } else {
                     plan
                 };
-                Ok(Some(ensure_distribution_and_ordering(
-                    adjusted,
-                    target_partitions,
-                )?))
+                Ok(Some(ensure_distribution(adjusted, target_partitions)?))
             }
         })
     }
 
     fn name(&self) -> &str {
-        "BasicEnforcement"
+        "EnforceDistribution"
     }
 
     fn schema_check(&self) -> bool {
@@ -829,10 +825,11 @@ fn new_join_conditions(
     new_join_on
 }
 
-/// Within this function, it checks whether we need to add additional plan 
operators
-/// of data exchanging and data ordering to satisfy the required distribution 
and ordering.
-/// And we should avoid to manually add plan operators of data exchanging and 
data ordering in other places
-fn ensure_distribution_and_ordering(
+/// This function checks whether we need to add additional data exchange
+/// operators to satisfy distribution requirements. Since this function
+/// takes care of such requirements, we should avoid manually adding data
+/// exchange operators in other places.
+fn ensure_distribution(
     plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
     target_partitions: usize,
 ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
@@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering(
     }
 
     let required_input_distributions = plan.required_input_distribution();
-    let required_input_orderings = plan.required_input_ordering();
     let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
     assert_eq!(children.len(), required_input_distributions.len());
-    assert_eq!(children.len(), required_input_orderings.len());
 
     // Add RepartitionExec to guarantee output partitioning
-    let children = children
+    let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
         .into_iter()
         .zip(required_input_distributions.into_iter())
         .map(|(child, required)| {
@@ -870,24 +865,8 @@ fn ensure_distribution_and_ordering(
                 };
                 new_child
             }
-        });
-
-    // Add local SortExec to guarantee output ordering within each partition
-    let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
-        .zip(required_input_orderings.into_iter())
-        .map(|(child_result, required)| {
-            let child = child_result?;
-            if ordering_satisfy(child.output_ordering(), required, || {
-                child.equivalence_properties()
-            }) {
-                Ok(child)
-            } else {
-                let sort_expr = required.unwrap().to_vec();
-                add_sort_above_child(&child, sort_expr)
-            }
         })
         .collect();
-
     with_new_children_if_necessary(plan, new_children?)
 }
 
@@ -979,6 +958,7 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::physical_optimizer::sort_enforcement::EnforceSorting;
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
@@ -1136,8 +1116,15 @@ mod tests {
             config.execution.target_partitions = 10;
 
             // run optimizer
-            let optimizer = BasicEnforcement {};
+            let optimizer = EnforceDistribution {};
             let optimized = optimizer.optimize($PLAN, &config)?;
+            // NOTE: These tests verify the joint `EnforceDistribution` + 
`EnforceSorting` cascade
+            //       because they were written prior to the separation of 
`BasicEnforcement` into
+            //       `EnforceSorting` and `EnforceDistribution`.
+            // TODO: Orthogonalize the tests here just to verify 
`EnforceDistribution` and create
+            //       new tests for the cascade.
+            let optimizer = EnforceSorting {};
+            let optimized = optimizer.optimize(optimized, &config)?;
 
             // Now format correctly
             let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -1656,7 +1643,7 @@ mod tests {
                 Column::new_with_schema("c1", &right.schema()).unwrap(),
             ),
         ];
-        let bottom_left_join = ensure_distribution_and_ordering(
+        let bottom_left_join = ensure_distribution(
             hash_join_exec(left.clone(), right.clone(), &join_on, 
&JoinType::Inner),
             10,
         )?;
@@ -1686,7 +1673,7 @@ mod tests {
                 Column::new_with_schema("a1", &right.schema()).unwrap(),
             ),
         ];
-        let bottom_right_join = ensure_distribution_and_ordering(
+        let bottom_right_join = ensure_distribution(
             hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
             10,
         )?;
@@ -1775,7 +1762,7 @@ mod tests {
                 Column::new_with_schema("b1", &right.schema()).unwrap(),
             ),
         ];
-        let bottom_left_join = ensure_distribution_and_ordering(
+        let bottom_left_join = ensure_distribution(
             hash_join_exec(left.clone(), right.clone(), &join_on, 
&JoinType::Inner),
             10,
         )?;
@@ -1805,7 +1792,7 @@ mod tests {
                 Column::new_with_schema("a1", &right.schema()).unwrap(),
             ),
         ];
-        let bottom_right_join = ensure_distribution_and_ordering(
+        let bottom_right_join = ensure_distribution(
             hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
             10,
         )?;
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index fb07d54b9..3958a546a 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -20,14 +20,14 @@
 
 pub mod aggregate_statistics;
 pub mod coalesce_batches;
-pub mod enforcement;
+pub mod dist_enforcement;
 pub mod global_sort_selection;
 pub mod join_selection;
-pub mod optimize_sorts;
 pub mod optimizer;
 pub mod pipeline_checker;
 pub mod pruning;
 pub mod repartition;
+pub mod sort_enforcement;
 mod utils;
 
 pub mod pipeline_fixer;
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 2044b2aaa..98ca12a9e 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -241,7 +241,8 @@ mod tests {
     use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::physical_optimizer::enforcement::BasicEnforcement;
+    use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
+    use crate::physical_optimizer::sort_enforcement::EnforceSorting;
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
@@ -370,9 +371,12 @@ mod tests {
             // run optimizer
             let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> 
= vec![
                 Arc::new(Repartition::new()),
-                // The `BasicEnforcement` is an essential rule to be applied.
+                // EnforceDistribution is an essential rule to be applied.
                 // Otherwise, the correctness of the generated optimized plan 
cannot be guaranteed
-                Arc::new(BasicEnforcement::new()),
+                Arc::new(EnforceDistribution::new()),
+                // EnforceSorting is an essential rule to be applied.
+                // Otherwise, the correctness of the generated optimized plan 
cannot be guaranteed
+                Arc::new(EnforceSorting::new()),
             ];
             let optimized = optimizers.into_iter().fold($PLAN, |plan, 
optimizer| {
                 optimizer.optimize(plan, &config).unwrap()
diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
similarity index 96%
rename from datafusion/core/src/physical_optimizer/optimize_sorts.rs
rename to datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 17b27bfa7..52463b4bd 100644
--- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -15,12 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! OptimizeSorts optimizer rule inspects [SortExec]s in the given physical
-//! plan and removes the ones it can prove unnecessary. The rule can work on
-//! valid *and* invalid physical plans with respect to sorting requirements,
-//! but always produces a valid physical plan in this sense.
+//! EnforceSorting optimizer rule inspects the physical plan with respect
+//! to local sorting requirements and does the following:
+//! - Adds a [SortExec] when a requirement is not met,
+//! - Removes an already-existing [SortExec] if it is possible to prove
+//!   that this sort is unnecessary.
+//! The rule can work on valid *and* invalid physical plans with respect to
+//! sorting requirements, but always produces a valid physical plan in this 
sense.
 //!
-//! A non-realistic but easy to follow example: Assume that we somehow get the 
fragment
+//! A non-realistic but easy to follow example for sort removals: Assume that 
we
+//! somehow get the fragment
 //! "SortExec: [nullable_col@0 ASC]",
 //! "  SortExec: [non_nullable_col@1 ASC]",
 //! in the physical plan. The first sort is unnecessary since its result is 
overwritten
@@ -46,16 +50,16 @@ use std::sync::Arc;
 /// This rule inspects SortExec's in the given physical plan and removes the
 /// ones it can prove unnecessary.
 #[derive(Default)]
-pub struct OptimizeSorts {}
+pub struct EnforceSorting {}
 
-impl OptimizeSorts {
+impl EnforceSorting {
     #[allow(missing_docs)]
     pub fn new() -> Self {
         Self {}
     }
 }
 
-/// This is a "data class" we use within the [OptimizeSorts] rule that
+/// This is a "data class" we use within the [EnforceSorting] rule that
 /// tracks the closest `SortExec` descendant for every child of a plan.
 #[derive(Debug, Clone)]
 struct PlanWithCorrespondingSort {
@@ -119,7 +123,7 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
     }
 }
 
-impl PhysicalOptimizerRule for OptimizeSorts {
+impl PhysicalOptimizerRule for EnforceSorting {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
@@ -127,12 +131,12 @@ impl PhysicalOptimizerRule for OptimizeSorts {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Execute a post-order traversal to adjust input key ordering:
         let plan_requirements = PlanWithCorrespondingSort::new(plan);
-        let adjusted = plan_requirements.transform_up(&optimize_sorts)?;
+        let adjusted = plan_requirements.transform_up(&ensure_sorting)?;
         Ok(adjusted.plan)
     }
 
     fn name(&self) -> &str {
-        "OptimizeSorts"
+        "EnforceSorting"
     }
 
     fn schema_check(&self) -> bool {
@@ -140,7 +144,7 @@ impl PhysicalOptimizerRule for OptimizeSorts {
     }
 }
 
-fn optimize_sorts(
+fn ensure_sorting(
     requirements: PlanWithCorrespondingSort,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
     // Perform naive analysis at the beginning -- remove already-satisfied 
sorts:
@@ -171,7 +175,8 @@ fn optimize_sorts(
                     let sort_expr = required_ordering.to_vec();
                     *child = add_sort_above_child(child, sort_expr)?;
                     sort_onwards.push((idx, child.clone()))
-                } else if let [first, ..] = sort_onwards.as_slice() {
+                }
+                if let [first, ..] = sort_onwards.as_slice() {
                     // The ordering requirement is met, we can analyze if 
there is an unnecessary sort:
                     let sort_any = first.1.clone();
                     let sort_exec = convert_to_sort_exec(&sort_any)?;
@@ -618,7 +623,7 @@ mod tests {
             "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, 
state.config_options())?;
+            EnforceSorting::new().optimize(physical_plan, 
state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -717,7 +722,7 @@ mod tests {
             "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, 
state.config_options())?;
+            EnforceSorting::new().optimize(physical_plan, 
state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -761,7 +766,7 @@ mod tests {
             "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, 
state.config_options())?;
+            EnforceSorting::new().optimize(physical_plan, 
state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -826,7 +831,7 @@ mod tests {
             "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, 
state.config_options())?;
+            EnforceSorting::new().optimize(physical_plan, 
state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
@@ -886,7 +891,7 @@ mod tests {
             "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
         );
         let optimized_physical_plan =
-            OptimizeSorts::new().optimize(physical_plan, 
state.config_options())?;
+            EnforceSorting::new().optimize(physical_plan, 
state.config_options())?;
         let formatted = displayable(optimized_physical_plan.as_ref())
             .indent()
             .to_string();
diff --git a/datafusion/core/src/physical_plan/limit.rs 
b/datafusion/core/src/physical_plan/limit.rs
index 3a3a4a20b..776fefd8b 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -287,6 +287,10 @@ impl ExecutionPlan for LocalLimitExec {
         self.input.output_ordering()
     }
 
+    fn maintains_input_order(&self) -> bool {
+        true
+    }
+
     fn equivalence_properties(&self) -> EquivalenceProperties {
         self.input.equivalence_properties()
     }
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 428a31f14..d5e5257ea 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -273,7 +273,8 @@ impl LogicalPlanBuilder {
         });
         for (_, exprs) in groups {
             let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
-            // the partition and sort itself is done at physical level, see 
the BasicEnforcement rule
+            // Partitioning and sorting are done at the physical level, see 
EnforceDistribution
+            // and EnforceSorting rules.
             plan = LogicalPlanBuilder::from(plan)
                 .window(window_exprs)?
                 .build()?;

Reply via email to