This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 19988a8957 with preserve order now receives argument (#7231)
19988a8957 is described below
commit 19988a89577d186bc2ed4e803d11e4c62367ca6c
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Aug 8 23:27:06 2023 +0300
with preserve order now receives argument (#7231)
---
.../replace_with_order_preserving_variants.rs | 2 +-
datafusion/core/src/physical_plan/repartition/mod.rs | 12 +++++-------
.../tests/fuzz_cases/sort_preserving_repartition_fuzz.rs | 4 ++--
3 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 7d45c171b7..a7f0c88884 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -178,7 +178,7 @@ fn get_updated_plan(
let child = plan.children()[0].clone();
plan = Arc::new(
RepartitionExec::try_new(child, plan.output_partitioning())?
- .with_preserve_order(),
+ .with_preserve_order(true),
) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index c47c992681..99b72a1b40 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -419,11 +419,9 @@ impl ExecutionPlan for RepartitionExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let mut repartition =
- RepartitionExec::try_new(children[0].clone(), self.partitioning.clone())?;
- if self.preserve_order {
- repartition = repartition.with_preserve_order();
- }
+ let repartition =
+ RepartitionExec::try_new(children[0].clone(), self.partitioning.clone())?
+ .with_preserve_order(self.preserve_order);
Ok(Arc::new(repartition))
}
@@ -625,8 +623,8 @@ impl RepartitionExec {
}
/// Set Order preserving flag
- pub fn with_preserve_order(mut self) -> Self {
- self.preserve_order = true;
+ pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
+ self.preserve_order = preserve_order;
self
}
diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index d0ce58f6e6..6b3a633f3c 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -140,7 +140,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
.unwrap()
- .with_preserve_order(),
+ .with_preserve_order(true),
)
}
@@ -159,7 +159,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::Hash(hash_expr, 2))
.unwrap()
- .with_preserve_order(),
+ .with_preserve_order(true),
)
}