This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ffbc6896b0 [MINOR]: Remove unecessary orderings from the final plan
(#8289)
ffbc6896b0 is described below
commit ffbc6896b0f4f1b417991d1a13266be10c3f3709
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Nov 21 22:41:53 2023 +0300
[MINOR]: Remove unecessary orderings from the final plan (#8289)
* Remove lost orderings from the final plan
* Improve comments
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../core/src/physical_optimizer/enforce_sorting.rs | 4 +++-
datafusion/physical-plan/src/insert.rs | 23 ++++++++--------------
datafusion/sqllogictest/test_files/select.slt | 23 ++++++++++++++++++++++
3 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 2590948d3b..6fec74f608 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -476,7 +476,9 @@ fn ensure_sorting(
update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
}
}
- (None, None) => {}
+ (None, None) => {
+ update_child_to_remove_unnecessary_sort(child, sort_onwards,
&plan)?;
+ }
}
}
// For window expressions, we can remove some sorts when we can
diff --git a/datafusion/physical-plan/src/insert.rs
b/datafusion/physical-plan/src/insert.rs
index 4eeb58974a..81cdfd753f 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -219,24 +219,17 @@ impl ExecutionPlan for FileSinkExec {
}
fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
- // The input order is either exlicitly set (such as by a ListingTable),
- // or require that the [FileSinkExec] gets the data in the order the
- // input produced it (otherwise the optimizer may chose to reorder
- // the input which could result in unintended / poor UX)
- //
- // More rationale:
- //
https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
- match &self.sort_order {
- Some(requirements) => vec![Some(requirements.clone())],
- None => vec![self
- .input
- .output_ordering()
- .map(PhysicalSortRequirement::from_sort_exprs)],
- }
+ // The required input ordering is set externally (e.g. by a
`ListingTable`).
+ // Otherwise, there is no specific requirement (i.e. `sort_order` is
`None`).
+ vec![self.sort_order.as_ref().cloned()]
}
fn maintains_input_order(&self) -> Vec<bool> {
- vec![false]
+ // Maintains ordering in the sense that the written file will reflect
+ // the ordering of the input. For more context, see:
+ //
+ //
https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
+ vec![true]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index 98ea061c73..bb81c5a9a1 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1013,6 +1013,29 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC
NULLS LAST], has_header=true
+# When the ordering is lost during projection, we shouldn't keep the SortExec
+# in the final physical plan.
+query TT
+EXPLAIN SELECT c2, COUNT(*)
+FROM (SELECT c2
+FROM aggregate_test_100
+ORDER BY c1, c2)
+GROUP BY c2;
+----
+logical_plan
+Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[COUNT(UInt8(1)) AS
COUNT(*)]]
+--Projection: aggregate_test_100.c2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC
NULLS LAST
+------Projection: aggregate_test_100.c2, aggregate_test_100.c1
+--------TableScan: aggregate_test_100 projection=[c1, c2]
+physical_plan
+AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[COUNT(*)]
+--CoalesceBatchesExec: target_batch_size=8192
+----RepartitionExec: partitioning=Hash([c2@0], 2), input_partitions=2
+------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[COUNT(*)]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2],
has_header=true
+
statement ok
drop table annotated_data_finite2;