This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b8b6214733 Fix: fetch is missing in plan_with_order_breaking_variants 
method (#15842)
b8b6214733 is described below

commit b8b621473335cb89f92ded133d65961d0b9ef0a3
Author: xudong.w <[email protected]>
AuthorDate: Fri Apr 25 11:37:13 2025 +0800

    Fix: fetch is missing in plan_with_order_breaking_variants method (#15842)
---
 .../replace_with_order_preserving_variants.rs      | 30 +++++++++++++++++++++-
 .../replace_with_order_preserving_variants.rs      | 13 ++++++----
 2 files changed, 37 insertions(+), 6 deletions(-)

diff --git 
a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
index eb517c42b0..ada3f06d39 100644
--- 
a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -45,10 +45,12 @@ use datafusion_common::{assert_contains, Result};
 use datafusion_expr::{JoinType, Operator};
 use datafusion_physical_expr::expressions::{self, col, Column};
 use datafusion_physical_expr::PhysicalSortExpr;
-use 
datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_preserving_variants,
 replace_with_order_preserving_variants, OrderPreservationContext};
+use 
datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_breaking_variants,
 plan_with_order_preserving_variants, replace_with_order_preserving_variants, 
OrderPreservationContext};
 use datafusion_common::config::ConfigOptions;
 
 use crate::physical_optimizer::enforce_sorting::parquet_exec_sorted;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use object_store::memory::InMemory;
 use object_store::ObjectStore;
 use rstest::rstest;
@@ -1310,3 +1312,29 @@ fn 
test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
     assert_eq!(res.plan.fetch(), Some(5),);
     Ok(())
 }
+
+#[test]
+fn test_plan_with_order_breaking_variants_preserves_fetch() -> Result<()> {
+    let schema = create_test_schema3()?;
+    let parquet_sort_exprs = 
vec![crate::physical_optimizer::test_utils::sort_expr(
+        "a", &schema,
+    )];
+    let parquet_exec = parquet_exec_sorted(&schema, 
parquet_sort_exprs.clone());
+    let spm = SortPreservingMergeExec::new(
+        LexOrdering::new(parquet_sort_exprs),
+        parquet_exec.clone(),
+    )
+    .with_fetch(Some(10));
+    let requirements = OrderPreservationContext::new(
+        Arc::new(spm),
+        true,
+        vec![OrderPreservationContext::new(
+            parquet_exec.clone(),
+            true,
+            vec![],
+        )],
+    );
+    let res = plan_with_order_breaking_variants(requirements)?;
+    assert_eq!(res.plan.fetch(), Some(10));
+    Ok(())
+}
diff --git 
a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
 
b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
index 7fe62a146a..65a6198040 100644
--- 
a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::repartition::RepartitionExec;
 use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::tree_node::PlanContext;
-use datafusion_physical_plan::ExecutionPlanProperties;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 use itertools::izip;
 
@@ -167,7 +167,7 @@ pub fn plan_with_order_preserving_variants(
 /// Calculates the updated plan by replacing operators that preserve ordering
 /// inside `sort_input` with their order-breaking variants. This will restore
 /// the original plan modified by [`plan_with_order_preserving_variants`].
-fn plan_with_order_breaking_variants(
+pub fn plan_with_order_breaking_variants(
     mut sort_input: OrderPreservationContext,
 ) -> Result<OrderPreservationContext> {
     let plan = &sort_input.plan;
@@ -202,10 +202,13 @@ fn plan_with_order_breaking_variants(
         let partitioning = plan.output_partitioning().clone();
         sort_input.plan = Arc::new(RepartitionExec::try_new(child, 
partitioning)?) as _;
     } else if is_sort_preserving_merge(plan) {
-        // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`:
+        // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
+        // SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
         let child = Arc::clone(&sort_input.children[0].plan);
-        let coalesce = CoalescePartitionsExec::new(child);
-        sort_input.plan = Arc::new(coalesce) as _;
+        let coalesce = CoalescePartitionsExec::new(child)
+            .with_fetch(plan.fetch())
+            .unwrap();
+        sort_input.plan = coalesce;
     } else {
         return sort_input.update_plan_from_children();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to