This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fcd94fb5b7 Enhance Enforce Dist capabilities to fix, sub optimal bad 
plans (#7671)
fcd94fb5b7 is described below

commit fcd94fb5b7a49597bde4b27742f1d54b97f149c6
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 29 16:27:21 2023 +0300

    Enhance Enforce Dist capabilities to fix, sub optimal bad plans (#7671)
    
    * Extend capabilities of enforcedist
    
    * Simplifications
    
    * Fix test
    
    * Do not use hard coded partition number
    
    * Add comments, Fix with_new_children of CustomPlan
    
    * Use sub-rule as separate rule.
    
    * Add unbounded method
    
    * Final review
    
    * Move util code to exectree file
    
    * Update variables and comments
    
    * Apply suggestions from code review
    
    Update comments
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Address reviews
    
    * Add new tests, do not satisfy requirement if not absolutely necessary 
enforce dist
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/listing/table.rs    |  60 +--
 .../src/physical_optimizer/enforce_distribution.rs | 417 ++++++++++++++++-----
 .../core/src/physical_optimizer/enforce_sorting.rs |   1 -
 datafusion/core/src/physical_optimizer/mod.rs      |   1 +
 .../core/src/physical_optimizer/optimizer.rs       |   7 +
 .../src/physical_optimizer/output_requirements.rs  | 275 ++++++++++++++
 datafusion/core/src/physical_optimizer/utils.rs    |  24 ++
 .../provider_filter_pushdown.rs                    |  19 +-
 .../physical-plan/src/coalesce_partitions.rs       |   4 +
 datafusion/physical-plan/src/memory.rs             |   9 +-
 datafusion/physical-plan/src/streaming.rs          |   8 +-
 datafusion/sqllogictest/test_files/explain.slt     |   4 +
 datafusion/sqllogictest/test_files/groupby.slt     |   6 +-
 datafusion/sqllogictest/test_files/window.slt      |  66 ++++
 14 files changed, 758 insertions(+), 143 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 8360847e1b..797562e92d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -20,16 +20,8 @@
 use std::str::FromStr;
 use std::{any::Any, sync::Arc};
 
-use arrow::compute::SortOptions;
-use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
-use arrow_schema::Schema;
-use async_trait::async_trait;
-use datafusion_common::FileTypeWriterOptions;
-use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, 
ToDFSchema};
-use datafusion_expr::expr::Sort;
-use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, LexOrdering, 
PhysicalSortExpr};
-use futures::{future, stream, StreamExt, TryStreamExt};
+use super::helpers::{expr_applicable_for_cols, pruned_partition_list, 
split_files};
+use super::PartitionedFile;
 
 use crate::datasource::file_format::file_compression_type::{
     FileCompressionType, FileTypeExt,
@@ -54,13 +46,21 @@ use crate::{
     logical_expr::Expr,
     physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
 };
-use datafusion_common::FileType;
+use arrow::compute::SortOptions;
+use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
+use arrow_schema::Schema;
+use datafusion_common::{
+    internal_err, plan_err, project_schema, FileType, FileTypeWriterOptions, 
SchemaExt,
+    ToDFSchema,
+};
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_expr::expr::Sort;
+use datafusion_optimizer::utils::conjunction;
+use datafusion_physical_expr::{create_physical_expr, LexOrdering, 
PhysicalSortExpr};
 
-use super::PartitionedFile;
-
-use super::helpers::{expr_applicable_for_cols, pruned_partition_list, 
split_files};
+use async_trait::async_trait;
+use futures::{future, stream, StreamExt, TryStreamExt};
 
 /// Configuration for creating a [`ListingTable`]
 #[derive(Debug, Clone)]
@@ -996,6 +996,9 @@ impl ListingTable {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::fs::File;
+
     use super::*;
     use crate::datasource::{provider_as_source, MemTable};
     use crate::execution::options::ArrowReadOptions;
@@ -1010,14 +1013,13 @@ mod tests {
         logical_expr::{col, lit},
         test::{columns, object_store::register_test_store},
     };
+
     use arrow::datatypes::{DataType, Schema};
     use arrow::record_batch::RecordBatch;
-    use datafusion_common::assert_contains;
-    use datafusion_common::GetExt;
-    use datafusion_expr::LogicalPlanBuilder;
+    use datafusion_common::{assert_contains, GetExt, ScalarValue};
+    use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
+
     use rstest::*;
-    use std::collections::HashMap;
-    use std::fs::File;
     use tempfile::TempDir;
 
     /// It creates dummy file and checks if it can create unbounded input 
executors.
@@ -2048,6 +2050,7 @@ mod tests {
             }
             None => SessionContext::new(),
         };
+        let target_partition_number = 
session_ctx.state().config().target_partitions();
 
         // Create a new schema with one field called "a" of type Int32
         let schema = Arc::new(Schema::new(vec![Field::new(
@@ -2056,6 +2059,12 @@ mod tests {
             false,
         )]));
 
+        let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(Expr::Column("column1".into())),
+            Operator::GtEq,
+            Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
+        ));
+
         // Create a new batch of data to insert into the table
         let batch = RecordBatch::try_new(
             schema.clone(),
@@ -2136,8 +2145,10 @@ mod tests {
         let source = provider_as_source(source_table);
         // Create a table scan logical plan to read from the source table
         let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
-            
.repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))?
+            .filter(filter_predicate)?
             .build()?;
+        // Since logical plan contains a filter, increasing parallelism is 
helpful.
+        // Therefore, we will have 8 partitions in the final plan.
         // Create an insert plan to insert the source data into the initial 
table
         let insert_into_table =
             LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, 
false)?.build()?;
@@ -2146,7 +2157,6 @@ mod tests {
             .state()
             .create_physical_plan(&insert_into_table)
             .await?;
-
         // Execute the physical plan and collect the results
         let res = collect(plan, session_ctx.task_ctx()).await?;
         // Insert returns the number of rows written, in our case this would 
be 6.
@@ -2178,9 +2188,9 @@ mod tests {
         // Assert that the batches read from the file match the expected 
result.
         assert_batches_eq!(expected, &batches);
 
-        // Assert that 6 files were added to the table
+        // Assert that `target_partition_number` many files were added to the 
table.
         let num_files = tmp_dir.path().read_dir()?.count();
-        assert_eq!(num_files, 6);
+        assert_eq!(num_files, target_partition_number);
 
         // Create a physical plan from the insert plan
         let plan = session_ctx
@@ -2221,9 +2231,9 @@ mod tests {
         // Assert that the batches read from the file after the second append 
match the expected result.
         assert_batches_eq!(expected, &batches);
 
-        // Assert that another 6 files were added to the table
+        // Assert that another `target_partition_number` many files were added 
to the table.
         let num_files = tmp_dir.path().read_dir()?.count();
-        assert_eq!(num_files, 12);
+        assert_eq!(num_files, 2 * target_partition_number);
 
         // Return Ok if the function
         Ok(())
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index b2a1a03383..b3fb41ea10 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -27,8 +27,11 @@ use std::sync::Arc;
 
 use crate::config::ConfigOptions;
 use crate::datasource::physical_plan::{CsvExec, ParquetExec};
-use crate::error::{DataFusionError, Result};
-use crate::physical_optimizer::utils::{add_sort_above, get_plan_string, 
ExecTree};
+use crate::error::Result;
+use crate::physical_optimizer::utils::{
+    add_sort_above, get_children_exectrees, get_plan_string, 
is_coalesce_partitions,
+    is_repartition, is_sort_preserving_merge, ExecTree,
+};
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -44,7 +47,6 @@ use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
 
-use datafusion_common::internal_err;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::equivalence::EquivalenceProperties;
@@ -57,6 +59,7 @@ use datafusion_physical_expr::{
 };
 use datafusion_physical_plan::unbounded_output;
 
+use datafusion_physical_plan::windows::{get_best_fitting_window, 
BoundedWindowAggExec};
 use itertools::izip;
 
 /// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -213,9 +216,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
             distribution_context.transform_up(&|distribution_context| {
                 ensure_distribution(distribution_context, config)
             })?;
-
-        // If output ordering is not necessary, removes it
-        update_plan_to_remove_unnecessary_final_order(distribution_context)
+        Ok(distribution_context.plan)
     }
 
     fn name(&self) -> &str {
@@ -979,12 +980,6 @@ fn add_roundrobin_on_top(
             RepartitionExec::try_new(input, 
Partitioning::RoundRobinBatch(n_target))?
                 .with_preserve_order(should_preserve_ordering),
         ) as Arc<dyn ExecutionPlan>;
-        if let Some(exec_tree) = dist_onward {
-            return internal_err!(
-                "ExecTree should have been empty, but got:{:?}",
-                exec_tree
-            );
-        }
 
         // update distribution onward with new operator
         update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
@@ -1112,8 +1107,9 @@ fn add_spm_on_top(
     }
 }
 
-/// Updates the physical plan inside `distribution_context` if having a
-/// `RepartitionExec(RoundRobin)` is not helpful.
+/// Updates the physical plan inside `distribution_context` so that 
distribution
+/// changing operators are removed from the top. If they are necessary, they 
will
+/// be added in subsequent stages.
 ///
 /// Assume that following plan is given:
 /// ```text
@@ -1122,14 +1118,13 @@ fn add_spm_on_top(
 /// "    ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, 
b, c, d, e], output_ordering=\[a@0 ASC]",
 /// ```
 ///
-/// `RepartitionExec` at the top is unnecessary. Since it doesn't help with 
increasing parallelism.
-/// This function removes top repartition, and returns following plan.
+/// Since `RepartitionExec`s change the distribution, this function removes
+/// them and returns following plan:
 ///
 /// ```text
-/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
-/// "  ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, 
c, d, e], output_ordering=\[a@0 ASC]",
+/// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, 
d, e], output_ordering=\[a@0 ASC]",
 /// ```
-fn remove_unnecessary_repartition(
+fn remove_dist_changing_operators(
     distribution_context: DistributionContext,
 ) -> Result<DistributionContext> {
     let DistributionContext {
@@ -1137,22 +1132,17 @@ fn remove_unnecessary_repartition(
         mut distribution_onwards,
     } = distribution_context;
 
-    // Remove any redundant RoundRobin at the start:
-    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() 
{
-        if let Partitioning::RoundRobinBatch(n_out) = 
repartition.partitioning() {
-            // Repartition is useless:
-            if *n_out <= 
repartition.input().output_partitioning().partition_count() {
-                let mut new_distribution_onwards =
-                    vec![None; repartition.input().children().len()];
-                if let Some(exec_tree) = &distribution_onwards[0] {
-                    for child in &exec_tree.children {
-                        new_distribution_onwards[child.idx] = 
Some(child.clone());
-                    }
-                }
-                plan = repartition.input().clone();
-                distribution_onwards = new_distribution_onwards;
-            }
-        }
+    // Remove any distribution changing operators at the beginning:
+    // Note that they will be re-inserted later on if necessary or helpful.
+    while is_repartition(&plan)
+        || is_coalesce_partitions(&plan)
+        || is_sort_preserving_merge(&plan)
+    {
+        // All of above operators have a single child. When we remove the top
+        // operator, we take the first child.
+        plan = plan.children()[0].clone();
+        distribution_onwards =
+            get_children_exectrees(plan.children().len(), 
&distribution_onwards[0]);
     }
 
     // Create a plan with the updated children:
@@ -1162,29 +1152,6 @@ fn remove_unnecessary_repartition(
     })
 }
 
-/// Changes each child of the `dist_context.plan` such that they no longer
-/// use order preserving variants, if no ordering is required at the output
-/// of the physical plan (there is no global ordering requirement by the 
query).
-fn update_plan_to_remove_unnecessary_final_order(
-    dist_context: DistributionContext,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    let DistributionContext {
-        plan,
-        distribution_onwards,
-    } = dist_context;
-    let new_children = izip!(plan.children(), distribution_onwards)
-        .map(|(mut child, mut dist_onward)| {
-            replace_order_preserving_variants(&mut child, &mut dist_onward)?;
-            Ok(child)
-        })
-        .collect::<Result<Vec<_>>>()?;
-    if !new_children.is_empty() {
-        plan.with_new_children(new_children)
-    } else {
-        Ok(plan)
-    }
-}
-
 /// Updates the physical plan `input` by using `dist_onward` replace order 
preserving operator variants
 /// with their corresponding operators that do not preserve order. It is a 
wrapper for `replace_order_preserving_variants_helper`
 fn replace_order_preserving_variants(
@@ -1224,18 +1191,16 @@ fn replace_order_preserving_variants_helper(
     for child in &exec_tree.children {
         updated_children[child.idx] = 
replace_order_preserving_variants_helper(child)?;
     }
-    if let Some(spm) = exec_tree
-        .plan
-        .as_any()
-        .downcast_ref::<SortPreservingMergeExec>()
-    {
-        return Ok(Arc::new(CoalescePartitionsExec::new(spm.input().clone())));
+    if is_sort_preserving_merge(&exec_tree.plan) {
+        return Ok(Arc::new(CoalescePartitionsExec::new(
+            updated_children[0].clone(),
+        )));
     }
     if let Some(repartition) = 
exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
         if repartition.preserve_order() {
             return Ok(Arc::new(
                 RepartitionExec::try_new(
-                    repartition.input().clone(),
+                    updated_children[0].clone(),
                     repartition.partitioning().clone(),
                 )?
                 .with_preserve_order(false),
@@ -1275,12 +1240,29 @@ fn ensure_distribution(
         repartition_beneficial_stat =
             stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
     }
-
     // Remove unnecessary repartition from the physical plan if any
     let DistributionContext {
-        plan,
+        mut plan,
         mut distribution_onwards,
-    } = remove_unnecessary_repartition(dist_context)?;
+    } = remove_dist_changing_operators(dist_context)?;
+
+    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
+        if let Some(updated_window) = get_best_fitting_window(
+            exec.window_expr(),
+            exec.input(),
+            &exec.partition_keys,
+        )? {
+            plan = updated_window;
+        }
+    } else if let Some(exec) = 
plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
+        if let Some(updated_window) = get_best_fitting_window(
+            exec.window_expr(),
+            exec.input(),
+            &exec.partition_keys,
+        )? {
+            plan = updated_window;
+        }
+    };
 
     let n_children = plan.children().len();
     // This loop iterates over all the children to:
@@ -1369,19 +1351,23 @@ fn ensure_distribution(
                 // Either:
                 // - Ordering requirement cannot be satisfied by preserving 
ordering through repartitions, or
                 // - using order preserving variant is not desirable.
-                if !ordering_satisfy_requirement_concrete(
+                let ordering_satisfied = ordering_satisfy_requirement_concrete(
                     existing_ordering,
                     required_input_ordering,
                     || child.equivalence_properties(),
                     || child.ordering_equivalence_properties(),
-                ) || !order_preserving_variants_desirable
-                {
+                );
+                if !ordering_satisfied || !order_preserving_variants_desirable 
{
                     replace_order_preserving_variants(&mut child, 
dist_onward)?;
-                    let sort_expr = PhysicalSortRequirement::to_sort_exprs(
-                        required_input_ordering.clone(),
-                    );
-                    // Make sure to satisfy ordering requirement
-                    add_sort_above(&mut child, sort_expr, None)?;
+                    // If ordering requirements were satisfied before 
repartitioning,
+                    // make sure ordering requirements are still satisfied 
after.
+                    if ordering_satisfied {
+                        // Make sure to satisfy ordering requirement:
+                        let sort_expr = PhysicalSortRequirement::to_sort_exprs(
+                            required_input_ordering.clone(),
+                        );
+                        add_sort_above(&mut child, sort_expr, None)?;
+                    }
                 }
                 // Stop tracking distribution changing operators
                 *dist_onward = None;
@@ -1690,6 +1676,7 @@ mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_optimizer::enforce_sorting::EnforceSorting;
+    use crate::physical_optimizer::output_requirements::OutputRequirements;
     use crate::physical_plan::aggregates::{
         AggregateExec, AggregateMode, PhysicalGroupBy,
     };
@@ -1703,9 +1690,12 @@ mod tests {
     use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
     use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, 
Statistics};
 
-    use crate::physical_optimizer::test_utils::repartition_exec;
+    use crate::physical_optimizer::test_utils::{
+        coalesce_partitions_exec, repartition_exec,
+    };
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::sorts::sort::SortExec;
+
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::ScalarValue;
@@ -1714,7 +1704,7 @@ mod tests {
     use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
     use datafusion_physical_expr::{
         expressions, expressions::binary, expressions::lit, 
expressions::Column,
-        PhysicalExpr, PhysicalSortExpr,
+        LexOrdering, PhysicalExpr, PhysicalSortExpr,
     };
 
     /// Models operators like BoundedWindowExec that require an input
@@ -1722,11 +1712,23 @@ mod tests {
     #[derive(Debug)]
     struct SortRequiredExec {
         input: Arc<dyn ExecutionPlan>,
+        expr: LexOrdering,
     }
 
     impl SortRequiredExec {
         fn new(input: Arc<dyn ExecutionPlan>) -> Self {
-            Self { input }
+            let expr = input.output_ordering().unwrap_or(&[]).to_vec();
+            Self { input, expr }
+        }
+
+        fn new_with_requirement(
+            input: Arc<dyn ExecutionPlan>,
+            requirement: Vec<PhysicalSortExpr>,
+        ) -> Self {
+            Self {
+                input,
+                expr: requirement,
+            }
         }
     }
 
@@ -1736,7 +1738,8 @@ mod tests {
             _t: DisplayFormatType,
             f: &mut std::fmt::Formatter,
         ) -> std::fmt::Result {
-            write!(f, "SortRequiredExec")
+            let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
+            write!(f, "SortRequiredExec: [{}]", expr.join(","))
         }
     }
 
@@ -1778,7 +1781,10 @@ mod tests {
         ) -> Result<Arc<dyn ExecutionPlan>> {
             assert_eq!(children.len(), 1);
             let child = children.pop().unwrap();
-            Ok(Arc::new(Self::new(child)))
+            Ok(Arc::new(Self::new_with_requirement(
+                child,
+                self.expr.clone(),
+            )))
         }
 
         fn execute(
@@ -2054,6 +2060,13 @@ mod tests {
         Arc::new(SortRequiredExec::new(input))
     }
 
+    fn sort_required_exec_with_req(
+        input: Arc<dyn ExecutionPlan>,
+        sort_exprs: LexOrdering,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(SortRequiredExec::new_with_requirement(input, sort_exprs))
+    }
+
     fn trim_plan_display(plan: &str) -> Vec<&str> {
         plan.split('\n')
             .map(|s| s.trim())
@@ -2118,10 +2131,15 @@ mod tests {
             //       `EnforceSorting` and `EnforceDistribution`.
             // TODO: Orthogonalize the tests here just to verify 
`EnforceDistribution` and create
             //       new tests for the cascade.
+
+            // Add the ancillary output requirements operator at the start:
+            let optimizer = OutputRequirements::new_add_mode();
+            let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+
             let optimized = if $FIRST_ENFORCE_DIST {
                 // Run enforce distribution rule first:
                 let optimizer = EnforceDistribution::new();
-                let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+                let optimized = optimizer.optimize(optimized, &config)?;
                 // The rule should be idempotent.
                 // Re-running this rule shouldn't introduce unnecessary 
operators.
                 let optimizer = EnforceDistribution::new();
@@ -2133,7 +2151,7 @@ mod tests {
             } else {
                 // Run the enforce sorting rule first:
                 let optimizer = EnforceSorting::new();
-                let optimized = optimizer.optimize($PLAN.clone(), &config)?;
+                let optimized = optimizer.optimize(optimized, &config)?;
                 // Run enforce distribution rule:
                 let optimizer = EnforceDistribution::new();
                 let optimized = optimizer.optimize(optimized, &config)?;
@@ -2144,6 +2162,10 @@ mod tests {
                 optimized
             };
 
+            // Remove the ancillary output requirements operator when done:
+            let optimizer = OutputRequirements::new_remove_mode();
+            let optimized = optimizer.optimize(optimized, &config)?;
+
             // Now format correctly
             let plan = 
displayable(optimized.as_ref()).indent(true).to_string();
             let actual_lines = trim_plan_display(&plan);
@@ -2978,7 +3000,7 @@ mod tests {
                 format!("SortMergeJoin: join_type={join_type}, on=[(a@0, 
c@2)]");
 
             let expected = match join_type {
-                // Should include 6 RepartitionExecs 3 SortExecs
+                // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 
3 SortExecs
                 JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti =>
                     vec![
                         top_join_plan.as_str(),
@@ -2997,9 +3019,18 @@ mod tests {
                         "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                         "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                     ],
-                // Should include 7 RepartitionExecs
+                // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 
4 SortExecs
+                // Since ordering of the left child is not preserved after 
SortMergeJoin
+                // when mode is Right, RightSemi, RightAnti, Full
+                // - We need to add one additional SortExec after 
SortMergeJoin in contrast to the test cases
+                //   when mode is Inner, Left, LeftSemi, LeftAnti
+                // Similarly, since partitioning of the left side is not 
preserved
+                // when mode is Right, RightSemi, RightAnti, Full
+                // - We need to add one additional Hash Repartition after 
SortMergeJoin in contrast to the test
+                //   cases when mode is Inner, Left, LeftSemi, LeftAnti
                 _ => vec![
                         top_join_plan.as_str(),
+                        // Below 2 operators are differences introduced, when 
join mode is changed
                         "SortExec: expr=[a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
                         join_plan.as_str(),
@@ -3021,7 +3052,7 @@ mod tests {
             assert_optimized!(expected, top_join.clone(), true, true);
 
             let expected_first_sort_enforcement = match join_type {
-                // Should include 3 RepartitionExecs 3 SortExecs
+                // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 
3 SortExecs
                 JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti =>
                     vec![
                         top_join_plan.as_str(),
@@ -3040,9 +3071,18 @@ mod tests {
                         "SortExec: expr=[c@2 ASC]",
                         "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                     ],
-                // Should include 8 RepartitionExecs (4 of them preserves 
ordering)
+                // Should include 8 RepartitionExecs (4 hash, 4 round-robin), 
4 SortExecs
+                // Since ordering of the left child is not preserved after 
SortMergeJoin
+                // when mode is Right, RightSemi, RightAnti, Full
+                // - We need to add one additional SortExec after 
SortMergeJoin in contrast to the test cases
+                //   when mode is Inner, Left, LeftSemi, LeftAnti
+                // Similarly, since partitioning of the left side is not 
preserved
+                // when mode is Right, RightSemi, RightAnti, Full
+                // - We need to add one additional Hash Repartition and 
round-robin repartition after
+                //   SortMergeJoin in contrast to the test cases when mode is 
Inner, Left, LeftSemi, LeftAnti
                 _ => vec![
                     top_join_plan.as_str(),
+                    // Below 4 operators are differences introduced, when join 
mode is changed
                     "SortPreservingRepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
                     "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                     "SortExec: expr=[a@0 ASC]",
@@ -3083,7 +3123,7 @@ mod tests {
                         format!("SortMergeJoin: join_type={join_type}, 
on=[(b1@6, c@2)]");
 
                     let expected = match join_type {
-                        // Should include 3 RepartitionExecs and 3 SortExecs
+                        // Should include 6 RepartitionExecs (3 hash, 3 
round-robin) and 3 SortExecs
                         JoinType::Inner | JoinType::Right => vec![
                             top_join_plan.as_str(),
                             join_plan.as_str(),
@@ -3101,8 +3141,8 @@ mod tests {
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         ],
-                        // Should include 4 RepartitionExecs and 4 SortExecs
-                        _ => vec![
+                        // Should include 7 RepartitionExecs (4 hash, 3 
round-robin) and 4 SortExecs
+                        JoinType::Left | JoinType::Full => vec![
                             top_join_plan.as_str(),
                             "SortExec: expr=[b1@6 ASC]",
                             "RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10",
@@ -3121,6 +3161,8 @@ mod tests {
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         ],
+                        // this match arm cannot be reached
+                        _ => unreachable!()
                     };
                     assert_optimized!(expected, top_join.clone(), true, true);
 
@@ -3144,7 +3186,7 @@ mod tests {
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         ],
                         // Should include 8 RepartitionExecs (4 of them 
preserves order) and 4 SortExecs
-                        _ => vec![
+                        JoinType::Left | JoinType::Full => vec![
                             top_join_plan.as_str(),
                             "SortPreservingRepartitionExec: 
partitioning=Hash([b1@6], 10), input_partitions=10",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
@@ -3165,6 +3207,8 @@ mod tests {
                             "SortExec: expr=[c@2 ASC]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         ],
+                        // this match arm cannot be reached
+                        _ => unreachable!()
                     };
                     assert_optimized!(
                         expected_first_sort_enforcement,
@@ -3301,6 +3345,16 @@ mod tests {
             "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[a@0 ASC]",
         ];
         assert_optimized!(expected, exec, true);
+        // In this case preserving ordering through order preserving operators 
is not desirable
+        // (according to flag: bounded_order_preserving_variants)
+        // hence in this case ordering is lost during CoalescePartitionsExec and 
re-introduced with
+        // SortExec at the top.
+        let expected = &[
+            "SortExec: expr=[a@0 ASC]",
+            "CoalescePartitionsExec",
+            "CoalesceBatchesExec: target_batch_size=4096",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[a@0 ASC]",
+        ];
         assert_optimized!(expected, exec, false);
         Ok(())
     }
@@ -3435,7 +3489,7 @@ mod tests {
             sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(), 
false)));
 
         let expected = &[
-            "SortRequiredExec",
+            "SortRequiredExec: [c@2 ASC]",
             "FilterExec: c@2 = 0",
             // We can use repartition here, ordering requirement by 
SortRequiredExec
             // is still satisfied.
@@ -3541,6 +3595,12 @@ mod tests {
         ];
 
         assert_optimized!(expected, plan.clone(), true);
+
+        let expected = &[
+            "SortExec: expr=[c@2 ASC]",
+            "CoalescePartitionsExec",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC]",
+        ];
         assert_optimized!(expected, plan, false);
         Ok(())
     }
@@ -3565,6 +3625,14 @@ mod tests {
         ];
 
         assert_optimized!(expected, plan.clone(), true);
+
+        let expected = &[
+            "SortExec: expr=[c@2 ASC]",
+            "CoalescePartitionsExec",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC]",
+        ];
         assert_optimized!(expected, plan, false);
         Ok(())
     }
@@ -3583,7 +3651,7 @@ mod tests {
 
         // during repartitioning ordering is preserved
         let expected = &[
-            "SortRequiredExec",
+            "SortRequiredExec: [c@2 ASC]",
             "FilterExec: c@2 = 0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC]",
@@ -3619,7 +3687,7 @@ mod tests {
         let expected = &[
             "UnionExec",
             // union input 1: no repartitioning
-            "SortRequiredExec",
+            "SortRequiredExec: [c@2 ASC]",
             "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC]",
             // union input 2: should repartition
             "FilterExec: c@2 = 0",
@@ -3687,16 +3755,13 @@ mod tests {
             ("c".to_string(), "c".to_string()),
         ];
         // sorted input
-        let plan = sort_preserving_merge_exec(
-            sort_key.clone(),
-            projection_exec_with_alias(
-                parquet_exec_multiple_sorted(vec![sort_key]),
-                alias,
-            ),
-        );
+        let plan = sort_required_exec(projection_exec_with_alias(
+            parquet_exec_multiple_sorted(vec![sort_key]),
+            alias,
+        ));
 
         let expected = &[
-            "SortPreservingMergeExec: [c@2 ASC]",
+            "SortRequiredExec: [c@2 ASC]",
             // Since this projection is trivial, increasing parallelism is not 
beneficial
             "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
             "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC]",
@@ -4186,11 +4251,11 @@ mod tests {
 
         // no parallelization, because SortRequiredExec doesn't benefit from 
increased parallelism
         let expected_parquet = &[
-            "SortRequiredExec",
+            "SortRequiredExec: [c@2 ASC]",
             "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC]",
         ];
         let expected_csv = &[
-            "SortRequiredExec",
+            "SortRequiredExec: [c@2 ASC]",
             "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], has_header=false",
         ];
 
@@ -4348,6 +4413,14 @@ mod tests {
         ];
 
         assert_optimized!(expected, physical_plan.clone(), true);
+
+        let expected = &[
+            "SortExec: expr=[c@2 ASC]",
+            "CoalescePartitionsExec",
+            "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC]",
+        ];
         assert_optimized!(expected, physical_plan, false);
 
         Ok(())
@@ -4377,6 +4450,15 @@ mod tests {
         ];
 
         assert_optimized!(expected, physical_plan.clone(), true);
+
+        let expected = &[
+            "SortExec: expr=[a@0 ASC]",
+            "CoalescePartitionsExec",
+            "SortExec: expr=[a@0 ASC]",
+            "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC]",
+        ];
         assert_optimized!(expected, physical_plan, false);
 
         Ok(())
@@ -4404,6 +4486,86 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
+        let schema = schema();
+        let sort_key = vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: SortOptions::default(),
+        }];
+        let input = parquet_exec();
+        let physical_plan = sort_required_exec_with_req(filter_exec(input), 
sort_key);
+        let expected = &[
+            // Ordering requirement of sort required exec is NOT satisfied
+            // by existing ordering at the source.
+            "SortRequiredExec: [a@0 ASC]",
+            "FilterExec: c@2 = 0",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]",
+        ];
+        assert_plan_txt!(expected, physical_plan);
+
+        let expected = &[
+            "SortRequiredExec: [a@0 ASC]",
+            // Since at the start of the rule ordering requirement is not 
satisfied
+            // EnforceDistribution rule doesn't satisfy this requirement 
either.
+            "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]",
+        ];
+
+        let mut config = ConfigOptions::new();
+        config.execution.target_partitions = 10;
+        config.optimizer.enable_round_robin_repartition = true;
+        config.optimizer.bounded_order_preserving_variants = false;
+        let distribution_plan =
+            EnforceDistribution::new().optimize(physical_plan, &config)?;
+        assert_plan_txt!(expected, distribution_plan);
+
+        Ok(())
+    }
+
+    #[test]
+    fn put_sort_when_input_is_valid() -> Result<()> {
+        let schema = schema();
+        let sort_key = vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: SortOptions::default(),
+        }];
+        let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
+        let physical_plan = sort_required_exec_with_req(filter_exec(input), 
sort_key);
+
+        let expected = &[
+            // Ordering requirement of sort required exec is satisfied
+            // by existing ordering at the source.
+            "SortRequiredExec: [a@0 ASC]",
+            "FilterExec: c@2 = 0",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[a@0 ASC]",
+        ];
+        assert_plan_txt!(expected, physical_plan);
+
+        let expected = &[
+            "SortRequiredExec: [a@0 ASC]",
+            // Since at the start of the rule ordering requirement is satisfied
+            // EnforceDistribution rule satisfies this requirement also.
+            // ordering is re-satisfied by introduction of SortExec.
+            "SortExec: expr=[a@0 ASC]",
+            "FilterExec: c@2 = 0",
+            // ordering is lost here
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[a@0 ASC]",
+        ];
+
+        let mut config = ConfigOptions::new();
+        config.execution.target_partitions = 10;
+        config.optimizer.enable_round_robin_repartition = true;
+        config.optimizer.bounded_order_preserving_variants = false;
+        let distribution_plan =
+            EnforceDistribution::new().optimize(physical_plan, &config)?;
+        assert_plan_txt!(expected, distribution_plan);
+
+        Ok(())
+    }
+
     #[test]
     fn do_not_add_unnecessary_hash() -> Result<()> {
         let schema = schema();
@@ -4458,4 +4620,51 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn optimize_away_unnecessary_repartition() -> Result<()> {
+        let physical_plan = 
coalesce_partitions_exec(repartition_exec(parquet_exec()));
+        let expected = &[
+            "CoalescePartitionsExec",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e]",
+        ];
+        plans_matches_expected!(expected, physical_plan.clone());
+
+        let expected =
+            &["ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]"];
+
+        assert_optimized!(expected, physical_plan.clone(), true);
+        assert_optimized!(expected, physical_plan, false);
+
+        Ok(())
+    }
+
+    #[test]
+    fn optimize_away_unnecessary_repartition2() -> Result<()> {
+        let physical_plan = 
filter_exec(repartition_exec(coalesce_partitions_exec(
+            filter_exec(repartition_exec(parquet_exec())),
+        )));
+        let expected = &[
+            "FilterExec: c@2 = 0",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "    CoalescePartitionsExec",
+            "      FilterExec: c@2 = 0",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "          ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
+        ];
+        plans_matches_expected!(expected, physical_plan.clone());
+
+        let expected = &[
+            "FilterExec: c@2 = 0",
+            "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]",
+        ];
+
+        assert_optimized!(expected, physical_plan.clone(), true);
+        assert_optimized!(expected, physical_plan, false);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index a149330181..c4b72a7cb3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -2035,7 +2035,6 @@ mod tests {
         let orig_plan =
             Arc::new(SortExec::new(sort_exprs, repartition)) as Arc<dyn 
ExecutionPlan>;
         let actual = get_plan_string(&orig_plan);
-        println!("{:?}", actual);
         let expected_input = vec![
             "SortExec: expr=[nullable_col@0 ASC]",
             "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index 0801a9bc59..9e22bff340 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -28,6 +28,7 @@ pub mod enforce_distribution;
 pub mod enforce_sorting;
 pub mod join_selection;
 pub mod optimizer;
+pub mod output_requirements;
 pub mod pipeline_checker;
 pub mod pruning;
 pub mod replace_with_order_preserving_variants;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs 
b/datafusion/core/src/physical_optimizer/optimizer.rs
index 5de70efe3c..95035e5f81 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -26,6 +26,7 @@ use 
crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAgg
 use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
 use crate::physical_optimizer::enforce_sorting::EnforceSorting;
 use crate::physical_optimizer::join_selection::JoinSelection;
+use crate::physical_optimizer::output_requirements::OutputRequirements;
 use crate::physical_optimizer::pipeline_checker::PipelineChecker;
 use crate::physical_optimizer::topk_aggregation::TopKAggregation;
 use crate::{error::Result, physical_plan::ExecutionPlan};
@@ -68,6 +69,9 @@ impl PhysicalOptimizer {
     /// Create a new optimizer using the recommended list of rules
     pub fn new() -> Self {
         let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+            // If there is an output requirement of the query, make sure that
+            // this information is not lost across different rules during 
optimization.
+            Arc::new(OutputRequirements::new_add_mode()),
             Arc::new(AggregateStatistics::new()),
             // Statistics-based join selection will change the Auto mode to a 
real join implementation,
             // like collect left, or hash join, or future sort merge join, 
which will influence the
@@ -90,6 +94,9 @@ impl PhysicalOptimizer {
             // The CoalesceBatches rule will not influence the distribution 
and ordering of the
             // whole plan tree. Therefore, to avoid influencing other rules, 
it should run last.
             Arc::new(CoalesceBatches::new()),
+            // Remove the ancillary output requirement operator since we are 
done with the planning
+            // phase.
+            Arc::new(OutputRequirements::new_remove_mode()),
             // The PipelineChecker rule will reject non-runnable query plans 
that use
             // pipeline-breaking operators on infinite input(s). The rule 
generates a
             // diagnostic error message when this happens. It makes no changes 
to the
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs 
b/datafusion/core/src/physical_optimizer/output_requirements.rs
new file mode 100644
index 0000000000..4b687d7f35
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The `OutputRequirements` optimizer rule either:
+//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of 
global
+//!   ordering and distribution requirement across rules, or
+//! - Removes the auxiliary `OutputRequirementExec` operator from the physical 
plan.
+//!   Since the `OutputRequirementExec` operator is only a helper operator, it
+//!   shouldn't occur in the final plan (i.e. the executed plan).
+
+use std::sync::Arc;
+
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+use arrow_schema::SchemaRef;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{Result, Statistics};
+use datafusion_physical_expr::{
+    Distribution, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
+};
+use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+
+/// This rule either adds or removes [`OutputRequirements`]s to/from the 
physical
+/// plan according to its `mode` attribute, which is set by the constructors
+/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
+/// the global requirements (ordering and distribution) across rules.
+///
+/// The primary use case of this node and rule is to specify and preserve the 
desired output
+/// ordering and distribution of the entire plan. When sending to a single 
client, a single partition may
+/// be desirable, but when sending to a multi-partitioned writer, keeping 
multiple partitions may be
+/// better.
+#[derive(Debug)]
+pub struct OutputRequirements {
+    mode: RuleMode,
+}
+
+impl OutputRequirements {
+    /// Create a new rule which works in `Add` mode; i.e. it simply adds a
+    /// top-level [`OutputRequirementExec`] into the physical plan to keep 
track
+    /// of global ordering and distribution requirements if there are any.
+    /// Note that this rule should run at the beginning.
+    pub fn new_add_mode() -> Self {
+        Self {
+            mode: RuleMode::Add,
+        }
+    }
+
+    /// Create a new rule which works in `Remove` mode; i.e. it simply removes
+    /// the top-level [`OutputRequirementExec`] from the physical plan if 
there is
+    /// any. We do this because an `OutputRequirementExec` is an ancillary,
+    /// non-executable operator whose sole purpose is to track global
+    /// requirements during optimization. Therefore, an
+    /// `OutputRequirementExec` should not appear in the final plan.
+    pub fn new_remove_mode() -> Self {
+        Self {
+            mode: RuleMode::Remove,
+        }
+    }
+}
+
+#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Hash)]
+enum RuleMode {
+    Add,
+    Remove,
+}
+
+/// An ancillary, non-executable operator whose sole purpose is to track global
+/// requirements during optimization. It imposes
+/// - the ordering requirement in its `order_requirement` attribute.
+/// - the distribution requirement in its `dist_requirement` attribute.
+///
+/// See [`OutputRequirements`] for more details
+#[derive(Debug)]
+struct OutputRequirementExec {
+    input: Arc<dyn ExecutionPlan>,
+    order_requirement: Option<LexOrderingReq>,
+    dist_requirement: Distribution,
+}
+
+impl OutputRequirementExec {
+    fn new(
+        input: Arc<dyn ExecutionPlan>,
+        requirements: Option<LexOrderingReq>,
+        dist_requirement: Distribution,
+    ) -> Self {
+        Self {
+            input,
+            order_requirement: requirements,
+            dist_requirement,
+        }
+    }
+
+    fn input(&self) -> Arc<dyn ExecutionPlan> {
+        self.input.clone()
+    }
+}
+
+impl DisplayAs for OutputRequirementExec {
+    fn fmt_as(
+        &self,
+        _t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "OutputRequirementExec")
+    }
+}
+
+impl ExecutionPlan for OutputRequirementExec {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn output_partitioning(&self) -> crate::physical_plan::Partitioning {
+        self.input.output_partitioning()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![self.dist_requirement.clone()]
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+        vec![self.order_requirement.clone()]
+    }
+
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        // Has a single child
+        Ok(children[0])
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(
+            children.remove(0), // has a single child
+            self.order_requirement.clone(),
+            self.dist_requirement.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<crate::execution::context::TaskContext>,
+    ) -> Result<crate::physical_plan::SendableRecordBatchStream> {
+        unreachable!();
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.input.statistics()
+    }
+}
+
+impl PhysicalOptimizerRule for OutputRequirements {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match self.mode {
+            RuleMode::Add => require_top_ordering(plan),
+            RuleMode::Remove => plan.transform_up(&|plan| {
+                if let Some(sort_req) =
+                    plan.as_any().downcast_ref::<OutputRequirementExec>()
+                {
+                    Ok(Transformed::Yes(sort_req.input()))
+                } else {
+                    Ok(Transformed::No(plan))
+                }
+            }),
+        }
+    }
+
+    fn name(&self) -> &str {
+        "OutputRequirements"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// This function adds an ancillary `OutputRequirementExec` to the physical 
plan, so that
+/// global requirements are not lost during optimization.
+fn require_top_ordering(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn 
ExecutionPlan>> {
+    let (new_plan, is_changed) = require_top_ordering_helper(plan)?;
+    if is_changed {
+        Ok(new_plan)
+    } else {
+        // Add `OutputRequirementExec` to the top, with no specified ordering 
and distribution requirement.
+        Ok(Arc::new(OutputRequirementExec::new(
+            new_plan,
+            // there is no ordering requirement
+            None,
+            Distribution::UnspecifiedDistribution,
+        )) as _)
+    }
+}
+
+/// Helper function that adds an ancillary `OutputRequirementExec` to the 
given plan.
+/// First entry in the tuple is resulting plan, second entry indicates whether 
any
+/// `OutputRequirementExec` is added to the plan.
+fn require_top_ordering_helper(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<(Arc<dyn ExecutionPlan>, bool)> {
+    let mut children = plan.children();
+    // Global ordering defines desired ordering in the final result.
+    if children.len() != 1 {
+        Ok((plan, false))
+    } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let req_ordering = sort_exec.output_ordering().unwrap_or(&[]);
+        let req_dist = sort_exec.required_input_distribution()[0].clone();
+        let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
+        Ok((
+            Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) 
as _,
+            true,
+        ))
+    } else if let Some(spm) = 
plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
+        let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
+        Ok((
+            Arc::new(OutputRequirementExec::new(
+                plan,
+                Some(reqs),
+                Distribution::SinglePartition,
+            )) as _,
+            true,
+        ))
+    } else if plan.maintains_input_order()[0]
+        && plan.required_input_ordering()[0].is_none()
+    {
+        // Keep searching for a `SortExec` as long as ordering is maintained,
+        // and on-the-way operators do not themselves require an ordering.
+        // When an operator requires an ordering, any `SortExec` below can not
+        // be responsible for (i.e. the originator of) the global ordering.
+        let (new_child, is_changed) =
+            require_top_ordering_helper(children.swap_remove(0))?;
+        Ok((plan.with_new_children(vec![new_child])?, is_changed))
+    } else {
+        // Stop searching, there is no global ordering desired for the query.
+        Ok((plan, false))
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index 21c976e07a..0d6c85f9f2 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -73,6 +73,30 @@ impl ExecTree {
     }
 }
 
+/// Get `ExecTree` for each child of the plan if they are tracked.
+/// # Arguments
+///
+/// * `n_children` - Children count of the plan of interest
+/// * `onward` - Contains `Some(ExecTree)` of the plan tracked.
+///            - Contains `None` if plan is not tracked.
+///
+/// # Returns
+///
+/// A `Vec<Option<ExecTree>>` that contains tracking information of each child.
+/// If a child is `None`, it is not tracked; if it is `Some(ExecTree)`, the child is 
tracked.
+pub(crate) fn get_children_exectrees(
+    n_children: usize,
+    onward: &Option<ExecTree>,
+) -> Vec<Option<ExecTree>> {
+    let mut children_onward = vec![None; n_children];
+    if let Some(exec_tree) = &onward {
+        for child in &exec_tree.children {
+            children_onward[child.idx] = Some(child.clone());
+        }
+    }
+    children_onward
+}
+
 /// This utility function adds a `SortExec` above an operator according to the
 /// given ordering requirements while preserving the original partitioning.
 pub fn add_sort_above(
diff --git 
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs 
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 79214092fa..73085937cb 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ops::Deref;
+use std::sync::Arc;
+
 use arrow::array::{Int32Builder, Int64Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use async_trait::async_trait;
 use datafusion::datasource::provider::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionContext, SessionState, 
TaskContext};
@@ -32,10 +34,10 @@ use datafusion::physical_plan::{
 use datafusion::prelude::*;
 use datafusion::scalar::ScalarValue;
 use datafusion_common::cast::as_primitive_array;
-use datafusion_common::{not_impl_err, DataFusionError};
+use datafusion_common::{internal_err, not_impl_err, DataFusionError};
 use datafusion_expr::expr::{BinaryExpr, Cast};
-use std::ops::Deref;
-use std::sync::Arc;
+
+use async_trait::async_trait;
 
 fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
     let mut builder = Int32Builder::with_capacity(num_rows);
@@ -96,9 +98,14 @@ impl ExecutionPlan for CustomPlan {
 
     fn with_new_children(
         self: Arc<Self>,
-        _: Vec<Arc<dyn ExecutionPlan>>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        unreachable!()
+        // CustomPlan has no children
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in {self:?}")
+        }
     }
 
     fn execute(
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs 
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 8eddf57ae5..646d42795b 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -104,6 +104,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self.input.equivalence_properties()
     }
 
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/physical-plan/src/memory.rs 
b/datafusion/physical-plan/src/memory.rs
index b29c8e9c7b..1dcdae56cf 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -126,9 +126,14 @@ impl ExecutionPlan for MemoryExec {
 
     fn with_new_children(
         self: Arc<Self>,
-        _: Vec<Arc<dyn ExecutionPlan>>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        internal_err!("Children cannot be replaced in {self:?}")
+        // MemoryExec has no children
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in {self:?}")
+        }
     }
 
     fn execute(
diff --git a/datafusion/physical-plan/src/streaming.rs 
b/datafusion/physical-plan/src/streaming.rs
index 00809b71e4..cb972fa41e 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -163,9 +163,13 @@ impl ExecutionPlan for StreamingTableExec {
 
     fn with_new_children(
         self: Arc<Self>,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        internal_err!("Children cannot be replaced in {self:?}")
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in {self:?}")
+        }
     }
 
     fn execute(
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index b1ba1eb36d..27ab8671e9 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -242,12 +242,16 @@ logical_plan after eliminate_projection SAME TEXT AS ABOVE
 logical_plan after push_down_limit SAME TEXT AS ABOVE
 logical_plan TableScan: simple_explain_test projection=[a, b, c]
 initial_physical_plan CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], has_header=true
+physical_plan after OutputRequirements
+OutputRequirementExec
+--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], has_header=true
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
+physical_plan after OutputRequirements CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], has_header=true
 physical_plan after PipelineChecker SAME TEXT AS ABOVE
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], has_header=true
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index ffef93837b..5bb0f31ed5 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2014,9 +2014,9 @@ Sort: l.col0 ASC NULLS LAST
 ----------TableScan: tab0 projection=[col0, col1]
 physical_plan
 SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
---ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 
ASC NULLS LAST]@3 as last_col1]
-----AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, 
col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered
-------SortExec: expr=[col0@0 ASC NULLS LAST]
+--SortExec: expr=[col0@0 ASC NULLS LAST]
+----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 
ASC NULLS LAST]@3 as last_col1]
+------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), 
input_partitions=4
 ------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, 
col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 3d9f7511be..b6325fd889 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3197,6 +3197,72 @@ SELECT a_new, d, rn1 FROM (SELECT d, a as a_new,
 0 0 4
 0 1 5
 
+query TT
+EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1,
+SUM(a) OVER(partition by b, a order by c) as sum2,
+  SUM(a) OVER(partition by a, d order by b) as sum3,
+  SUM(a) OVER(partition by d order by a) as sum4
+FROM annotated_data_infinite2;
+----
+logical_plan
+Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annota 
[...]
+--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(anno [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] 
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+physical_plan
+ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
+--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), f [...]
+----ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW, SUM(annotated_data_infinite2.a [...]
+------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullable:  [...]
+--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullable [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullab [...]
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Re-execute the same query with multiple partitions.
+# The final plan should still be streamable.
+query TT
+EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1,
+  SUM(a) OVER(partition by b, a order by c) as sum2,
+  SUM(a) OVER(partition by a, d order by b) as sum3,
+  SUM(a) OVER(partition by d order by a) as sum4
+FROM annotated_data_infinite2;
+----
+logical_plan
+Projection: SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annota 
[...]
+--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(anno [...]
+------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) 
PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW]]
+--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS 
Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] 
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+------------TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+physical_plan
+ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
+--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), f [...]
+----CoalesceBatchesExec: target_batch_size=4096
+------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2), 
input_partitions=2
+--------ProjectionExec: expr=[a@0 as a, d@3 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinit [...]
+----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullab [...]
+------------CoalesceBatchesExec: target_batch_size=4096
+--------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2), 
input_partitions=2
+----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64,  [...]
+------------------CoalesceBatchesExec: target_batch_size=4096
+--------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, 
d@3], 2), input_partitions=2
+----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: I [...]
+------------------------CoalesceBatchesExec: target_batch_size=4096
+--------------------------SortPreservingRepartitionExec: 
partitioning=Hash([a@0, b@1], 2), input_partitions=2
+----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
+
+# Reset the number of target partitions to 1 again
+statement ok
+set datafusion.execution.target_partitions = 1;
+
 statement ok
 drop table annotated_data_finite2
 

Reply via email to