This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e1bef86240 Minor: include `sort` expressions in
`SortPreservingRepartitionExec` explain plan (#7796)
e1bef86240 is described below
commit e1bef86240a99bd9a1032a63833b255922065fd3
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Oct 13 16:42:20 2023 -0400
Minor: include `sort` expressions in `SortPreservingRepartitionExec`
explain plan (#7796)
* Minor: include sort expressions in SortPreservingRepartitionExec plan
* update plan in new test
---
.../src/physical_optimizer/enforce_distribution.rs | 41 ++++++++++++----------
.../core/src/physical_optimizer/enforce_sorting.rs | 27 ++++++++------
.../replace_with_order_preserving_variants.rs | 41 +++++++++++++---------
datafusion/physical-expr/src/sort_expr.rs | 21 +++++++++++
datafusion/physical-plan/src/memory.rs | 7 ++--
datafusion/physical-plan/src/repartition/mod.rs | 29 ++++++++++++---
datafusion/physical-plan/src/sorts/sort.rs | 7 ++--
.../src/sorts/sort_preserving_merge.rs | 7 ++--
datafusion/sqllogictest/test_files/window.slt | 8 ++---
9 files changed, 124 insertions(+), 64 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a1f509d287..9be566f10a 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1738,8 +1738,11 @@ mod tests {
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
- let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
- write!(f, "SortRequiredExec: [{}]", expr.join(","))
+ write!(
+ f,
+ "SortRequiredExec: [{}]",
+ PhysicalSortExpr::format_list(&self.expr)
+ )
}
}
@@ -3056,16 +3059,16 @@ mod tests {
vec![
top_join_plan.as_str(),
join_plan.as_str(),
- "SortPreservingRepartitionExec:
partitioning=Hash([a@0], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortPreservingRepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortPreservingRepartitionExec:
partitioning=Hash([c@2], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
@@ -3082,21 +3085,21 @@ mod tests {
_ => vec![
top_join_plan.as_str(),
// Below 4 operators are differences introduced, when join
mode is changed
- "SortPreservingRepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
+ "SortPreservingRepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
- "SortPreservingRepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
+ "SortPreservingRepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e]",
- "SortPreservingRepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10",
+ "SortPreservingRepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1,
d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e]",
- "SortPreservingRepartitionExec: partitioning=Hash([c@2],
10), input_partitions=10",
+ "SortPreservingRepartitionExec: partitioning=Hash([c@2],
10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e]",
@@ -3170,16 +3173,16 @@ mod tests {
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
- "SortPreservingRepartitionExec:
partitioning=Hash([a@0], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortPreservingRepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortPreservingRepartitionExec:
partitioning=Hash([c@2], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
@@ -3187,21 +3190,21 @@ mod tests {
// Should include 8 RepartitionExecs (4 of them
preserves order) and 4 SortExecs
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
- "SortPreservingRepartitionExec:
partitioning=Hash([b1@6], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([b1@6], 10), input_partitions=10, sort_exprs=b1@6 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@6 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
- "SortPreservingRepartitionExec:
partitioning=Hash([a@0], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortPreservingRepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortPreservingRepartitionExec:
partitioning=Hash([c@2], 10), input_partitions=10",
+ "SortPreservingRepartitionExec:
partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e]",
@@ -3292,7 +3295,7 @@ mod tests {
let expected_first_sort_enforcement = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
- "SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0],
10), input_partitions=10",
+ "SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0],
10), input_partitions=10, sort_exprs=b3@1 ASC,a3@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[b3@1 ASC,a3@0 ASC]",
"CoalescePartitionsExec",
@@ -3303,7 +3306,7 @@ mod tests {
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]",
- "SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0],
10), input_partitions=10",
+ "SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0],
10), input_partitions=10, sort_exprs=b2@1 ASC,a2@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"SortExec: expr=[b2@1 ASC,a2@0 ASC]",
"CoalescePartitionsExec",
@@ -4382,7 +4385,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"FilterExec: c@2 = 0",
- "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, sort_exprs=c@2 ASC",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC]",
];
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index a381bbb501..92db3bbd05 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -2156,15 +2156,19 @@ mod tests {
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)],
coalesce_partitions);
- let expected_input = ["SortExec: expr=[a@0 ASC]",
+ let expected_input = [
+ "SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
- let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
- " SortPreservingRepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+ ];
+ let expected_optimized = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
+ " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+ ];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -2186,11 +2190,14 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
- let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
- " SortPreservingRepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+ ];
+ let expected_optimized = [
+ "SortPreservingMergeExec: [a@0 ASC]",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
+ " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+ ];
assert_optimized!(expected_input, expected_optimized, physical_plan,
false);
Ok(())
}
@@ -2259,7 +2266,7 @@ mod tests {
let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
- " SortPreservingRepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=10",
+ " SortPreservingRepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=10, sort_exprs=a@0 ASC,b@1
ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" SortExec: expr=[a@0 ASC,b@1 ASC]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], has_header=false",
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index ede95fc677..f4b3608d00 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -368,7 +368,7 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -412,10 +412,10 @@ mod tests {
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC]",
" FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" SortPreservingMergeExec: [a@0 ASC]",
- " SortPreservingRepartitionExec:
partitioning=Hash([c1@0], 8), input_partitions=8",
+ " SortPreservingRepartitionExec:
partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC],
has_header=true",
];
@@ -442,11 +442,14 @@ mod tests {
" FilterExec: c@2 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
- let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ let expected_optimized = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@2 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+
+ ];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
@@ -475,7 +478,7 @@ mod tests {
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -509,7 +512,7 @@ mod tests {
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
@@ -561,21 +564,25 @@ mod tests {
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
- let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ let expected_input = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@2 > 3",
" RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
- let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"
+ ];
+ let expected_optimized = [
+ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ ];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
@@ -627,7 +634,7 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -671,7 +678,7 @@ mod tests {
let expected_optimized = [
"SortPreservingMergeExec: [c@2 ASC]",
" FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=c@2 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" SortExec: expr=[c@2 ASC]",
" CoalescePartitionsExec",
@@ -756,7 +763,7 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
diff --git a/datafusion/physical-expr/src/sort_expr.rs
b/datafusion/physical-expr/src/sort_expr.rs
index 83d32dfeec..74179ba594 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -17,6 +17,7 @@
//! Sort expressions
+use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@@ -89,6 +90,26 @@ impl PhysicalSortExpr {
.options
.map_or(true, |opts| self.options == opts)
}
+
+ /// Returns a [`Display`]able list of `PhysicalSortExpr`, separated by `,` with no spaces (e.g. `b3@1 ASC,a3@0 ASC`).
+ pub fn format_list(input: &[PhysicalSortExpr]) -> impl Display + '_ { // borrows `input`; nothing is written until the result is formatted
+ struct DisplayableList<'a>(&'a [PhysicalSortExpr]); // thin wrapper so the slice can implement Display
+ impl<'a> Display for DisplayableList<'a> {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let mut first = true; // no separator before the first element
+ for sort_expr in self.0 {
+ if first {
+ first = false;
+ } else {
+ write!(f, ",")?; // same separator as the `join(",")` this replaces
+ }
+ write!(f, "{}", sort_expr)?; // each element uses PhysicalSortExpr's own Display
+ }
+ Ok(())
+ }
+ }
+ DisplayableList(input)
+ }
}
/// Represents sort requirement associated with a plan
diff --git a/datafusion/physical-plan/src/memory.rs
b/datafusion/physical-plan/src/memory.rs
index 4c83ff1528..d919ded8d0 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -77,9 +77,10 @@ impl DisplayAs for MemoryExec {
.sort_information
.first()
.map(|output_ordering| {
- let order_strings: Vec<_> =
- output_ordering.iter().map(|e|
e.to_string()).collect();
- format!(", output_ordering={}",
order_strings.join(","))
+ format!(
+ ", output_ordering={}",
+ PhysicalSortExpr::format_list(output_ordering)
+ )
})
.unwrap_or_default();
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 14b54dc061..bcb9c3afee 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -308,7 +308,8 @@ pub struct RepartitionExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
- /// Boolean flag to decide whether to preserve ordering
+ /// Boolean flag to decide whether to preserve ordering. If true, this exec is a
+ /// `SortPreservingRepartitionExec`; if false, a plain `RepartitionExec`.
preserve_order: bool,
}
@@ -370,7 +371,7 @@ impl RepartitionExec {
self.preserve_order
}
- /// Get name of the Executor
+ /// Get name used to display this Exec
pub fn name(&self) -> &str {
if self.preserve_order {
"SortPreservingRepartitionExec"
@@ -394,7 +395,16 @@ impl DisplayAs for RepartitionExec {
self.name(),
self.partitioning,
self.input.output_partitioning().partition_count()
- )
+ )?;
+
+ if let Some(sort_exprs) = self.sort_exprs() {
+ write!(
+ f,
+ ", sort_exprs={}",
+ PhysicalSortExpr::format_list(sort_exprs)
+ )?;
+ }
+ Ok(())
}
}
}
@@ -576,8 +586,8 @@ impl ExecutionPlan for RepartitionExec {
.collect::<Vec<_>>();
// Note that receiver size (`rx.len()`) and `num_input_partitions`
are same.
- // Get existing ordering:
- let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
+ // Get existing ordering to use for merging
+ let sort_exprs = self.sort_exprs().unwrap_or(&[]);
// Merge streams (while preserving ordering) coming from
// input partitions to this partition:
@@ -646,6 +656,15 @@ impl RepartitionExec {
self
}
+ /// Returns the input's output ordering when `preserve_order` is set (used for merging in `execute` and shown as `sort_exprs=` in explain output); `None` otherwise.
+ fn sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.preserve_order {
+ self.input.output_ordering() // may itself be `None` if the input reports no ordering
+ } else {
+ None
+ }
+ }
+
/// Pulls data from the specified input plan, feeding it to the
/// output partitions based on the desired partitioning
///
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index a56f8fec68..ffc4ef9dc3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -763,17 +763,16 @@ impl DisplayAs for SortExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
+ let expr = PhysicalSortExpr::format_list(&self.expr);
match self.fetch {
Some(fetch) => {
write!(
f,
// TODO should this say topk?
- "SortExec: fetch={fetch}, expr=[{}]",
- expr.join(",")
+ "SortExec: fetch={fetch}, expr=[{expr}]",
)
}
- None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
+ None => write!(f, "SortExec: expr=[{expr}]"),
}
}
}
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 5b485e0b68..597b59f776 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -118,8 +118,11 @@ impl DisplayAs for SortPreservingMergeExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
- write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?;
+ write!(
+ f,
+ "SortPreservingMergeExec: [{}]",
+ PhysicalSortExpr::format_list(&self.expr)
+ )?;
if let Some(fetch) = self.fetch {
write!(f, ", fetch={fetch}")?;
};
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 5fb5a04c67..80e496a336 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3245,17 +3245,17 @@ physical_plan
ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d]
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), f [...]
----CoalesceBatchesExec: target_batch_size=4096
-------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2),
input_partitions=2
+------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2),
input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b ASC NULLS LAST,c ASC NULLS
LAST
--------ProjectionExec: expr=[a@0 as a, d@3 as d,
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
SUM(annotated_data_infinit [...]
----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, nullab [...]
------------CoalesceBatchesExec: target_batch_size=4096
---------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2),
input_partitions=2
+--------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2),
input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC
NULLS LAST
----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a)
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW", data_type: Int64, [...]
------------------CoalesceBatchesExec: target_batch_size=4096
---------------------SortPreservingRepartitionExec: partitioning=Hash([a@0,
d@3], 2), input_partitions=2
+--------------------SortPreservingRepartitionExec: partitioning=Hash([a@0,
d@3], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS
LAST,c@2 ASC NULLS LAST
----------------------BoundedWindowAggExec:
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a,
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: I [...]
------------------------CoalesceBatchesExec: target_batch_size=4096
---------------------------SortPreservingRepartitionExec:
partitioning=Hash([a@0, b@1], 2), input_partitions=2
+--------------------------SortPreservingRepartitionExec:
partitioning=Hash([a@0, b@1], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS
LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST
----------------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true