This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 76075e2f35 Preserve SPM when parent maintains input order (#21097)
76075e2f35 is described below

commit 76075e2f359d309ec6baee2553ea538932a78979
Author: Rohan Krishnaswamy <[email protected]>
AuthorDate: Tue Mar 24 13:14:09 2026 -0700

    Preserve SPM when parent maintains input order (#21097)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #21096
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
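    - Updates `ensure_distribution` in `enforce_distribution.rs` so that
      order-preserving variants (e.g. `SortPreservingMergeExec`) are kept
      when a parent that requires `SinglePartition` or `HashPartitioned`
      input also maintains input order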
    - Adds tests
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    Yes
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    N/A
---
 .../physical_optimizer/enforce_distribution.rs     | 174 +++++++++++++++++++++
 .../physical-optimizer/src/enforce_distribution.rs |   9 +-
 2 files changed, 181 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 993798ff75..e14dc389d1 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -59,6 +59,7 @@ use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
 };
 
+use datafusion_physical_expr::Distribution;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::execution_plan::ExecutionPlan;
 use datafusion_physical_plan::expressions::col;
@@ -227,6 +228,106 @@ impl ExecutionPlan for SortRequiredExec {
     }
 }
 
+#[derive(Debug)]
+struct SinglePartitionMaintainsOrderExec {
+    input: Arc<dyn ExecutionPlan>,
+    cache: Arc<PlanProperties>,
+}
+
+impl SinglePartitionMaintainsOrderExec {
+    fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+        let cache = Self::compute_properties(&input);
+        Self {
+            input,
+            cache: Arc::new(cache),
+        }
+    }
+
+    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
+        PlanProperties::new(
+            input.equivalence_properties().clone(),
+            input.output_partitioning().clone(),
+            input.pipeline_behavior(),
+            input.boundedness(),
+        )
+    }
+}
+
+impl DisplayAs for SinglePartitionMaintainsOrderExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "SinglePartitionMaintainsOrderExec")
+            }
+            DisplayFormatType::TreeRender => write!(f, ""),
+        }
+    }
+}
+
+impl ExecutionPlan for SinglePartitionMaintainsOrderExec {
+    fn name(&self) -> &'static str {
+        "SinglePartitionMaintainsOrderExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn properties(&self) -> &Arc<PlanProperties> {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert_eq!(children.len(), 1);
+        let child = children.pop().unwrap();
+        Ok(Arc::new(Self::new(child)))
+    }
+
+    fn apply_expressions(
+        &self,
+        _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
+    ) -> Result<TreeNodeRecursion> {
+        Ok(TreeNodeRecursion::Continue)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion::execution::context::TaskContext>,
+    ) -> Result<datafusion_physical_plan::SendableRecordBatchStream> {
+        unreachable!();
+    }
+}
+
+fn single_partition_maintains_order_exec(
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(SinglePartitionMaintainsOrderExec::new(input))
+}
+
 fn parquet_exec() -> Arc<DataSourceExec> {
     parquet_exec_with_sort(schema(), vec![])
 }
@@ -3681,3 +3782,76 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
 
     Ok(())
 }
+
+/// When a parent requires SinglePartition and maintains input order, order-preserving
+/// variants (e.g. SortPreservingMergeExec) should be kept so that ordering can
+/// propagate to ancestors. Replacing them with CoalescePartitionsExec would destroy
+/// ordering and force unnecessary sorts later.
+#[test]
+fn maintains_order_preserves_spm_for_single_partition() -> Result<()> {
+    let schema = schema();
+    let sort_key: LexOrdering = [PhysicalSortExpr {
+        expr: col("c", &schema)?,
+        options: SortOptions::default(),
+    }]
+    .into();
+
+    // GlobalLimitExec -> LocalLimitExec -> sorted multi-partition parquet
+    let plan: Arc<dyn ExecutionPlan> =
+        limit_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
+
+    // Test EnforceDistribution in isolation: SPM should be preserved because
+    // GlobalLimitExec maintains input order.
+    let result = ensure_distribution_helper(plan, 10, false)?;
+    assert_plan!(result,
+        @r"
+    GlobalLimitExec: skip=0, fetch=100
+      SortPreservingMergeExec: [c@2 ASC]
+        LocalLimitExec: fetch=100
+          DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet
+    ");
+
+    Ok(())
+}
+
+/// Tests the cascading effect through a UnionExec with the full optimizer
+/// pipeline and `prefer_existing_sort=true`. Each Union branch has an operator
+/// that requires SinglePartition and maintains input order. SortPreservingMergeExec
+/// should be preserved in each branch, allowing ordering to flow through to the
+/// ancestor SortRequiredExec.
+#[test]
+fn maintains_order_preserves_spm_through_union_with_prefer_existing_sort() -> Result<()> {
+    let schema = schema();
+    let sort_key: LexOrdering = [PhysicalSortExpr {
+        expr: col("c", &schema)?,
+        options: SortOptions::default(),
+    }]
+    .into();
+
+    let branch1 =
+        single_partition_maintains_order_exec(parquet_exec_multiple_sorted(vec![
+            sort_key.clone(),
+        ]));
+    let branch2 =
+        single_partition_maintains_order_exec(parquet_exec_multiple_sorted(vec![
+            sort_key.clone(),
+        ]));
+    let plan = sort_required_exec_with_req(union_exec(vec![branch1, branch2]), sort_key);
+
+    let test_config = TestConfig::default().with_prefer_existing_sort();
+
+    let plan_distrib = test_config.to_plan(plan.clone(), &DISTRIB_DISTRIB_SORT);
+    assert_plan!(plan_distrib,
+        @r"
+    SortRequiredExec: [c@2 ASC]
+      UnionExec
+        SinglePartitionMaintainsOrderExec
+          SortPreservingMergeExec: [c@2 ASC]
+            DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet
+        SinglePartitionMaintainsOrderExec
+          SortPreservingMergeExec: [c@2 ASC]
+            DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet
+    ");
+
+    Ok(())
+}
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index d23a699f71..0ea1b766cd 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -1376,8 +1376,13 @@ pub fn ensure_distribution(
                 match requirement {
                     // Operator requires specific distribution.
                     Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
-                        // Since there is no ordering requirement, preserving ordering is pointless
-                        child = replace_order_preserving_variants(child)?;
+                        // If the parent doesn't maintain input order, preserving
+                        // ordering is pointless. However, if it does maintain
+                        // input order, we keep order-preserving variants so
+                        // ordering can flow through to ancestors that need it.
+                        if !maintains {
+                            child = replace_order_preserving_variants(child)?;
+                        }
                     }
                     Distribution::UnspecifiedDistribution => {
                         // Since ordering is lost, trying to preserve ordering is pointless


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to