This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b8b6214733 Fix: fetch is missing in plan_with_order_breaking_variants
method (#15842)
b8b6214733 is described below
commit b8b621473335cb89f92ded133d65961d0b9ef0a3
Author: xudong.w <[email protected]>
AuthorDate: Fri Apr 25 11:37:13 2025 +0800
Fix: fetch is missing in plan_with_order_breaking_variants method (#15842)
---
.../replace_with_order_preserving_variants.rs | 30 +++++++++++++++++++++-
.../replace_with_order_preserving_variants.rs | 13 ++++++----
2 files changed, 37 insertions(+), 6 deletions(-)
diff --git
a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
index eb517c42b0..ada3f06d39 100644
---
a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -45,10 +45,12 @@ use datafusion_common::{assert_contains, Result};
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{self, col, Column};
use datafusion_physical_expr::PhysicalSortExpr;
-use
datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_preserving_variants,
replace_with_order_preserving_variants, OrderPreservationContext};
+use
datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{plan_with_order_breaking_variants,
plan_with_order_preserving_variants, replace_with_order_preserving_variants,
OrderPreservationContext};
use datafusion_common::config::ConfigOptions;
use crate::physical_optimizer::enforce_sorting::parquet_exec_sorted;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use object_store::memory::InMemory;
use object_store::ObjectStore;
use rstest::rstest;
@@ -1310,3 +1312,29 @@ fn
test_plan_with_order_preserving_variants_preserves_fetch() -> Result<()> {
assert_eq!(res.plan.fetch(), Some(5),);
Ok(())
}
+
+#[test]
+fn test_plan_with_order_breaking_variants_preserves_fetch() -> Result<()> {
+ let schema = create_test_schema3()?;
+ let parquet_sort_exprs =
vec![crate::physical_optimizer::test_utils::sort_expr(
+ "a", &schema,
+ )];
+ let parquet_exec = parquet_exec_sorted(&schema,
parquet_sort_exprs.clone());
+ let spm = SortPreservingMergeExec::new(
+ LexOrdering::new(parquet_sort_exprs),
+ parquet_exec.clone(),
+ )
+ .with_fetch(Some(10));
+ let requirements = OrderPreservationContext::new(
+ Arc::new(spm),
+ true,
+ vec![OrderPreservationContext::new(
+ parquet_exec.clone(),
+ true,
+ vec![],
+ )],
+ );
+ let res = plan_with_order_breaking_variants(requirements)?;
+ assert_eq!(res.plan.fetch(), Some(10));
+ Ok(())
+}
diff --git
a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
index 7fe62a146a..65a6198040 100644
---
a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
+++
b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::repartition::RepartitionExec;
use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::tree_node::PlanContext;
-use datafusion_physical_plan::ExecutionPlanProperties;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use itertools::izip;
@@ -167,7 +167,7 @@ pub fn plan_with_order_preserving_variants(
/// Calculates the updated plan by replacing operators that preserve ordering
/// inside `sort_input` with their order-breaking variants. This will restore
/// the original plan modified by [`plan_with_order_preserving_variants`].
-fn plan_with_order_breaking_variants(
+pub fn plan_with_order_breaking_variants(
mut sort_input: OrderPreservationContext,
) -> Result<OrderPreservationContext> {
let plan = &sort_input.plan;
@@ -202,10 +202,13 @@ fn plan_with_order_breaking_variants(
let partitioning = plan.output_partitioning().clone();
sort_input.plan = Arc::new(RepartitionExec::try_new(child,
partitioning)?) as _;
} else if is_sort_preserving_merge(plan) {
- // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`:
+ // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
+ // SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
let child = Arc::clone(&sort_input.children[0].plan);
- let coalesce = CoalescePartitionsExec::new(child);
- sort_input.plan = Arc::new(coalesce) as _;
+ let coalesce = CoalescePartitionsExec::new(child)
+ .with_fetch(plan.fetch())
+ .unwrap();
+ sort_input.plan = coalesce;
} else {
return sort_input.update_plan_from_children();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]