This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 5fc91cc8fb  Top down `EnforceSorting`, Extended testbench for `EnforceSorting` rule to prepare for refactors, additional functionality such as pushdowns over unions (#5661)
5fc91cc8fb is described below

commit 5fc91cc8fbb56b6d2a32e66f8b327a871f7d15ac
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Tue Apr 4 17:14:54 2023 +0300

    Top down `EnforceSorting`, Extended testbench for `EnforceSorting` rule to prepare for refactors, additional functionality such as pushdowns over unions (#5661)

    * Top Down Sort Enforcer
    * Add support to optimize parallel sorting
    * fix UT
    * add more UTs to sort_enforcement2.rs
    * refine codebase
    * Fix SortMergeJoin case
    * Fix reverse window sort requirements
    * fix test comments
    * add determine_children_requirement() method
    * Convert Topdown to pushdown then unify t
    * reorganize to decrease diff, remove ignore
    * Update test
    * retract dist enforcement
    * tmp
    * reorganize files
    * reorganize tests
    * simplify sort pushdown
    * remove global sort print
    * remove unnecessary parameter
    * Updates
    * simplifications
    * Refactors and simplifications part 1
    * Refactors and simplifications part 2
    * simplifications
    * remove_sort_keys parameters from window
    * Update window multi_path test
    * consider existing ordering during Coalesce
    * retract assertion in planner
    * remove todo.
    * remove unnecessary repartition from plan
    * update comments
    * Remove commented out code
    * Address reviews
    * update comments
    * address reviews

    ---------

    Co-authored-by: mingmw...@ebay.com <mingmw...@ebay.com>
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
---
 .../src/physical_optimizer/dist_enforcement.rs     |   54 +-
 .../physical_optimizer/global_sort_selection.rs    |    6 +-
 datafusion/core/src/physical_optimizer/mod.rs      |    1 +
 .../core/src/physical_optimizer/repartition.rs     |   45 +-
 .../src/physical_optimizer/sort_enforcement.rs     | 1004 +++++++++++++++-----
 .../core/src/physical_optimizer/sort_pushdown.rs   |  416 ++++++++
 datafusion/core/src/physical_optimizer/utils.rs    |   43 +
 .../src/physical_plan/joins/sort_merge_join.rs     |   17 +-
 datafusion/core/src/physical_plan/planner.rs       |    1 -
 .../src/physical_plan/windows/window_agg_exec.rs   |    3 -
 datafusion/physical-expr/src/utils.rs              |  134 ++-
 datafusion/sql/src/statement.rs                    |    8 +-
 12 files changed, 1424 insertions(+), 308 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index ef1b6a576d..d3e99945e9 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -38,11 +38,11 @@ use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::equivalence::EquivalenceProperties; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::expressions::NoOp; +use datafusion_physical_expr::utils::map_columns_before_projection; use datafusion_physical_expr::{ expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr, PhysicalExpr, }; -use std::collections::HashMap; use std::sync::Arc; /// The EnforceDistribution rule ensures that distribution requirements are met @@ -487,30 +487,6 @@ fn reorder_aggregate_keys( } } -fn map_columns_before_projection( - parent_required: &[Arc<dyn PhysicalExpr>], - proj_exprs: &[(Arc<dyn PhysicalExpr>, String)], -) -> Vec<Arc<dyn PhysicalExpr>> { - let mut column_mapping = HashMap::new(); - for (expression, name) in
proj_exprs.iter() { - if let Some(column) = expression.as_any().downcast_ref::<Column>() { - column_mapping.insert(name.clone(), column.clone()); - }; - } - let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required - .iter() - .filter_map(|r| { - if let Some(column) = r.as_any().downcast_ref::<Column>() { - column_mapping.get(column.name()) - } else { - None - } - }) - .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>) - .collect::<Vec<_>>(); - new_required -} - fn shift_right_required( parent_required: &[Arc<dyn PhysicalExpr>], left_columns_len: usize, @@ -1026,6 +1002,30 @@ mod tests { )) } + // Created a sorted parquet exec with multiple files + fn parquet_exec_multiple_sorted( + output_ordering: Option<Vec<PhysicalSortExpr>>, + ) -> Arc<ParquetExec> { + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema(), + file_groups: vec![ + vec![PartitionedFile::new("x".to_string(), 100)], + vec![PartitionedFile::new("y".to_string(), 100)], + ], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering, + infinite_source: false, + }, + None, + None, + )) + } + fn projection_exec_with_alias( input: Arc<dyn ExecutionPlan>, alias_pairs: Vec<(String, String)>, @@ -2108,7 +2108,7 @@ mod tests { }]; // Scan some sorted parquet files - let exec = parquet_exec_with_sort(Some(sort_key.clone())); + let exec = parquet_exec_multiple_sorted(Some(sort_key.clone())); // CoalesceBatchesExec to mimic behavior after a filter let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096)); @@ -2121,7 +2121,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", "CoalesceBatchesExec: target_batch_size=4096", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]", + "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[a@0 ASC], projection=[a, b, c, d, e]", ]; assert_optimized!(expected, exec); Ok(()) diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index 9342298633..e29735e741 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection { fn optimize( &self, plan: Arc<dyn ExecutionPlan>, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result<Arc<dyn ExecutionPlan>> { plan.transform_up(&|plan| { let transformed = @@ -56,10 +56,10 @@ impl PhysicalOptimizerRule for GlobalSortSelection { .downcast_ref::<SortExec>() .and_then(|sort_exec| { if sort_exec.input().output_partitioning().partition_count() > 1 - && sort_exec.fetch().is_some() // It's already preserving the partitioning so that it can be regarded as a local sort && !sort_exec.preserve_partitioning() - { + && (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts) + { let sort = SortExec::new_with_partitioning( sort_exec.expr().to_vec(), sort_exec.input().clone(), diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 3958a546a9..5111e55292 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -28,6 +28,7 @@ pub mod pipeline_checker; pub mod pruning; pub mod repartition; pub mod sort_enforcement; +mod sort_pushdown; mod utils; pub mod 
pipeline_fixer; diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 71274d177d..6c2d5c9348 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -321,9 +321,6 @@ fn init() { mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortRequirement, - }; use super::*; use crate::datasource::listing::PartitionedFile; @@ -342,6 +339,9 @@ mod tests { use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; + use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, PhysicalSortRequirement, + }; fn schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)])) @@ -412,6 +412,33 @@ mod tests { )) } + // Created a sorted parquet exec with multiple files + fn parquet_exec_multiple_sorted() -> Arc<ParquetExec> { + let sort_exprs = vec![PhysicalSortExpr { + expr: col("c1", &schema()).unwrap(), + options: SortOptions::default(), + }]; + + Arc::new(ParquetExec::new( + FileScanConfig { + object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), + file_schema: schema(), + file_groups: vec![ + vec![PartitionedFile::new("x".to_string(), 100)], + vec![PartitionedFile::new("y".to_string(), 100)], + ], + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: Some(sort_exprs), + infinite_source: false, + }, + None, + None, + )) + } + fn sort_preserving_merge_exec( input: Arc<dyn ExecutionPlan>, ) -> Arc<dyn ExecutionPlan> { @@ -737,12 +764,12 @@ mod tests { #[test] fn repartition_ignores_sort_preserving_merge() -> Result<()> { // sort preserving merge already sorted input, - let plan = sort_preserving_merge_exec(parquet_exec_sorted()); + let plan = sort_preserving_merge_exec(parquet_exec_multiple_sorted()); // should not repartition / sort (as the data was already sorted) let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; assert_optimized!(expected, plan); @@ -838,13 +865,14 @@ mod tests { #[test] fn repartition_ignores_transitively_with_projection() -> Result<()> { // sorted input - let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted())); + let plan = + sort_preserving_merge_exec(projection_exec(parquet_exec_multiple_sorted())); // data should not be repartitioned / resorted let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", - "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", + "ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; assert_optimized!(expected, plan); @@ -857,7 +885,6 @@ mod tests { sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true)); let expected = &[ - "SortPreservingMergeExec: [c1@0 ASC]", "SortExec: expr=[c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]", @@ -1040,7 +1067,6 @@ mod tests { 
// parallelization potentially could break sort order let expected = &[ - "SortPreservingMergeExec: [c1@0 ASC]", "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; @@ -1090,7 +1116,6 @@ mod tests { // data should not be repartitioned / resorted let expected = &[ - "SortPreservingMergeExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]", ]; diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 265c86cdf1..7428c339dc 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -35,13 +35,15 @@ //! by another SortExec. Therefore, this rule removes it from the physical plan. use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::utils::add_sort_above; +use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown}; +use crate::physical_optimizer::utils::{ + add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort, + is_sort_preserving_merge, is_union, is_window, +}; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; use arrow::datatypes::SchemaRef; @@ -104,8 +106,8 @@ impl ExecTree { } } -/// This object is used within the [EnforceSorting] rule to track the closest -/// `SortExec` descendant(s) for every child of a plan. +/// This object is used within the [`EnforceSorting`] rule to track the closest +/// [`SortExec`] descendant(s) for every child of a plan. #[derive(Debug, Clone)] struct PlanWithCorrespondingSort { plan: Arc<dyn ExecutionPlan>, @@ -149,19 +151,11 @@ impl PlanWithCorrespondingSort { return None; } let is_spm = is_sort_preserving_merge(plan); - let is_union = plan.as_any().is::<UnionExec>(); - // If the executor is a `UnionExec`, and it has an output ordering; - // then it at least partially maintains some child's output ordering. - // Therefore, we propagate this information upwards. - let partially_maintains = is_union && plan.output_ordering().is_some(); let required_orderings = plan.required_input_ordering(); let flags = plan.maintains_input_order(); let children = izip!(flags, item.sort_onwards, required_orderings) .filter_map(|(maintains, element, required_ordering)| { - if (required_ordering.is_none() - && (maintains || partially_maintains)) - || is_spm - { + if (required_ordering.is_none() && maintains) || is_spm { element } else { None @@ -227,7 +221,7 @@ impl TreeNode for PlanWithCorrespondingSort { } /// This object is used within the [EnforceSorting] rule to track the closest -/// `CoalescePartitionsExec` descendant(s) for every child of a plan. +/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. 
#[derive(Debug, Clone)] struct PlanWithCorrespondingCoalescePartitions { plan: Arc<dyn ExecutionPlan>, @@ -235,7 +229,7 @@ struct PlanWithCorrespondingCoalescePartitions { // child until the `CoalescePartitionsExec`(s) -- could be multiple for // n-ary plans like Union -- that affect the output partitioning of the // child. If the child has no connection to any `CoalescePartitionsExec`, - // simplify store None (and not a subtree). + // simply store None (and not a subtree). coalesce_onwards: Vec<Option<ExecTree>>, } @@ -265,11 +259,11 @@ impl PlanWithCorrespondingCoalescePartitions { // maintain a single partition. If we just saw a `CoalescePartitionsExec` // operator, we reset the tree and start accumulating. let plan = item.plan; - if plan.as_any().is::<CoalescePartitionsExec>() { - Some(ExecTree::new(plan, idx, vec![])) - } else if plan.children().is_empty() { + if plan.children().is_empty() { // Plan has no children, there is nothing to propagate. None + } else if is_coalesce_partitions(&plan) { + Some(ExecTree::new(plan, idx, vec![])) } else { let children = item .coalesce_onwards @@ -356,16 +350,23 @@ impl PhysicalOptimizerRule for EnforceSorting { config: &ConfigOptions, ) -> Result<Arc<dyn ExecutionPlan>> { let plan_requirements = PlanWithCorrespondingSort::new(plan); + // Execute a bottom-up traversal to enforce sorting requirements, + // remove unnecessary sorts, and optimize sort-sensitive operators: let adjusted = plan_requirements.transform_up(&ensure_sorting)?; - if config.optimizer.repartition_sorts { + let new_plan = if config.optimizer.repartition_sorts { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new(adjusted.plan); let parallel = plan_with_coalesce_partitions.transform_up(¶llelize_sorts)?; - Ok(parallel.plan) + parallel.plan } else { - Ok(adjusted.plan) - } + adjusted.plan + }; + // Execute a top-down traversal to exploit sort push-down opportunities + // missed by the bottom-up traversal: + let sort_pushdown = SortPushDown::init(new_plan); + let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?; + Ok(adjusted.plan) } fn name(&self) -> &str { @@ -378,77 +379,63 @@ impl PhysicalOptimizerRule for EnforceSorting { } /// This function turns plans of the form -/// "SortExec: expr=[a@0 ASC]", +/// "SortExec: expr=\[a@0 ASC\]", /// " CoalescePartitionsExec", /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// to -/// "SortPreservingMergeExec: [a@0 ASC]", -/// " SortExec: expr=[a@0 ASC]", +/// "SortPreservingMergeExec: \[a@0 ASC\]", +/// " SortExec: expr=\[a@0 ASC\]", /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", -/// by following connections from `CoalescePartitionsExec`s to `SortExec`s. +/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. /// By performing sorting in parallel, we can increase performance in some scenarios. fn parallelize_sorts( requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> { - if requirements.plan.children().is_empty() { - return Ok(Transformed::No(requirements)); - } let plan = requirements.plan; let mut coalesce_onwards = requirements.coalesce_onwards; - // We know that `plan` has children, so `coalesce_onwards` is non-empty. 
- if coalesce_onwards[0].is_some() { - if (is_sort(&plan) || is_sort_preserving_merge(&plan)) - // Make sure that Sort is actually global sort - && plan.output_partitioning().partition_count() == 1 - { - // If there is a connection between a `CoalescePartitionsExec` and a - // Global Sort that satisfy the requirements (i.e. intermediate - // executors don't require single partition), then we can - // replace the `CoalescePartitionsExec`+ GlobalSort cascade with - // the `SortExec` + `SortPreservingMergeExec` - // cascade to parallelize sorting. - let mut prev_layer = plan.clone(); - update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; - let sort_exprs = get_sort_exprs(&plan)?; - add_sort_above(&mut prev_layer, sort_exprs.to_vec())?; - let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer); - return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { - plan: Arc::new(spm), - coalesce_onwards: vec![None], - })); - } else if plan.as_any().is::<CoalescePartitionsExec>() { - // There is an unnecessary `CoalescePartitionExec` in the plan. - let mut prev_layer = plan.clone(); - update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; - let new_plan = plan.with_new_children(vec![prev_layer])?; - return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { - plan: new_plan, - coalesce_onwards: vec![None], - })); - } + if plan.children().is_empty() || coalesce_onwards[0].is_none() { + // We only take an action when the plan is either a SortExec, a + // SortPreservingMergeExec or a CoalescePartitionsExec, and they + // all have a single child. Therefore, if the first child is `None`, + // we can return immediately. + return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions { + plan, + coalesce_onwards, + })); + } else if (is_sort(&plan) || is_sort_preserving_merge(&plan)) + && plan.output_partitioning().partition_count() <= 1 + { + // If there is a connection between a CoalescePartitionsExec and a + // global sort that satisfy the requirements (i.e. intermediate + // executors don't require single partition), then we can replace + // the CoalescePartitionsExec + Sort cascade with a SortExec + + // SortPreservingMergeExec cascade to parallelize sorting. + let mut prev_layer = plan.clone(); + update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; + let sort_exprs = get_sort_exprs(&plan)?; + add_sort_above(&mut prev_layer, sort_exprs.to_vec())?; + let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer); + return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { + plan: Arc::new(spm), + coalesce_onwards: vec![None], + })); + } else if is_coalesce_partitions(&plan) { + // There is an unnecessary `CoalescePartitionExec` in the plan. + let mut prev_layer = plan.clone(); + update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; + let new_plan = plan.with_new_children(vec![prev_layer])?; + return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { + plan: new_plan, + coalesce_onwards: vec![None], + })); } + Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { plan, coalesce_onwards, })) } -/// Checks whether the given executor is a limit; -/// i.e. either a `LocalLimitExec` or a `GlobalLimitExec`. -fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool { - plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>() -} - -/// Checks whether the given executor is a `SortExec`. 
-fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool { - plan.as_any().is::<SortExec>() -} - -/// Checks whether the given executor is a `SortPreservingMergeExec`. -fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool { - plan.as_any().is::<SortPreservingMergeExec>() -} - /// This function enforces sorting requirements and makes optimizations without /// violating these requirements whenever possible. fn ensure_sorting( @@ -489,17 +476,6 @@ fn ensure_sorting( *sort_onwards = None; } } - if let Some(tree) = sort_onwards { - // For window expressions, we can remove some sorts when we can - // calculate the result in reverse: - if plan.as_any().is::<WindowAggExec>() - || plan.as_any().is::<BoundedWindowAggExec>() - { - if let Some(result) = analyze_window_sort_removal(tree, &plan)? { - return Ok(Transformed::Yes(result)); - } - } - } } (Some(required), None) => { // Ordering requirement is not met, we should add a `SortExec` to the plan. @@ -509,31 +485,40 @@ fn ensure_sorting( } (None, Some(_)) => { // We have a `SortExec` whose effect may be neutralized by - // another order-imposing operator. Remove or update this sort: - if !plan.maintains_input_order()[idx] { - let count = plan.output_ordering().map_or(0, |e| e.len()); - if (count > 0) && !is_sort(&plan) { - update_child_to_change_finer_sort(child, sort_onwards, count)?; - } else { - update_child_to_remove_unnecessary_sort( - child, - sort_onwards, - &plan, - )?; - } + // another order-imposing operator. Remove this sort. + if !plan.maintains_input_order()[idx] || is_union(&plan) { + update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; } } (None, None) => {} } } + // For window expressions, we can remove some sorts when we can + // calculate the result in reverse: + if is_window(&plan) { + if let Some(tree) = &mut sort_onwards[0] { + if let Some(result) = analyze_window_sort_removal(tree, &plan)? { + return Ok(Transformed::Yes(result)); + } + } + } else if is_sort_preserving_merge(&plan) + && children[0].output_partitioning().partition_count() <= 1 + { + // This SortPreservingMergeExec is unnecessary, input already has a + // single partition. + return Ok(Transformed::Yes(PlanWithCorrespondingSort { + plan: children[0].clone(), + sort_onwards: vec![sort_onwards[0].clone()], + })); + } Ok(Transformed::Yes(PlanWithCorrespondingSort { plan: plan.with_new_children(children)?, sort_onwards, })) } -/// Analyzes a given `SortExec` (`plan`) to determine whether its input already -/// has a finer ordering than this `SortExec` enforces. +/// Analyzes a given [`SortExec`] (`plan`) to determine whether its input +/// already has a finer ordering than it enforces. fn analyze_immediate_sort_removal( plan: &Arc<dyn ExecutionPlan>, sort_onwards: &[Option<ExecTree>], @@ -580,8 +565,8 @@ fn analyze_immediate_sort_removal( None } -/// Analyzes a [WindowAggExec] or a [BoundedWindowAggExec] to determine whether -/// it may allow removing a sort. +/// Analyzes a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine +/// whether it may allow removing a sort. 
fn analyze_window_sort_removal( sort_tree: &mut ExecTree, window_exec: &Arc<dyn ExecutionPlan>, @@ -597,6 +582,10 @@ fn analyze_window_sort_removal( "Expects to receive either WindowAggExec or BoundedWindowAggExec".to_string(), )); }; + let n_req = window_exec.required_input_ordering()[0] + .as_ref() + .map(|elem| elem.len()) + .unwrap_or(0); let mut first_should_reverse = None; for sort_any in sort_tree.get_leaves() { @@ -606,14 +595,12 @@ fn analyze_window_sort_removal( // Therefore, we can use the 0th index without loss of generality. let sort_input = sort_any.children()[0].clone(); let physical_ordering = sort_input.output_ordering(); - // TODO: Once we can ensure that required ordering information propagates with - // the necessary lineage information, compare `physical_ordering` and the - // ordering required by the window executor instead of `sort_output_ordering`. - // This will enable us to handle cases such as (a,b) -> Sort -> (a,b,c) -> Required(a,b). - // Currently, we can not remove such sorts. - let required_ordering = sort_output_ordering.ok_or_else(|| { + let sort_output_ordering = sort_output_ordering.ok_or_else(|| { DataFusionError::Plan("A SortExec should have output ordering".to_string()) })?; + // It is enough to check whether the first n_req elements of the sort output satisfy the + // window_exec requirement, because that requirement has exactly n_req elements. + let required_ordering = &sort_output_ordering[0..n_req]; if let Some(physical_ordering) = physical_ordering { let (can_skip_sorting, should_reverse) = can_skip_sort( window_expr[0].partition_by(), @@ -623,8 +610,7 @@ fn analyze_window_sort_removal( )?; if !can_skip_sorting { return Ok(None); - } - if let Some(first_should_reverse) = first_should_reverse { + } else if let Some(first_should_reverse) = first_should_reverse { if first_should_reverse != should_reverse { return Ok(None); } @@ -685,7 +671,7 @@ fn update_child_to_remove_coalesce( coalesce_onwards: &mut Option<ExecTree>, ) -> Result<()> { if let Some(coalesce_onwards) = coalesce_onwards { - *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?; + *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards, child)?; } Ok(()) } @@ -693,27 +679,29 @@ fn update_child_to_remove_coalesce( /// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`. fn remove_corresponding_coalesce_in_sub_plan( coalesce_onwards: &mut ExecTree, + parent: &Arc<dyn ExecutionPlan>, ) -> Result<Arc<dyn ExecutionPlan>> { - Ok( - if coalesce_onwards - .plan - .as_any() - .is::<CoalescePartitionsExec>() + Ok(if is_coalesce_partitions(&coalesce_onwards.plan) { + // We can safely use the 0th index since we have a `CoalescePartitionsExec`. + let mut new_plan = coalesce_onwards.plan.children()[0].clone(); + while new_plan.output_partitioning() == parent.output_partitioning() + && is_repartition(&new_plan) + && is_repartition(parent) { - // We can safely use the 0th index since we have a `CoalescePartitionsExec`. - coalesce_onwards.plan.children()[0].clone() - } else { - let plan = coalesce_onwards.plan.clone(); - let mut children = plan.children(); - for item in &mut coalesce_onwards.children { - children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item)?; - } - plan.with_new_children(children)?
- }, - ) + new_plan = new_plan.children()[0].clone() + } + new_plan + } else { + let plan = coalesce_onwards.plan.clone(); + let mut children = plan.children(); + for item in &mut coalesce_onwards.children { + children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item, &plan)?; + } + plan.with_new_children(children)? + }) } -/// Updates child to remove the unnecessary sorting below it. +/// Updates child to remove the unnecessary sort below it. fn update_child_to_remove_unnecessary_sort( child: &mut Arc<dyn ExecutionPlan>, sort_onwards: &mut Option<ExecTree>, @@ -739,8 +727,8 @@ fn remove_corresponding_sort_from_sub_plan( requires_single_partition: bool, ) -> Result<Arc<dyn ExecutionPlan>> { // A `SortExec` is always at the bottom of the tree. - if is_sort(&sort_onwards.plan) { - Ok(sort_onwards.plan.children()[0].clone()) + let mut updated_plan = if is_sort(&sort_onwards.plan) { + sort_onwards.plan.children()[0].clone() } else { let plan = &sort_onwards.plan; let mut children = plan.children(); @@ -753,62 +741,28 @@ fn remove_corresponding_sort_from_sub_plan( remove_corresponding_sort_from_sub_plan(item, requires_single_partition)?; } if is_sort_preserving_merge(plan) { - let child = &children[0]; - if requires_single_partition - && child.output_partitioning().partition_count() > 1 - { - Ok(Arc::new(CoalescePartitionsExec::new(child.clone()))) - } else { - Ok(child.clone()) - } + children[0].clone() } else { - plan.clone().with_new_children(children) - } - } -} - -/// Updates child to modify the unnecessarily fine sorting below it. -fn update_child_to_change_finer_sort( - child: &mut Arc<dyn ExecutionPlan>, - sort_onwards: &mut Option<ExecTree>, - n_sort_expr: usize, -) -> Result<()> { - if let Some(sort_onwards) = sort_onwards { - *child = change_finer_sort_in_sub_plan(sort_onwards, n_sort_expr)?; - } - Ok(()) -} - -/// Change the unnecessarily fine sort in `sort_onwards`. -fn change_finer_sort_in_sub_plan( - sort_onwards: &mut ExecTree, - n_sort_expr: usize, -) -> Result<Arc<dyn ExecutionPlan>> { - let plan = &sort_onwards.plan; - // A `SortExec` is always at the bottom of the tree. - if is_sort(plan) { - let mut prev_layer = plan.children()[0].clone(); - let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec(); - add_sort_above(&mut prev_layer, new_sort_expr)?; - *sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx, vec![]); - Ok(prev_layer) - } else { - let mut children = plan.children(); - for item in &mut sort_onwards.children { - children[item.idx] = change_finer_sort_in_sub_plan(item, n_sort_expr)?; + plan.clone().with_new_children(children)? } - if is_sort_preserving_merge(plan) { - let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec(); - let updated_plan = Arc::new(SortPreservingMergeExec::new( - new_sort_expr, - children[0].clone(), - )) as Arc<dyn ExecutionPlan>; - sort_onwards.plan = updated_plan.clone(); - Ok(updated_plan) + }; + // Deleting a merging sort may invalidate distribution requirements. + // Ensure that we stay compliant with such requirements: + if requires_single_partition + && updated_plan.output_partitioning().partition_count() > 1 + { + // If there is existing ordering, to preserve ordering use SortPreservingMergeExec + // instead of CoalescePartitionsExec. 
+ if let Some(ordering) = updated_plan.output_ordering() { + updated_plan = Arc::new(SortPreservingMergeExec::new( + ordering.to_vec(), + updated_plan, + )); } else { - plan.clone().with_new_children(children) + updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan.clone())); } } + Ok(updated_plan) } /// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible. @@ -925,6 +879,9 @@ mod tests { use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; + use crate::physical_plan::joins::utils::JoinOn; + use crate::physical_plan::joins::SortMergeJoinExec; + use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -935,7 +892,9 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{Result, Statistics}; + use datafusion_expr::JoinType; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; + use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::expressions::{col, NotExpr}; use datafusion_physical_expr::PhysicalSortExpr; use std::sync::Arc; @@ -948,6 +907,13 @@ mod tests { Ok(schema) } + fn create_test_schema2() -> Result<SchemaRef> { + let col_a = Field::new("col_a", DataType::Int32, true); + let col_b = Field::new("col_b", DataType::Int32, true); + let schema = Arc::new(Schema::new(vec![col_a, col_b])); + Ok(schema) + } + // Util function to get string representation of a physical plan fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> { let formatted = displayable(plan.as_ref()).indent().to_string(); @@ -1066,6 +1032,7 @@ mod tests { // Run the actual optimizer let optimized_physical_plan = EnforceSorting::new().optimize(physical_plan, state.config_options())?; + // Get string representation of the plan let actual = get_plan_string(&optimized_physical_plan); assert_eq!( @@ -1111,7 +1078,7 @@ mod tests { )]; let sort = sort_exec(sort_exprs.clone(), source); - let window_agg = window_exec("non_nullable_col", sort_exprs, sort); + let window_agg = bounded_window_exec("non_nullable_col", sort_exprs, sort); let sort_exprs = vec![sort_expr_options( "non_nullable_col", @@ -1132,14 +1099,13 @@ mod tests { sort, ); - // let filter_exec = sort_exec; - let physical_plan = window_exec("non_nullable_col", sort_exprs, filter); + let physical_plan = bounded_window_exec("non_nullable_col", sort_exprs, filter); let expected_input = vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", " FilterExec: NOT non_nullable_col@1", " SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]", - " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: 
CurrentRow }]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", " SortExec: expr=[non_nullable_col@1 DESC]", " MemoryExec: partitions=0, partition_sizes=[]", ]; @@ -1147,7 +1113,7 @@ mod tests { let expected_optimized = vec![ "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", " FilterExec: NOT non_nullable_col@1", - " WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", " SortExec: expr=[non_nullable_col@1 DESC]", " MemoryExec: partitions=0, partition_sizes=[]", ]; @@ -1169,9 +1135,8 @@ mod tests { " MemoryExec: partitions=0, partition_sizes=[]", ]; let expected_optimized = vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", + "SortExec: expr=[nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1196,10 +1161,8 @@ mod tests { " MemoryExec: partitions=0, partition_sizes=[]", ]; let expected_optimized = vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", + "SortExec: expr=[nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1257,7 +1220,12 @@ mod tests { sort_expr("non_nullable_col", &schema), ]; let repartition_exec = repartition_exec(spm); - let sort2 = sort_exec(sort_exprs.clone(), repartition_exec); + let sort2 = Arc::new(SortExec::new_with_partitioning( + sort_exprs.clone(), + repartition_exec, + true, + None, + )) as _; let spm2 = sort_preserving_merge_exec(sort_exprs, sort2); let physical_plan = aggregate_exec(spm2); @@ -1285,6 +1253,95 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_remove_unnecessary_sort4() -> Result<()> { + let schema = create_test_schema()?; + let source1 = repartition_exec(memory_exec(&schema)); + + let source2 = repartition_exec(memory_exec(&schema)); + let union = union_exec(vec![source1, source2]); + + let sort_exprs = vec![sort_expr("non_nullable_col", &schema)]; + // let sort = sort_exec(sort_exprs.clone(), union); + let sort = Arc::new(SortExec::new_with_partitioning( + sort_exprs.clone(), + union, + true, + None, + )) as _; + let spm = sort_preserving_merge_exec(sort_exprs, sort); + + let filter = filter_exec( + Arc::new(NotExpr::new( + col("non_nullable_col", schema.as_ref()).unwrap(), + )), + spm, + ); + + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let physical_plan = sort_exec(sort_exprs, filter); + + // When removing a 
`SortPreservingMergeExec`, make sure that partitioning + // requirements are not violated. In some cases, we may need to replace + // it with a `CoalescePartitionsExec` instead of directly removing it. + let expected_input = vec![ + "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " FilterExec: NOT non_nullable_col@1", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: expr=[non_nullable_col@1 ASC]", + " UnionExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " FilterExec: NOT non_nullable_col@1", + " UnionExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_remove_unnecessary_spm1() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let input = sort_preserving_merge_exec( + vec![sort_expr("non_nullable_col", &schema)], + source, + ); + let input2 = sort_preserving_merge_exec( + vec![sort_expr("non_nullable_col", &schema)], + input, + ); + let physical_plan = + sort_preserving_merge_exec(vec![sort_expr("nullable_col", &schema)], input2); + + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortExec: expr=[nullable_col@0 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + #[tokio::test] async fn test_do_not_remove_sort_with_limit() -> Result<()> { let schema = create_test_schema()?; @@ -1295,8 +1352,7 @@ mod tests { sort_expr("non_nullable_col", &schema), ]; let sort = sort_exec(sort_exprs.clone(), source1); - let limit = local_limit_exec(sort); - let limit = global_limit_exec(limit); + let limit = limit_exec(sort); let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)]; let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs); @@ -1348,9 +1404,35 @@ mod tests { " MemoryExec: partitions=0, partition_sizes=[]", ]; let expected_optimized = vec![ - "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " MemoryExec: partitions=0, partition_sizes=[]", + "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_change_wrong_sorting2() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let sort_exprs = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let spm1 = sort_preserving_merge_exec(sort_exprs.clone(), source); + let sort2 
= sort_exec(vec![sort_exprs[0].clone()], spm1); + let physical_plan = + sort_preserving_merge_exec(vec![sort_exprs[1].clone()], sort2); + + let expected_input = vec![ + "SortPreservingMergeExec: [non_nullable_col@1 ASC]", + " SortExec: expr=[nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortExec: expr=[non_nullable_col@1 ASC]", + " MemoryExec: partitions=0, partition_sizes=[]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1432,9 +1514,8 @@ mod tests { let physical_plan = sort_preserving_merge_exec(sort_exprs, union); // Input is an invalid plan. In this case rule should add required sorting in appropriate places. - // First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy required ordering - // of SortPreservingMergeExec. Hence rule should remove unnecessary sort for second child of the UnionExec - // and put a sort above Union to satisfy required ordering. + // First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy the + // required ordering of SortPreservingMergeExec. let expected_input = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", " UnionExec", @@ -1442,12 +1523,13 @@ mod tests { " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; - // should remove unnecessary sorting from below and move it to top + let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " UnionExec", + " UnionExec", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -1532,10 +1614,12 @@ mod tests { ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", - " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", - " UnionExec", + " UnionExec", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); @@ -1636,10 +1720,99 @@ mod tests { " SortExec: expr=[nullable_col@0 ASC]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", - " SortPreservingMergeExec: [nullable_col@0 ASC]", - " SortExec: expr=[nullable_col@0 ASC]", - " RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted7() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs3 = vec![sort_expr("nullable_col", &schema)]; + let sort1 = sort_exec(sort_exprs1.clone(), source1.clone()); + let sort2 = sort_exec(sort_exprs1, source1); + + let union = union_exec(vec![sort1, sort2]); + let physical_plan = sort_preserving_merge_exec(sort_exprs3, union); + + // Union has unnecessarily fine ordering below it. We should be able to replace them with absolutely necessary ordering. + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // Union preserves the inputs ordering and we should not change any of the SortExecs under UnionExec + let expected_output = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_output, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted8() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs2 = vec![ + sort_expr_options( + "nullable_col", + &schema, + SortOptions { + descending: true, + nulls_first: false, + }, + ), + sort_expr_options( + "non_nullable_col", + &schema, + SortOptions { + descending: true, + nulls_first: false, + }, + ), + ]; + let sort1 = sort_exec(sort_exprs1, source1.clone()); + let sort2 = sort_exec(sort_exprs2, source1); + + let physical_plan = union_exec(vec![sort1, sort2]); + + // The `UnionExec` doesn't preserve any of the inputs ordering in the + // example below. + let expected_input = vec![ + "UnionExec", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 DESC NULLS LAST,non_nullable_col@1 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + // Since `UnionExec` doesn't preserve ordering in the plan above. + // We shouldn't keep SortExecs in the plan. 
+ let expected_optimized = vec![ + "UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -1669,25 +1842,378 @@ mod tests { let sort2 = sort_exec(sort_exprs3.clone(), source2); let union = union_exec(vec![sort1, sort2]); - let physical_plan = window_exec("nullable_col", sort_exprs3, union); + let spm = sort_preserving_merge_exec(sort_exprs3.clone(), union); + let physical_plan = bounded_window_exec("nullable_col", sort_exprs3, spm); // The `WindowAggExec` gets its sorting from multiple children jointly. // During the removal of `SortExec`s, it should be able to remove the // corresponding SortExecs together. Also, the inputs of these `SortExec`s // are not necessarily the same to be able to remove them. let expected_input = vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", - " UnionExec", - " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]", + " UnionExec", + " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + ]; + let expected_optimized = vec![ + "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", - " SortExec: expr=[nullable_col@0 DESC NULLS LAST]", " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_window_multi_path_sort2() -> Result<()> { + let schema = create_test_schema()?; + + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs2 = vec![sort_expr("nullable_col", &schema)]; + let source1 = parquet_exec_sorted(&schema, sort_exprs2.clone()); + let source2 = parquet_exec_sorted(&schema, sort_exprs2.clone()); + let sort1 = sort_exec(sort_exprs1.clone(), source1); + let sort2 = sort_exec(sort_exprs1.clone(), source2); + + let union = union_exec(vec![sort1, sort2]); + let spm = Arc::new(SortPreservingMergeExec::new(sort_exprs1, union)) as _; + let physical_plan = bounded_window_exec("nullable_col", sort_exprs2, 
spm); + + // The `WindowAggExec` can get its required sorting from the leaf nodes directly. + // The unnecessary SortExecs should be removed + let expected_input = vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + " UnionExec", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + ]; let expected_optimized = vec![ - "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]", + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_union_inputs_different_sorted_with_limit() -> Result<()> { + let schema = create_test_schema()?; + + let source1 = parquet_exec(&schema); + let sort_exprs1 = vec![ + sort_expr("nullable_col", &schema), + sort_expr("non_nullable_col", &schema), + ]; + let sort_exprs2 = vec![ + sort_expr("nullable_col", &schema), + sort_expr_options( + "non_nullable_col", + &schema, + SortOptions { + descending: true, + nulls_first: false, + }, + ), + ]; + let sort_exprs3 = vec![sort_expr("nullable_col", &schema)]; + let sort1 = sort_exec(sort_exprs1, source1.clone()); + + let sort2 = sort_exec(sort_exprs2, source1); + let limit = local_limit_exec(sort2); + let limit = global_limit_exec(limit); + + let union = union_exec(vec![sort1, limit]); + let physical_plan = sort_preserving_merge_exec(sort_exprs3, union); + + // Should not change the unnecessarily fine `SortExec`s because there is `LimitExec` + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", " UnionExec", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]", - " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " GlobalLimitExec: skip=0, fetch=100", + " LocalLimitExec: fetch=100", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]", + " ParquetExec: limit=None, 
partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " UnionExec", + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " GlobalLimitExec: skip=0, fetch=100", + " LocalLimitExec: fetch=100", + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + + #[tokio::test] + async fn test_sort_merge_join_order_by_left() -> Result<()> { + let left_schema = create_test_schema()?; + let right_schema = create_test_schema2()?; + + let left = parquet_exec(&left_schema); + let right = parquet_exec(&right_schema); + + // Join on (nullable_col == col_a) + let join_on = vec![( + Column::new_with_schema("nullable_col", &left.schema()).unwrap(), + Column::new_with_schema("col_a", &right.schema()).unwrap(), + )]; + + let join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + JoinType::LeftSemi, + JoinType::LeftAnti, + ]; + for join_type in join_types { + let join = + sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type); + let sort_exprs = vec![ + sort_expr("nullable_col", &join.schema()), + sort_expr("non_nullable_col", &join.schema()), + ]; + let physical_plan = sort_preserving_merge_exec(sort_exprs.clone(), join); + + let join_plan = + format!("SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]"); + let join_plan2 = + format!(" SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]"); + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]", + join_plan2.as_str(), + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ]; + let expected_optimized = match join_type { + JoinType::Inner + | JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftAnti => { + // can push down the sort requirements and save 1 SortExec + vec![ + join_plan.as_str(), + " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[col_a@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ] + } + _ => { + // can not push down the sort requirements + vec![ + "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]", + join_plan2.as_str(), + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[col_a@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ] + } + }; + assert_optimized!(expected_input, expected_optimized, physical_plan); + } + Ok(()) + } + + #[tokio::test] + async fn test_sort_merge_join_order_by_right() -> Result<()> { + let left_schema = create_test_schema()?; + let right_schema = create_test_schema2()?; + + let left = parquet_exec(&left_schema); + let right = parquet_exec(&right_schema); + + // 
Join on (nullable_col == col_a) + let join_on = vec![( + Column::new_with_schema("nullable_col", &left.schema()).unwrap(), + Column::new_with_schema("col_a", &right.schema()).unwrap(), + )]; + + let join_types = vec![ + JoinType::Inner, + JoinType::Left, + JoinType::Right, + JoinType::Full, + JoinType::RightAnti, + ]; + for join_type in join_types { + let join = + sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type); + let sort_exprs = vec![ + sort_expr("col_a", &join.schema()), + sort_expr("col_b", &join.schema()), + ]; + let physical_plan = sort_preserving_merge_exec(sort_exprs, join); + + let join_plan = + format!("SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]"); + let spm_plan = match join_type { + JoinType::RightAnti => { + "SortPreservingMergeExec: [col_a@0 ASC,col_b@1 ASC]" + } + _ => "SortPreservingMergeExec: [col_a@2 ASC,col_b@3 ASC]", + }; + let join_plan2 = + format!(" SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]"); + let expected_input = vec![ + spm_plan, + join_plan2.as_str(), + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ]; + let expected_optimized = match join_type { + JoinType::Inner | JoinType::Right | JoinType::RightAnti => { + // can push down the sort requirements and save 1 SortExec + vec![ + join_plan.as_str(), + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[col_a@0 ASC,col_b@1 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ] + } + _ => { + // can not push down the sort requirements for Left and Full join. 
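+ // (These join types also emit unmatched, null-padded rows interleaved into
+ // the output, so the right side's ordering does not survive the join; this
+ // mirrors the maintains_input_order() implementation added for
+ // SortMergeJoinExec further down in this diff.)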
+ vec![ + "SortExec: expr=[col_a@2 ASC,col_b@3 ASC]", + join_plan2.as_str(), + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[col_a@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ] + } + }; + assert_optimized!(expected_input, expected_optimized, physical_plan); + } + Ok(()) + } + + #[tokio::test] + async fn test_sort_merge_join_complex_order_by() -> Result<()> { + let left_schema = create_test_schema()?; + let right_schema = create_test_schema2()?; + + let left = parquet_exec(&left_schema); + let right = parquet_exec(&right_schema); + + // Join on (nullable_col == col_a) + let join_on = vec![( + Column::new_with_schema("nullable_col", &left.schema()).unwrap(), + Column::new_with_schema("col_a", &right.schema()).unwrap(), + )]; + + let join = sort_merge_join_exec(left, right, &join_on, &JoinType::Inner); + + // order by (col_b, col_a) + let sort_exprs1 = vec![ + sort_expr("col_b", &join.schema()), + sort_expr("col_a", &join.schema()), + ]; + let physical_plan = sort_preserving_merge_exec(sort_exprs1, join.clone()); + + let expected_input = vec![ + "SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]", + " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ]; + + // can not push down the sort requirements, need to add SortExec + let expected_optimized = vec![ + "SortExec: expr=[col_b@3 ASC,col_a@2 ASC]", + " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[col_a@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + + // order by (nullable_col, col_b, col_a) + let sort_exprs2 = vec![ + sort_expr("nullable_col", &join.schema()), + sort_expr("col_b", &join.schema()), + sort_expr("col_a", &join.schema()), + ]; + let physical_plan = sort_preserving_merge_exec(sort_exprs2, join); + + let expected_input = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]", + " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ]; + + // can not push down the sort requirements, need to add SortExec + let expected_optimized = vec![ + "SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]", + " SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]", + " SortExec: expr=[nullable_col@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", + " SortExec: expr=[col_a@0 ASC]", + " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]", + ]; + assert_optimized!(expected_input, expected_optimized, 
physical_plan);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_multiple_sort_window_exec() -> Result<()> {
+ let schema = create_test_schema()?;
+ let source = memory_exec(&schema);
+
+ let sort_exprs1 = vec![sort_expr("nullable_col", &schema)];
+ let sort_exprs2 = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+
+ let sort1 = sort_exec(sort_exprs1.clone(), source);
+ let window_agg1 =
+ bounded_window_exec("non_nullable_col", sort_exprs1.clone(), sort1);
+ let window_agg2 =
+ bounded_window_exec("non_nullable_col", sort_exprs2, window_agg1);
+ let physical_plan =
+ bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);
+
+ let expected_input = vec![
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+
+ let expected_optimized = vec![
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
@@ -1745,7 +2271,7 @@ mod tests {
let memory_exec = memory_exec(&schema);
let sort_exprs = vec![sort_expr("nullable_col", &schema)];
- let window = window_exec("nullable_col", sort_exprs.clone(), memory_exec);
+ let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec);
let repartition = repartition_exec(window);
let orig_plan = Arc::new(SortExec::new_with_partitioning(
@@ -1813,9 +2339,8 @@ mod tests {
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
- " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -1865,7 +2390,7 @@ mod tests {
Arc::new(FilterExec::try_new(predicate, input).unwrap()) } - fn window_exec( + fn bounded_window_exec( col_name: &str, sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>, @@ -1874,7 +2399,7 @@ mod tests { let schema = input.schema(); Arc::new( - WindowAggExec::try_new( + BoundedWindowAggExec::try_new( vec![create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), "count".to_owned(), @@ -1940,6 +2465,10 @@ mod tests { Arc::new(UnionExec::new(input)) } + fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { + global_limit_exec(local_limit_exec(input)) + } + fn local_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { Arc::new(LocalLimitExec::new(input, 100)) } @@ -1967,4 +2496,23 @@ mod tests { .unwrap(), ) } + + fn sort_merge_join_exec( + left: Arc<dyn ExecutionPlan>, + right: Arc<dyn ExecutionPlan>, + join_on: &JoinOn, + join_type: &JoinType, + ) -> Arc<dyn ExecutionPlan> { + Arc::new( + SortMergeJoinExec::try_new( + left, + right, + join_on.clone(), + *join_type, + vec![SortOptions::default(); join_on.len()], + false, + ) + .unwrap(), + ) + } } diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs new file mode 100644 index 0000000000..07d0002548 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use crate::physical_optimizer::utils::{add_sort_above, is_limit, is_union, is_window}; +use crate::physical_plan::filter::FilterExec; +use crate::physical_plan::joins::utils::JoinSide; +use crate::physical_plan::joins::SortMergeJoinExec; +use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::repartition::RepartitionExec; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::JoinType; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::utils::{ + make_sort_exprs_from_requirements, ordering_satisfy_requirement, + requirements_compatible, +}; +use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement, +}; +use itertools::izip; +use std::ops::Deref; +use std::sync::Arc; + +/// This is a "data class" we use within the [`EnforceSorting`] rule to push +/// down [`SortExec`] in the plan. In some cases, we can reduce the total +/// computational cost by pushing down `SortExec`s through some executors. 
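+// For orientation, the enclosing rule is expected to seed and drive this
+// struct roughly as follows (a sketch of assumed usage; the actual call site
+// lives in sort_enforcement.rs and is not part of this hunk):
+//
+//     let sort_pushdown = SortPushDown::init(physical_plan);
+//     let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
+//     let optimized_plan = adjusted.plan;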
+#[derive(Debug, Clone)]
+pub(crate) struct SortPushDown {
+ /// Current plan
+ pub plan: Arc<dyn ExecutionPlan>,
+ /// Parent required sort ordering
+ required_ordering: Option<Vec<PhysicalSortRequirement>>,
+ /// The adjusted sort ordering requests for the children.
+ /// By default, these equal the plan's required input ordering; they can be adjusted based on the parent's required sort ordering.
+ adjusted_request_ordering: Vec<Option<Vec<PhysicalSortRequirement>>>,
+}
+
+impl SortPushDown {
+ pub fn init(plan: Arc<dyn ExecutionPlan>) -> Self {
+ let request_ordering = plan.required_input_ordering();
+ SortPushDown {
+ plan,
+ required_ordering: None,
+ adjusted_request_ordering: request_ordering,
+ }
+ }
+
+ pub fn children(&self) -> Vec<SortPushDown> {
+ izip!(
+ self.plan.children().into_iter(),
+ self.adjusted_request_ordering.clone().into_iter(),
+ )
+ .map(|(child, from_parent)| {
+ let child_request_ordering = child.required_input_ordering();
+ SortPushDown {
+ plan: child,
+ required_ordering: from_parent,
+ adjusted_request_ordering: child_request_ordering,
+ }
+ })
+ .collect()
+ }
+}
+
+impl TreeNode for SortPushDown {
+ fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
+ where
+ F: FnMut(&Self) -> Result<VisitRecursion>,
+ {
+ let children = self.children();
+ for child in children {
+ match op(&child)? {
+ VisitRecursion::Continue => {}
+ VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+ VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
+ }
+ }
+
+ Ok(VisitRecursion::Continue)
+ }
+
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
+ where
+ F: FnMut(Self) -> Result<Self>,
+ {
+ let children = self.children();
+ if !children.is_empty() {
+ let children_plans = children
+ .into_iter()
+ .map(transform)
+ .map(|r| r.map(|s| s.plan))
+ .collect::<Result<Vec<_>>>()?;
+
+ match with_new_children_if_necessary(self.plan, children_plans)? {
+ Transformed::Yes(plan) | Transformed::No(plan) => {
+ self.plan = plan;
+ }
+ }
+ };
+ Ok(self)
+ }
+}
+
+pub(crate) fn pushdown_sorts(
+ requirements: SortPushDown,
+) -> Result<Transformed<SortPushDown>> {
+ let plan = &requirements.plan;
+ let parent_required = requirements.required_ordering.as_deref();
+ const ERR_MSG: &str = "Expects parent requirement to contain something";
+ let err = || DataFusionError::Plan(ERR_MSG.to_string());
+ if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+ let mut new_plan = plan.clone();
+ if !ordering_satisfy_requirement(plan.output_ordering(), parent_required, || {
+ plan.equivalence_properties()
+ }) {
+ // If the current plan is a SortExec, modify it to satisfy parent requirements:
+ let parent_required_expr =
+ make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
+ new_plan = sort_exec.input.clone();
+ add_sort_above(&mut new_plan, parent_required_expr)?;
+ };
+ let required_ordering = new_plan
+ .output_ordering()
+ .map(make_sort_requirements_from_exprs);
+ // Since new_plan is a SortExec, we can safely get the 0th index.
+ let child = &new_plan.children()[0];
+ if let Some(adjusted) =
+ pushdown_requirement_to_children(child, required_ordering.as_deref())?
+ { + // Can push down requirements + Ok(Transformed::Yes(SortPushDown { + plan: child.clone(), + required_ordering: None, + adjusted_request_ordering: adjusted, + })) + } else { + // Can not push down requirements + Ok(Transformed::Yes(SortPushDown::init(new_plan))) + } + } else { + // Executors other than SortExec + if ordering_satisfy_requirement(plan.output_ordering(), parent_required, || { + plan.equivalence_properties() + }) { + // Satisfies parent requirements, immediately return. + return Ok(Transformed::Yes(SortPushDown { + required_ordering: None, + ..requirements + })); + } + // Can not satisfy the parent requirements, check whether the requirements can be pushed down: + if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_required)? { + Ok(Transformed::Yes(SortPushDown { + plan: plan.clone(), + required_ordering: None, + adjusted_request_ordering: adjusted, + })) + } else { + // Can not push down requirements, add new SortExec: + let parent_required_expr = + make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?); + let mut new_plan = plan.clone(); + add_sort_above(&mut new_plan, parent_required_expr)?; + Ok(Transformed::Yes(SortPushDown::init(new_plan))) + } + } +} + +fn pushdown_requirement_to_children( + plan: &Arc<dyn ExecutionPlan>, + parent_required: Option<&[PhysicalSortRequirement]>, +) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> { + const ERR_MSG: &str = "Expects parent requirement to contain something"; + let err = || DataFusionError::Plan(ERR_MSG.to_string()); + let maintains_input_order = plan.maintains_input_order(); + if is_window(plan) { + let required_input_ordering = plan.required_input_ordering(); + let request_child = required_input_ordering[0].as_deref(); + let child_plan = plan.children()[0].clone(); + match determine_children_requirement(parent_required, request_child, child_plan) { + RequirementsCompatibility::Satisfy => { + Ok(Some(vec![request_child.map(|r| r.to_vec())])) + } + RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])), + RequirementsCompatibility::NonCompatible => Ok(None), + } + } else if is_union(plan) { + // UnionExec does not have real sort requirements for its input. 
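+ // (Roughly, in the union tests above: a parent requirement like
+ // [nullable_col ASC] is handed to each child, and since the ParquetExec
+ // inputs already produce that output ordering, the per-child SortExecs
+ // disappear.)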
Here we change the adjusted_request_ordering to the parent's required ordering and
+ // propagate the sort requirements down to remove any unnecessary descendant SortExecs under the UnionExec.
+ Ok(Some(vec![
+ parent_required.map(|elem| elem.to_vec());
+ plan.children().len()
+ ]))
+ } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
+ // If the current plan is SortMergeJoinExec
+ let left_columns_len = smj.left.schema().fields().len();
+ let parent_required_expr =
+ make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
+ let expr_source_side =
+ expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
+ match expr_source_side {
+ Some(JoinSide::Left) if maintains_input_order[0] => {
+ try_pushdown_requirements_to_join(
+ plan,
+ parent_required,
+ parent_required_expr,
+ JoinSide::Left,
+ )
+ }
+ Some(JoinSide::Right) if maintains_input_order[1] => {
+ let new_right_required = match smj.join_type {
+ JoinType::Inner | JoinType::Right => shift_right_required(
+ parent_required.ok_or_else(err)?,
+ left_columns_len,
+ )?,
+ JoinType::RightSemi | JoinType::RightAnti => {
+ parent_required.ok_or_else(err)?.to_vec()
+ }
+ _ => Err(DataFusionError::Plan(
+ "Unexpected SortMergeJoin type here".to_string(),
+ ))?,
+ };
+ try_pushdown_requirements_to_join(
+ plan,
+ Some(new_right_required.deref()),
+ parent_required_expr,
+ JoinSide::Right,
+ )
+ }
+ _ => {
+ // Can not decide the expr side for SortMergeJoinExec, can not push down
+ Ok(None)
+ }
+ }
+ } else if maintains_input_order.is_empty()
+ || !maintains_input_order.iter().any(|o| *o)
+ || plan.as_any().is::<RepartitionExec>()
+ || plan.as_any().is::<FilterExec>()
+ // TODO: Add support for Projection push down
+ || plan.as_any().is::<ProjectionExec>()
+ || is_limit(plan)
+ {
+ // If the current plan is a leaf node, or can not maintain any of the input orderings, the requirements can not be pushed down.
+ // For RepartitionExec, we always choose not to push down the sort requirements, even though a
+ // RepartitionExec(input_partition=1) could maintain the input ordering; pushing down is not beneficial there.
+ Ok(None)
+ } else {
+ Ok(Some(vec![
+ parent_required.map(|elem| elem.to_vec());
+ plan.children().len()
+ ]))
+ }
+}
+
+/// Determine the children requirements:
+/// If the children requirements are more specific, do not push down the parent requirements.
+/// If the parent requirements are more specific, push down the parent requirements.
+/// If they are not compatible, we need to add a Sort.
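+/// For example (hypothetical requirements, not taken from a test in this diff):
+/// a child that already requires [a ASC, b ASC] under a parent that requires
+/// [a ASC] yields `Satisfy`; the reverse yields `Compatible`, pushing the
+/// parent's [a ASC, b ASC] down; [a ASC] against [b DESC] yields
+/// `NonCompatible`, and the caller falls back to inserting a `SortExec`.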
+fn determine_children_requirement(
+ parent_required: Option<&[PhysicalSortRequirement]>,
+ request_child: Option<&[PhysicalSortRequirement]>,
+ child_plan: Arc<dyn ExecutionPlan>,
+) -> RequirementsCompatibility {
+ if requirements_compatible(request_child, parent_required, || {
+ child_plan.equivalence_properties()
+ }) {
+ // The child's requirements are more specific; no need to push down the parent requirements.
+ RequirementsCompatibility::Satisfy
+ } else if requirements_compatible(parent_required, request_child, || {
+ child_plan.equivalence_properties()
+ }) {
+ // The parent requirements are more specific; adjust the child's requirements and push down the new requirements.
+ let adjusted = parent_required.map(|r| r.to_vec());
+ RequirementsCompatibility::Compatible(adjusted)
+ } else {
+ RequirementsCompatibility::NonCompatible
+ }
+}
+
+fn try_pushdown_requirements_to_join(
+ plan: &Arc<dyn ExecutionPlan>,
+ parent_required: Option<&[PhysicalSortRequirement]>,
+ _sort_expr: Vec<PhysicalSortExpr>,
+ push_side: JoinSide,
+) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
+ let child_idx = match push_side {
+ JoinSide::Left => 0,
+ JoinSide::Right => 1,
+ };
+ let required_input_ordering = plan.required_input_ordering();
+ let request_child = required_input_ordering[child_idx].as_deref();
+ let child_plan = plan.children()[child_idx].clone();
+ match determine_children_requirement(parent_required, request_child, child_plan) {
+ RequirementsCompatibility::Satisfy => Ok(None),
+ RequirementsCompatibility::Compatible(adjusted) => {
+ let new_adjusted = match push_side {
+ JoinSide::Left => {
+ vec![adjusted, required_input_ordering[1].clone()]
+ }
+ JoinSide::Right => {
+ vec![required_input_ordering[0].clone(), adjusted]
+ }
+ };
+ Ok(Some(new_adjusted))
+ }
+ RequirementsCompatibility::NonCompatible => {
+ // Can not push down; the caller adds the necessary SortExec when we return None.
+ Ok(None)
+ }
+ }
+}
+
+fn expr_source_sides(
+ required_exprs: &[PhysicalSortExpr],
+ join_type: JoinType,
+ left_columns_len: usize,
+) -> Option<JoinSide> {
+ match join_type {
+ JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
+ let all_column_sides = required_exprs
+ .iter()
+ .filter_map(|r| {
+ r.expr.as_any().downcast_ref::<Column>().map(|col| {
+ if col.index() < left_columns_len {
+ JoinSide::Left
+ } else {
+ JoinSide::Right
+ }
+ })
+ })
+ .collect::<Vec<_>>();
+
+ // If the exprs are all coming from one side, the requirements can be pushed down
+ if all_column_sides.len() != required_exprs.len() {
+ None
+ } else if all_column_sides
+ .iter()
+ .all(|side| matches!(side, JoinSide::Left))
+ {
+ Some(JoinSide::Left)
+ } else if all_column_sides
+ .iter()
+ .all(|side| matches!(side, JoinSide::Right))
+ {
+ Some(JoinSide::Right)
+ } else {
+ None
+ }
+ }
+ JoinType::LeftSemi | JoinType::LeftAnti => required_exprs
+ .iter()
+ .all(|e| e.expr.as_any().downcast_ref::<Column>().is_some())
+ .then_some(JoinSide::Left),
+ JoinType::RightSemi | JoinType::RightAnti => required_exprs
+ .iter()
+ .all(|e| e.expr.as_any().downcast_ref::<Column>().is_some())
+ .then_some(JoinSide::Right),
+ }
+}
+
+fn shift_right_required(
+ parent_required: &[PhysicalSortRequirement],
+ left_columns_len: usize,
+) -> Result<Vec<PhysicalSortRequirement>> {
+ let new_right_required: Vec<PhysicalSortRequirement> = parent_required
+ .iter()
+ .filter_map(|r| {
+ r.expr.as_any().downcast_ref::<Column>().and_then(|col| {
+ (col.index() >=
left_columns_len).then_some(PhysicalSortRequirement { + expr: Arc::new(Column::new( + col.name(), + col.index() - left_columns_len, + )) as _, + options: r.options, + }) + }) + }) + .collect::<Vec<_>>(); + if new_right_required.len() == parent_required.len() { + Ok(new_right_required) + } else { + Err(DataFusionError::Plan( + "Expect to shift all the parent required column indexes for SortMergeJoin" + .to_string(), + )) + } +} + +/// Define the Requirements Compatibility +#[derive(Debug)] +enum RequirementsCompatibility { + /// Requirements satisfy + Satisfy, + /// Requirements compatible + Compatible(Option<Vec<PhysicalSortRequirement>>), + /// Requirements not compatible + NonCompatible, +} diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 30b8243e46..2fa833bb7e 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -21,7 +21,13 @@ use super::optimizer::PhysicalOptimizerRule; use crate::config::ConfigOptions; use crate::error::Result; +use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use crate::physical_plan::union::UnionExec; +use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::tree_node::Transformed; use datafusion_physical_expr::utils::ordering_satisfy; @@ -68,3 +74,40 @@ pub fn add_sort_above( } Ok(()) } + +/// Checks whether the given operator is a limit; +/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`]. +pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>() +} + +/// Checks whether the given operator is a window; +/// i.e. either a [`WindowAggExec`] or a [`BoundedWindowAggExec`]. +pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<WindowAggExec>() || plan.as_any().is::<BoundedWindowAggExec>() +} + +/// Checks whether the given operator is a [`SortExec`]. +pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<SortExec>() +} + +/// Checks whether the given operator is a [`SortPreservingMergeExec`]. +pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<SortPreservingMergeExec>() +} + +/// Checks whether the given operator is a [`CoalescePartitionsExec`]. +pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<CoalescePartitionsExec>() +} + +/// Checks whether the given operator is a [`UnionExec`]. +pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<UnionExec>() +} + +/// Checks whether the given operator is a [`RepartitionExec`]. 
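+// These one-line predicates let the optimizer rules above branch on operator
+// type tersely; e.g. `pushdown_requirement_to_children` opens with
+// `if is_window(plan) { ... } else if is_union(plan) { ... }` instead of
+// repeating `plan.as_any().is::<...>()` downcasts at every call site.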
+pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool { + plan.as_any().is::<RepartitionExec>() +} diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 623bafe093..94f5c9e5ef 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -35,6 +35,9 @@ use arrow::compute::{concat_batches, take, SortOptions}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::{ + make_sort_requirements_from_exprs, PhysicalSortRequirement, +}; use futures::{Stream, StreamExt}; use crate::error::DataFusionError; @@ -55,9 +58,6 @@ use crate::physical_plan::{ }; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortRequirement, -}; /// join execution plan executes partitions in parallel and combines them into a set of /// partitions. @@ -249,6 +249,17 @@ impl ExecutionPlan for SortMergeJoinExec { self.output_ordering.as_deref() } + fn maintains_input_order(&self) -> Vec<bool> { + match self.join_type { + JoinType::Inner => vec![true, true], + JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![true, false], + JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + vec![false, true] + } + _ => vec![false, false], + } + } + fn equivalence_properties(&self) -> EquivalenceProperties { let left_columns_len = self.left.schema().fields.len(); combine_join_equivalence_properties( diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index aa6f8361dc..f9a9209454 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -576,7 +576,6 @@ impl DefaultPhysicalPlanner { } let logical_input_schema = input.schema(); - let physical_input_schema = input_exec.schema(); let window_expr = window_expr .iter() diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 29d5c4dc8f..75598f1d52 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -43,7 +43,6 @@ use datafusion_common::DataFusionError; use datafusion_physical_expr::PhysicalSortRequirement; use futures::stream::Stream; use futures::{ready, StreamExt}; -use log::debug; use std::any::Any; use std::ops::Range; use std::pin::Pin; @@ -179,10 +178,8 @@ impl ExecutionPlan for WindowAggExec { fn required_input_distribution(&self) -> Vec<Distribution> { if self.partition_keys.is_empty() { - debug!("No partition defined for WindowAggExec!!!"); vec![Distribution::SinglePartition] } else { - //TODO support PartitionCollections if there is no common partition columns in the window_expr vec![Distribution::HashPartitioned(self.partition_keys.clone())] } } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index cd4ac6ff3d..5c30d523d8 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -120,32 +120,24 @@ pub fn normalize_out_expr_with_alias_schema( alias_map: &HashMap<Column, Vec<Column>>, schema: &SchemaRef, ) -> Arc<dyn PhysicalExpr> { - let expr_clone = expr.clone(); - expr_clone + expr.clone() .transform(&|expr| { - let normalized_form: 
Option<Arc<dyn PhysicalExpr>> =
- match expr.as_any().downcast_ref::<Column>() {
- Some(column) => {
- let out = alias_map
- .get(column)
- .map(|c| {
- let out_col: Arc<dyn PhysicalExpr> =
- Arc::new(c[0].clone());
- out_col
- })
- .or_else(|| match schema.index_of(column.name()) {
- // Exactly matching, return None, no need to do the transform
- Ok(idx) if column.index() == idx => None,
- _ => {
- let out_col: Arc<dyn PhysicalExpr> =
- Arc::new(UnKnownColumn::new(column.name()));
- Some(out_col)
- }
- });
- out
- }
- None => None,
- };
+ let normalized_form: Option<Arc<dyn PhysicalExpr>> = match expr
+ .as_any()
+ .downcast_ref::<Column>()
+ {
+ Some(column) => {
+ alias_map
+ .get(column)
+ .map(|c| Arc::new(c[0].clone()) as _)
+ .or_else(|| match schema.index_of(column.name()) {
+ // Exactly matching, return None, no need to do the transform
+ Ok(idx) if column.index() == idx => None,
+ _ => Some(Arc::new(UnKnownColumn::new(column.name())) as _),
+ })
+ }
+ None => None,
+ };
Ok(if let Some(normalized_form) = normalized_form {
Transformed::Yes(normalized_form)
} else {
@@ -159,8 +151,7 @@ pub fn normalize_expr_with_equivalence_properties(
expr: Arc<dyn PhysicalExpr>,
eq_properties: &[EquivalentClass],
) -> Arc<dyn PhysicalExpr> {
- let expr_clone = expr.clone();
- expr_clone
+ expr.clone()
.transform(&|expr| {
let normalized_form: Option<Arc<dyn PhysicalExpr>> =
match expr.as_any().downcast_ref::<Column>() {
@@ -235,7 +226,9 @@ pub fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
}
}
-pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
+/// Checks whether the required [`PhysicalSortExpr`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
provided: &[PhysicalSortExpr],
required: &[PhysicalSortExpr],
equal_properties: F,
@@ -312,6 +305,91 @@ pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> EquivalencePropertie
}
}
+/// Checks whether the provided [`PhysicalSortRequirement`]s are equal to or
+/// more specific than the required [`PhysicalSortRequirement`]s.
+pub fn requirements_compatible<F: FnOnce() -> EquivalenceProperties>(
+ provided: Option<&[PhysicalSortRequirement]>,
+ required: Option<&[PhysicalSortRequirement]>,
+ equal_properties: F,
+) -> bool {
+ match (provided, required) {
+ (_, None) => true,
+ (None, Some(_)) => false,
+ (Some(provided), Some(required)) => {
+ requirements_compatible_concrete(provided, required, equal_properties)
+ }
+ }
+}
+
+/// Checks whether the provided [`PhysicalSortRequirement`]s are equal to or
+/// more specific than the required [`PhysicalSortRequirement`]s.
+fn requirements_compatible_concrete<F: FnOnce() -> EquivalenceProperties>(
+ provided: &[PhysicalSortRequirement],
+ required: &[PhysicalSortRequirement],
+ equal_properties: F,
+) -> bool {
+ if required.len() > provided.len() {
+ false
+ } else if required
+ .iter()
+ .zip(provided.iter())
+ .all(|(req, given)| given.compatible(req))
+ {
+ true
+ } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ required
+ .iter()
+ .map(|e| {
+ normalize_sort_requirement_with_equivalence_properties(
+ e.clone(),
+ eq_classes,
+ )
+ })
+ .zip(provided.iter().map(|e| {
+ normalize_sort_requirement_with_equivalence_properties(
+ e.clone(),
+ eq_classes,
+ )
+ }))
+ .all(|(req, given)| given.compatible(&req))
+ } else {
+ false
+ }
+}
+
+/// This function maps a requirement that refers to the output schema of a
+/// `ProjectionExec` back to the executor that is its input.
+// Specifically, `ProjectionExec` changes the indices of `Column`s relative to the schema of its input executor.
+// This function rewrites a requirement given in terms of the ProjectionExec output schema into the equivalent
+// requirement in terms of the schema of the ProjectionExec's input executor.
+// For instance, Column{"a", 0} may turn into Column{"a", 1}. Please note that this function assumes that
+// column names are unique; given a requirement containing both Column{"a", 0} and Column{"a", 1},
+// it will produce an incorrect result (it will only emit a single Column).
+pub fn map_columns_before_projection(
+ parent_required: &[Arc<dyn PhysicalExpr>],
+ proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+ let column_mapping = proj_exprs
+ .iter()
+ .filter_map(|(expr, name)| {
+ expr.as_any()
+ .downcast_ref::<Column>()
+ .map(|column| (name.clone(), column.clone()))
+ })
+ .collect::<HashMap<_, _>>();
+ parent_required
+ .iter()
+ .filter_map(|r| {
+ if let Some(column) = r.as_any().downcast_ref::<Column>() {
+ column_mapping.get(column.name())
+ } else {
+ None
+ }
+ })
+ .map(|e| Arc::new(e.clone()) as _)
+ .collect()
+}
+
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If the required ordering is `None` for an entry,
/// the default ordering `ASC, NULLS LAST` is used.
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 6f9babefe8..b396976186 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -824,11 +824,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut assign_map = assignments
.iter()
.map(|assign| {
- let col_name: &Ident = assign
- .id
- .iter()
- .last()
- .ok_or(DataFusionError::Plan("Empty column id".to_string()))?;
+ let col_name: &Ident = assign.id.iter().last().ok_or_else(|| {
+ DataFusionError::Plan("Empty column id".to_string())
+ })?;
// Validate that the assignment target column exists
table_schema.field_with_unqualified_name(&col_name.value)?;
Ok((col_name.value.clone(), assign.value.clone()))