This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 46182c894e Replace repartition execs with sort preserving repartition 
execs (#6921)
46182c894e is described below

commit 46182c894e5106adba7fb53e9848ce666fb6129b
Author: Mert Akkaya <[email protected]>
AuthorDate: Thu Jul 13 14:55:34 2023 +0300

    Replace repartition execs with sort preserving repartition execs (#6921)
    
    * initialize replace_repartition_execs rule for 
SortPreservingRepartitionExec optimization
    typo fix
    
    * fix linter checks
    
    * use common idioms
    fix the inter children replacement issue
    simplify replacement logic
    use ordering_satisfy to compare orderings
    
    * fix ordering satisfy parameters according to provided output orderings
    
    * add Result wrappings on functions
    aggresively replace repartitions and do the check on final step
    
    * update RepartitionExec and HashJoinExec strings
    
    * update the documentation
    remove the unwrap method by using children() for sort exec
    
    * use .any for checking input order maintaining
    
    * Use existing order preserving RepartitionExec instead of dummy
    
    * Move rule to function
    
    * Simplifications
    
    * Remove sub-sule from enforce sorting
    
    * Run replace repartition for fixing pipeline
    
    * Comment improvements
    
    * Update comments
    
    * Better commenting
    
    * Address reviews
    
    * Change test so that it doesn't contain unnecessary executor
    
    ---------
    
    Co-authored-by: Mustafa Akur <[email protected]>
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/core/src/physical_optimizer/mod.rs      |   1 +
 .../replace_repartition_execs.rs                   | 786 +++++++++++++++++++++
 .../src/physical_optimizer/sort_enforcement.rs     |  13 +-
 datafusion/core/src/physical_plan/memory.rs        |   6 +-
 .../core/src/physical_plan/repartition/mod.rs      |  10 +-
 5 files changed, 805 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index 48a3d6ade7..cd61fac9de 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -31,6 +31,7 @@ pub mod optimizer;
 pub mod pipeline_checker;
 pub mod pruning;
 pub mod repartition;
+pub mod replace_repartition_execs;
 pub mod sort_enforcement;
 mod sort_pushdown;
 mod utils;
diff --git 
a/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs 
b/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
new file mode 100644
index 0000000000..faab482156
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/replace_repartition_execs.rs
@@ -0,0 +1,786 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Repartition optimizer that replaces `SortExec`s and their suitable 
`RepartitionExec` children with `SortPreservingRepartitionExec`s.
+use crate::error::Result;
+use crate::physical_optimizer::sort_enforcement::unbounded_output;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::ExecutionPlan;
+
+use super::utils::is_repartition;
+
+use datafusion_common::tree_node::Transformed;
+use datafusion_physical_expr::utils::ordering_satisfy;
+
+use itertools::enumerate;
+use std::sync::Arc;
+
+/// Creates a `SortPreservingRepartitionExec` from given `RepartitionExec`
+fn sort_preserving_repartition(
+    repartition: &RepartitionExec,
+) -> Result<Arc<RepartitionExec>> {
+    Ok(Arc::new(
+        RepartitionExec::try_new(
+            repartition.input().clone(),
+            repartition.partitioning().clone(),
+        )?
+        .with_preserve_order(),
+    ))
+}
+
+fn does_plan_maintain_input_order(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.maintains_input_order().iter().any(|flag| *flag)
+}
+
+/// Check the children nodes of a `SortExec` until ordering is lost (e.g. until
+/// another `SortExec` or a `CoalescePartitionsExec` which doesn't maintain 
ordering)
+/// and replace `RepartitionExec`s that do not maintain ordering (e.g. those 
whose
+/// input partition counts are larger than unity) with 
`SortPreservingRepartitionExec`s.
+/// Note that doing this may render the `SortExec` in question unneccessary, 
which will
+/// be removed later on.
+///
+/// For example, we transform the plan below
+/// "FilterExec: c@2 > 3",
+/// "  RepartitionExec: partitioning=Hash(\[b@0], 16), input_partitions=16",
+/// "    RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
+/// "      MemoryExec: partitions=1, 
partition_sizes=\[(<depends_on_batch_size>)], output_ordering: 
\[PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options: 
SortOptions { descending: false, nulls_first: false } }]",
+/// into
+/// "FilterExec: c@2 > 3",
+/// "  SortPreservingRepartitionExec: partitioning=Hash(\[b@0], 16), 
input_partitions=16",
+/// "    RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
+/// "      MemoryExec: partitions=1, 
partition_sizes=\[<depends_on_batch_size>], output_ordering: \[PhysicalSortExpr 
{ expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending: 
false, nulls_first: false } }]",
+/// where the `FilterExec` in the latter has output ordering `a ASC`. This 
ordering will
+/// potentially remove a `SortExec` at the top of `FilterExec`. If this 
doesn't help remove
+/// a `SortExec`, the old version is used.
+fn replace_sort_children(
+    plan: &Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    if plan.children().is_empty() {
+        return Ok(plan.clone());
+    }
+
+    let mut children = plan.children();
+    for (idx, child) in enumerate(plan.children()) {
+        if !is_repartition(&child) && !does_plan_maintain_input_order(&child) {
+            break;
+        }
+
+        if let Some(repartition) = 
child.as_any().downcast_ref::<RepartitionExec>() {
+            // Replace this `RepartitionExec` with a 
`SortPreservingRepartitionExec`
+            // if it doesn't preserve ordering and its input is unbounded. 
Doing
+            // so avoids breaking the pipeline.
+            if !repartition.maintains_input_order()[0] && 
unbounded_output(&child) {
+                let spr = sort_preserving_repartition(repartition)?
+                    .with_new_children(repartition.children())?;
+                // Perform the replacement and recurse into this plan's 
children:
+                children[idx] = replace_sort_children(&spr)?;
+                continue;
+            }
+        }
+
+        children[idx] = replace_sort_children(&child)?;
+    }
+
+    plan.clone().with_new_children(children)
+}
+
+/// The `replace_repartition_execs` optimizer sub-rule searches for `SortExec`s
+/// and their `RepartitionExec` children with multiple input partitioning 
having
+/// local (per-partition) ordering, so that it can replace the 
`RepartitionExec`
+/// with a `SortPreservingRepartitionExec` and remove the pipeline-breaking 
`SortExec`
+/// from the physical plan.
+///
+/// The algorithm flow is simply like this:
+/// 1. Visit nodes of the physical plan top-down and look for `SortExec` nodes.
+/// 2. If a `SortExec` is found, iterate over its children recursively until an
+///    executor that doesn't maintain ordering is encountered (or until a leaf 
node).
+///    `RepartitionExec`s with multiple input partitions are considered as if 
they
+///    maintain input ordering because they are potentially replaceable with
+///    `SortPreservingRepartitionExec`s which maintain ordering.
+/// 3_1. Replace the `RepartitionExec`s with multiple input partitions (which 
doesn't
+///      maintain ordering) with a `SortPreservingRepartitionExec`.
+/// 3_2. Otherwise, keep the plan as is.
+/// 4. Check if the `SortExec` is still necessary in the updated plan by 
comparing
+///    its input ordering with the output ordering it imposes. We do this 
because
+///     replacing `RepartitionExec`s with `SortPreservingRepartitionExec`s 
enables us
+///     to preserve the previously lost ordering during `RepartitionExec`s.
+/// 5_1. If the `SortExec` in question turns out to be unnecessary, remove it 
and use
+///      updated plan. Otherwise, use the original plan.
+/// 6. Continue the top-down iteration until another `SortExec` is seen, or 
the iterations finish.
+pub fn replace_repartition_execs(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+        let changed_plan = replace_sort_children(&plan)?;
+        // Since we have a `SortExec` here, it's guaranteed that it has a 
single child.
+        let input = &changed_plan.children()[0];
+        // Check if any child is changed, if so remove the `SortExec`. If the 
ordering
+        // is being satisfied with the child, then it means `SortExec` is 
unnecessary.
+        if ordering_satisfy(
+            input.output_ordering(),
+            sort_exec.output_ordering(),
+            || input.equivalence_properties(),
+            || input.ordering_equivalence_properties(),
+        ) {
+            Ok(Transformed::Yes(input.clone()))
+        } else {
+            Ok(Transformed::No(plan))
+        }
+    } else {
+        // We don't have anything to do until we get to the `SortExec` parent.
+        Ok(Transformed::No(plan))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::datasource::file_format::file_type::FileCompressionType;
+    use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
+    use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+
+    use crate::physical_plan::filter::FilterExec;
+    use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
+    use crate::physical_plan::repartition::RepartitionExec;
+    use crate::physical_plan::sorts::sort::SortExec;
+    use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+    use crate::physical_plan::{displayable, Partitioning};
+
+    use datafusion_common::tree_node::TreeNode;
+    use datafusion_common::{Result, Statistics};
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use datafusion_expr::{JoinType, Operator};
+    use datafusion_physical_expr::expressions::{self, col, Column};
+    use datafusion_physical_expr::PhysicalSortExpr;
+
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+
+    /// Runs the `replace_repartition_execs` sub-rule and asserts the plan
+    /// against the original and expected plans.
+    ///
+    /// `$EXPECTED_PLAN_LINES`: input plan
+    /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan
+    /// `$PLAN`: the plan to optimized
+    macro_rules! assert_optimized {
+        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$PLAN: expr) => {
+            let physical_plan = $PLAN;
+            let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
+            let actual: Vec<&str> = formatted.trim().lines().collect();
+
+            let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES
+                .iter().map(|s| *s).collect();
+
+            assert_eq!(
+                expected_plan_lines, actual,
+                "\n**Original Plan 
Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
+            );
+
+            let expected_optimized_lines: Vec<&str> = 
$EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect();
+
+            // Run the rule top-down
+            let optimized_physical_plan = 
physical_plan.transform_down(&replace_repartition_execs)?;
+
+            // Get string representation of the plan
+            let actual = get_plan_string(&optimized_physical_plan);
+            assert_eq!(
+                expected_optimized_lines, actual,
+                "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
+            );
+        };
+    }
+
+    #[tokio::test]
+    // Searches for a simple sort and a repartition just after it, the second 
repartition with 1 input partition should not be affected
+    async fn test_replace_multiple_input_repartition_1() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition = 
repartition_exec_hash(repartition_exec_round_robin(source));
+        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_inter_children_change_only() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr_default("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let sort = sort_exec(
+            vec![sort_expr_default("a", &schema)],
+            coalesce_partitions,
+            false,
+        );
+        let repartition_rr2 = repartition_exec_round_robin(sort);
+        let repartition_hash2 = repartition_exec_hash(repartition_rr2);
+        let filter = filter_exec(repartition_hash2, &schema);
+        let sort2 = sort_exec(vec![sort_expr_default("a", &schema)], filter, 
true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], 
sort2);
+
+        let expected_input = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    FilterExec: c@2 > 3",
+            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          SortExec: expr=[a@0 ASC]",
+            "            CoalescePartitionsExec",
+            "              RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], 
has_header=true",
+        ];
+
+        let expected_optimized = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  FilterExec: c@2 > 3",
+            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        SortExec: expr=[a@0 ASC]",
+            "          CoalescePartitionsExec",
+            "            RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "              RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "                CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], 
has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_replace_multiple_input_repartition_2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let filter = filter_exec(repartition_rr, &schema);
+        let repartition_hash = repartition_exec_hash(filter);
+        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash, 
true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      FilterExec: c@2 > 3",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    FilterExec: c@2 > 3",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_replace_multiple_input_repartition_with_extra_steps() -> 
Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let filter = filter_exec(repartition_hash, &schema);
+        let coalesce_batches_exec: Arc<dyn ExecutionPlan> = 
coalesce_batches_exec(filter);
+        let sort = sort_exec(vec![sort_expr("a", &schema)], 
coalesce_batches_exec, true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@2 > 3",
+            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@2 > 3",
+            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> 
Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
+        let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
+        let filter = filter_exec(repartition_hash, &schema);
+        let coalesce_batches_exec_2 = coalesce_batches_exec(filter);
+        let sort =
+            sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec_2, 
true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@2 > 3",
+            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@2 > 3",
+            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_not_replacing_when_no_need_to_preserve_sorting() -> 
Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let filter = filter_exec(repartition_hash, &schema);
+        let coalesce_batches_exec: Arc<dyn ExecutionPlan> = 
coalesce_batches_exec(filter);
+
+        let physical_plan: Arc<dyn ExecutionPlan> =
+            coalesce_partitions_exec(coalesce_batches_exec);
+
+        let expected_input = vec![
+            "CoalescePartitionsExec",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@2 > 3",
+            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "CoalescePartitionsExec",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    FilterExec: c@2 > 3",
+            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_multiple_replacable_repartitions() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let filter = filter_exec(repartition_hash, &schema);
+        let coalesce_batches = coalesce_batches_exec(filter);
+        let repartition_hash_2 = repartition_exec_hash(coalesce_batches);
+        let sort = sort_exec(vec![sort_expr("a", &schema)], 
repartition_hash_2, true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      CoalesceBatchesExec: target_batch_size=8192",
+            "        FilterExec: c@2 > 3",
+            "          RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      FilterExec: c@2 > 3",
+            "        SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_not_replace_with_different_orderings() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let sort = sort_exec(
+            vec![sort_expr_default("c", &schema)],
+            repartition_hash,
+            true,
+        );
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], 
sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [c@2 ASC]",
+            "  SortExec: expr=[c@2 ASC]",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [c@2 ASC]",
+            "  SortExec: expr=[c@2 ASC]",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_lost_ordering() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let physical_plan =
+            sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, 
false);
+
+        let expected_input = vec![
+            "SortExec: expr=[a@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortExec: expr=[a@0 ASC NULLS LAST]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_lost_and_kept_ordering() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec_round_robin(source);
+        let repartition_hash = repartition_exec_hash(repartition_rr);
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let sort = sort_exec(
+            vec![sort_expr_default("c", &schema)],
+            coalesce_partitions,
+            false,
+        );
+        let repartition_rr2 = repartition_exec_round_robin(sort);
+        let repartition_hash2 = repartition_exec_hash(repartition_rr2);
+        let filter = filter_exec(repartition_hash2, &schema);
+        let sort2 = sort_exec(vec![sort_expr_default("c", &schema)], filter, 
true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], 
sort2);
+
+        let expected_input = [
+            "SortPreservingMergeExec: [c@2 ASC]",
+            "  SortExec: expr=[c@2 ASC]",
+            "    FilterExec: c@2 > 3",
+            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          SortExec: expr=[c@2 ASC]",
+            "            CoalescePartitionsExec",
+            "              RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+
+        let expected_optimized = [
+            "SortPreservingMergeExec: [c@2 ASC]",
+            "  FilterExec: c@2 > 3",
+            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        SortExec: expr=[c@2 ASC]",
+            "          CoalescePartitionsExec",
+            "            RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "              RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "                CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_multiple_child_trees() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let left_sort_exprs = vec![sort_expr("a", &schema)];
+        let left_source = csv_exec_sorted(&schema, left_sort_exprs, true);
+        let left_repartition_rr = repartition_exec_round_robin(left_source);
+        let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
+        let left_coalesce_partitions =
+            Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
+
+        let right_sort_exprs = vec![sort_expr("a", &schema)];
+        let right_source = csv_exec_sorted(&schema, right_sort_exprs, true);
+        let right_repartition_rr = repartition_exec_round_robin(right_source);
+        let right_repartition_hash = 
repartition_exec_hash(right_repartition_rr);
+        let right_coalesce_partitions =
+            Arc::new(CoalesceBatchesExec::new(right_repartition_hash, 4096));
+
+        let hash_join_exec =
+            hash_join_exec(left_coalesce_partitions, 
right_coalesce_partitions);
+        let sort = sort_exec(vec![sort_expr_default("a", &schema)], 
hash_join_exec, true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], 
sort);
+
+        let expected_input = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, 
c@1)]",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+
+        let expected_optimized = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, 
c@1)]",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+            "      CoalesceBatchesExec: target_batch_size=4096",
+            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    // End test cases
+    // Start test helpers
+
+    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        let sort_opts = SortOptions {
+            nulls_first: false,
+            descending: false,
+        };
+        sort_expr_options(name, schema, sort_opts)
+    }
+
+    fn sort_expr_default(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        let sort_opts = SortOptions::default();
+        sort_expr_options(name, schema, sort_opts)
+    }
+
+    fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    fn sort_exec(
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        preserve_partitioning: bool,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        Arc::new(
+            SortExec::new(sort_exprs, input)
+                .with_preserve_partitioning(preserve_partitioning),
+        )
+    }
+
+    fn sort_preserving_merge_exec(
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
+    }
+
+    fn repartition_exec_round_robin(
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(
+            RepartitionExec::try_new(input, 
Partitioning::RoundRobinBatch(8)).unwrap(),
+        )
+    }
+
+    fn repartition_exec_hash(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(
+            RepartitionExec::try_new(
+                input,
+                Partitioning::Hash(vec![Arc::new(Column::new("c1", 0))], 8),
+            )
+            .unwrap(),
+        )
+    }
+
+    fn filter_exec(
+        input: Arc<dyn ExecutionPlan>,
+        schema: &SchemaRef,
+    ) -> Arc<dyn ExecutionPlan> {
+        let predicate = expressions::binary(
+            col("c", schema).unwrap(),
+            Operator::Gt,
+            expressions::lit(3i32),
+            schema,
+        )
+        .unwrap();
+        Arc::new(FilterExec::try_new(predicate, input).unwrap())
+    }
+
+    fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(CoalesceBatchesExec::new(input, 8192))
+    }
+
+    fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(CoalescePartitionsExec::new(input))
+    }
+
+    fn hash_join_exec(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(
+            HashJoinExec::try_new(
+                left,
+                right,
+                vec![(Column::new("c", 1), Column::new("c", 1))],
+                None,
+                &JoinType::Inner,
+                PartitionMode::Partitioned,
+                false,
+            )
+            .unwrap(),
+        )
+    }
+
+    fn create_test_schema() -> Result<SchemaRef> {
+        let column_a = Field::new("a", DataType::Int32, false);
+        let column_b = Field::new("b", DataType::Int32, false);
+        let column_c = Field::new("c", DataType::Int32, false);
+        let column_d = Field::new("d", DataType::Int32, false);
+        let schema = Arc::new(Schema::new(vec![column_a, column_b, column_c, 
column_d]));
+
+        Ok(schema)
+    }
+
+    // creates a csv exec source for the test purposes
+    // projection and has_header parameters are given static due to testing 
needs
+    pub fn csv_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        infinite_source: bool,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        let projection: Vec<usize> = vec![0, 2, 3];
+
+        Arc::new(CsvExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new(
+                    "file_path".to_string(),
+                    100,
+                )]],
+                statistics: Statistics::default(),
+                projection: Some(projection),
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: vec![sort_exprs],
+                infinite_source,
+            },
+            true,
+            0,
+            FileCompressionType::UNCOMPRESSED,
+        ))
+    }
+
+    // Util function to get string representation of a physical plan
+    fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+        let formatted = displayable(plan.as_ref()).indent(true).to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        actual.iter().map(|elem| elem.to_string()).collect()
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index dc1f27738d..4587aadc68 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -36,6 +36,7 @@
 
 use crate::config::ConfigOptions;
 use crate::error::Result;
+use 
crate::physical_optimizer::replace_repartition_execs::replace_repartition_execs;
 use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
 use crate::physical_optimizer::utils::{
     add_sort_above, find_indices, is_coalesce_partitions, is_limit, 
is_repartition,
@@ -370,7 +371,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
         // missed by the bottom-up traversal:
         let sort_pushdown = SortPushDown::init(new_plan);
         let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
-        Ok(adjusted.plan)
+        adjusted.plan.transform_down(&replace_repartition_execs)
     }
 
     fn name(&self) -> &str {
@@ -428,7 +429,7 @@ fn parallelize_sorts(
             coalesce_onwards: vec![None],
         }));
     } else if is_coalesce_partitions(&plan) {
-        // There is an unnecessary `CoalescePartitionExec` in the plan.
+        // There is an unnecessary `CoalescePartitionsExec` in the plan.
         let mut prev_layer = plan.clone();
         update_child_to_remove_coalesce(&mut prev_layer, &mut 
coalesce_onwards[0])?;
         let new_plan = plan.with_new_children(vec![prev_layer])?;
@@ -947,9 +948,9 @@ fn check_alignment(
     })
 }
 
-// Get unbounded_output information for the executor
-fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    let res = if plan.children().is_empty() {
+// Get output (un)boundedness information for the given `plan`.
+pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    let result = if plan.children().is_empty() {
         plan.unbounded_output(&[])
     } else {
         let children_unbounded_output = plan
@@ -959,7 +960,7 @@ fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
             .collect::<Vec<_>>();
         plan.unbounded_output(&children_unbounded_output)
     };
-    res.unwrap_or(true)
+    result.unwrap_or(true)
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/physical_plan/memory.rs 
b/datafusion/core/src/physical_plan/memory.rs
index 0601d6fc94..fbf7f46dd5 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -52,7 +52,11 @@ impl fmt::Debug for MemoryExec {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f, "partitions: [...]")?;
         write!(f, "schema: {:?}", self.projected_schema)?;
-        write!(f, "projection: {:?}", self.projection)
+        write!(f, "projection: {:?}", self.projection)?;
+        if let Some(sort_info) = &self.sort_information {
+            write!(f, ", output_ordering: {:?}", sort_info)?;
+        }
+        Ok(())
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs 
b/datafusion/core/src/physical_plan/repartition/mod.rs
index cb4d5c8988..6175c19576 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -359,10 +359,12 @@ impl ExecutionPlan for RepartitionExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(RepartitionExec::try_new(
-            children[0].clone(),
-            self.partitioning.clone(),
-        )?))
+        let mut repartition =
+            RepartitionExec::try_new(children[0].clone(), 
self.partitioning.clone())?;
+        if self.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
     }
 
     /// Specifies whether this plan generates an infinite stream of records.


Reply via email to