This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9a6b76e35 Minor: Add fetch to SortExec display (#5279)
9a6b76e35 is described below
commit 9a6b76e3506151ec8fa12a8adfe559b4dc767208
Author: Dan Harris <[email protected]>
AuthorDate: Tue Feb 14 11:25:43 2023 -0500
Minor: Add fetch to SortExec display (#5279)
* Add fetch to SortExec display
* fix sqllogictests
---
.../src/physical_optimizer/dist_enforcement.rs | 32 +++----
.../core/src/physical_optimizer/repartition.rs | 18 ++--
.../src/physical_optimizer/sort_enforcement.rs | 98 +++++++++++-----------
datafusion/core/src/physical_plan/sorts/sort.rs | 7 +-
datafusion/core/tests/sql/explain_analyze.rs | 2 +-
datafusion/core/tests/sql/joins.rs | 24 +++---
datafusion/core/tests/sql/window.rs | 62 +++++++-------
.../core/tests/sqllogictests/test_files/window.slt | 6 +-
8 files changed, 127 insertions(+), 122 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 0dbba2c31..ae56b70ba 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1925,31 +1925,31 @@ mod tests {
vec![
top_join_plan.as_str(),
join_plan.as_str(),
- "SortExec: [a@0 ASC]",
+ "SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortExec: [b1@1 ASC]",
+ "SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortExec: [c@2 ASC]",
+ "SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
top_join_plan.as_str(),
- "SortExec: [a@0 ASC]",
+ "SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"a\", index: 0 }], 10), input_partitions=10",
join_plan.as_str(),
- "SortExec: [a@0 ASC]",
+ "SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortExec: [b1@1 ASC]",
+ "SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortExec: [c@2 ASC]",
+ "SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
@@ -1978,31 +1978,31 @@ mod tests {
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
- "SortExec: [a@0 ASC]",
+ "SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
- "SortExec: [b1@1 ASC]",
+ "SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
- "SortExec: [c@2 ASC]",
+ "SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs and 4 SortExecs
_ => vec![
top_join_plan.as_str(),
- "SortExec: [b1@6 ASC]",
+ "SortExec: expr=[b1@6 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 6 }], 10), input_partitions=10",
join_plan.as_str(),
- "SortExec: [a@0 ASC]",
+ "SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
- "SortExec: [b1@1 ASC]",
+ "SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
- "SortExec: [c@2 ASC]",
+ "SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10), input_partitions=1",
"ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
@@ -2065,14 +2065,14 @@ mod tests {
// Only two RepartitionExecs added
let expected = &[
"SortMergeJoin: join_type=Inner, on=[(Column { name: \"b3\",
index: 1 }, Column { name: \"b2\", index: 1 }), (Column { name: \"a3\", index:
0 }, Column { name: \"a2\", index: 0 })]",
- "SortExec: [b3@1 ASC,a3@0 ASC]",
+ "SortExec: expr=[b3@1 ASC,a3@0 ASC]",
"ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
"ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
"AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as
a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index:
0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
- "SortExec: [b2@1 ASC,a2@0 ASC]",
+ "SortExec: expr=[b2@1 ASC,a2@0 ASC]",
"ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 1026defd3..b43b4f208 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -607,7 +607,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
@@ -625,7 +625,7 @@ mod tests {
"FilterExec: c1@0",
// data is sorted so can't repartition here even though
// filter would benefit from parallelism, the answers might be
wrong
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
@@ -713,7 +713,7 @@ mod tests {
// need repartiton and resort as the data was not sorted correctly
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
@@ -813,7 +813,7 @@ mod tests {
// needs to repartition / sort as the data was not sorted correctly
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
@@ -846,7 +846,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
@@ -863,7 +863,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit
from additional parallelism)
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
@@ -883,7 +883,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit
from additional parallelism)
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"FilterExec: c1@0",
// repartition is lowest down
@@ -950,7 +950,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
// Doesn't parallelize for SortExec without preserve_partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
@@ -969,7 +969,7 @@ mod tests {
"FilterExec: c1@0",
// data is sorted so can't repartition here even though
// filter would benefit from parallelism, the answers might be
wrong
- "SortExec: [c1@0 ASC]",
+ "SortExec: expr=[c1@0 ASC]",
// SortExec doesn't benefit from input partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 8dfa4199b..70880b750 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -27,8 +27,8 @@
//! somehow get the fragment
//!
//! ```text
-//! SortExec: [nullable_col@0 ASC]
-//! SortExec: [non_nullable_col@1 ASC]
+//! SortExec: expr=[nullable_col@0 ASC]
+//! SortExec: expr=[non_nullable_col@1 ASC]
//! ```
//!
//! in the physical plan. The first sort is unnecessary since its result is
overwritten
@@ -343,12 +343,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
/// This function turns plans of the form
-/// "SortExec: [a@0 ASC]",
+/// "SortExec: expr=[a@0 ASC]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
/// to
/// "SortPreservingMergeExec: [a@0 ASC]",
-/// " SortExec: [a@0 ASC]",
+/// " SortExec: expr=[a@0 ASC]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
/// by following connections from `CoalescePartitionsExec`s to `SortExec`s.
/// By performing sorting in parallel, we can increase performance in some
scenarios.
@@ -1053,12 +1053,12 @@ mod tests {
let physical_plan = sort_exec(vec![sort_expr("nullable_col",
&schema)], input);
let expected_input = vec![
- "SortExec: [nullable_col@0 ASC]",
- " SortExec: [non_nullable_col@1 ASC]",
+ "SortExec: expr=[nullable_col@0 ASC]",
+ " SortExec: expr=[non_nullable_col@1 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
let expected_optimized = vec![
- "SortExec: [nullable_col@0 ASC]",
+ "SortExec: expr=[nullable_col@0 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1107,9 +1107,9 @@ mod tests {
let expected_input = vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound:
CurrentRow }]",
" FilterExec: NOT non_nullable_col@1",
- " SortExec: [non_nullable_col@1 ASC NULLS LAST]",
+ " SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]",
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }]",
- " SortExec: [non_nullable_col@1 DESC]",
+ " SortExec: expr=[non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
@@ -1117,7 +1117,7 @@ mod tests {
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(NULL) }]",
" FilterExec: NOT non_nullable_col@1",
" WindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }]",
- " SortExec: [non_nullable_col@1 DESC]",
+ " SortExec: expr=[non_nullable_col@1 DESC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1139,7 +1139,7 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1159,15 +1159,15 @@ mod tests {
let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1196,11 +1196,11 @@ mod tests {
let expected_input = vec![
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0
ASC,non_nullable_col@1 ASC]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
ASC]",
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
- " SortExec: [non_nullable_col@1 ASC]",
+ " SortExec: expr=[non_nullable_col@1 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
@@ -1237,10 +1237,10 @@ mod tests {
let expected_input = vec![
"AggregateExec: mode=Final, gby=[], aggr=[]",
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
- " SortExec: [non_nullable_col@1 ASC]",
+ " SortExec: expr=[non_nullable_col@1 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
@@ -1281,20 +1281,20 @@ mod tests {
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
// We should keep the bottom `SortExec`.
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
" GlobalLimitExec: skip=0, fetch=100",
" LocalLimitExec: fetch=100",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
ASC]",
" ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1313,12 +1313,12 @@ mod tests {
let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1343,7 +1343,7 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
// should not add a sort at the output of the union, input plan should
not be changed
@@ -1374,7 +1374,7 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC],
projection=[nullable_col, non_nullable_col]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
// should not add a sort at the output of the union, input plan should
not be changed
@@ -1408,13 +1408,13 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
// should remove unnecessary sorting from below and move it to top
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
@@ -1448,20 +1448,20 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
// should adjust sorting in the first input of the union such that it
is not unnecessarily fine
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1493,15 +1493,15 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
@@ -1544,17 +1544,17 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
- " SortExec: [nullable_col@0 ASC,non_nullable_col@1 DESC NULLS
LAST]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC
NULLS LAST]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1590,7 +1590,7 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
" SortPreservingMergeExec: [nullable_col@0
ASC,non_nullable_col@1 ASC]",
@@ -1602,11 +1602,11 @@ mod tests {
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
@@ -1647,9 +1647,9 @@ mod tests {
let expected_input = vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound:
CurrentRow }]",
" UnionExec",
- " SortExec: [nullable_col@0 DESC NULLS LAST]",
+ " SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC],
projection=[nullable_col, non_nullable_col]",
- " SortExec: [nullable_col@0 DESC NULLS LAST]",
+ " SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
];
let expected_optimized = vec![
@@ -1683,7 +1683,7 @@ mod tests {
// we should be able to parallelize Sorting also (given that executors
in between don't require)
// single partition.
let expected_input = vec![
- "SortExec: [nullable_col@0 ASC]",
+ "SortExec: expr=[nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
@@ -1691,7 +1691,7 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
@@ -1771,9 +1771,9 @@ mod tests {
// Sort Parallelize rule should end Coalesce + Sort linkage when Sort
is Global Sort
// Also input plan is not valid as it is. We need to add SortExec
before SortPreservingMergeExec.
let expected_input = vec![
- "SortExec: [nullable_col@0 ASC]",
+ "SortExec: expr=[nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
@@ -1781,7 +1781,7 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: expr=[nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index e3263747f..9bacf9fad 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -793,7 +793,12 @@ impl ExecutionPlan for SortExec {
match t {
DisplayFormatType::Default => {
let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
- write!(f, "SortExec: [{}]", expr.join(","))
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "SortExec: fetch={fetch}, expr=[{}]",
expr.join(","))
+ }
+ None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
+ }
}
}
}
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index 0f8dfb608..99f5bd2d6 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -600,7 +600,7 @@ async fn test_physical_plan_display_indent() {
let expected = vec![
"GlobalLimitExec: skip=0, fetch=10",
" SortPreservingMergeExec: [the_min@2 DESC]",
- " SortExec: [the_min@2 DESC]",
+ " SortExec: fetch=10, expr=[the_min@2 DESC]",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
diff --git a/datafusion/core/tests/sql/joins.rs
b/datafusion/core/tests/sql/joins.rs
index bcb90d451..3c0aa8b3f 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1881,12 +1881,12 @@ async fn sort_merge_join_on_date32() -> Result<()> {
let expected = vec![
"ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4,
c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]",
" SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index:
0 }, Column { name: \"c1\", index: 0 })]",
- " SortExec: [c1@0 ASC]",
+ " SortExec: expr=[c1@0 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 0 }], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
- " SortExec: [c1@0 ASC]",
+ " SortExec: expr=[c1@0 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 0 }], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
@@ -1927,13 +1927,13 @@ async fn sort_merge_join_on_decimal() -> Result<()> {
"ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4,
c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as
c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]",
" SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3
AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]",
- " SortExec: [CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
+ " SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name:
\"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3,
c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10,
2))]",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
" MemoryExec: partitions=1, partition_sizes=[1]",
- " SortExec: [c3@2 ASC]",
+ " SortExec: expr=[c3@2 ASC]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c3\",
index: 2 }], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
@@ -1981,7 +1981,7 @@ async fn left_semi_join() -> Result<()> {
let expected = if repartition_joins {
vec![
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: [t1_id@0 ASC NULLS LAST]",
+ " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=LeftSemi,
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0
})]",
@@ -1997,7 +1997,7 @@ async fn left_semi_join() -> Result<()> {
]
} else {
vec![
- "SortExec: [t1_id@0 ASC NULLS LAST]",
+ "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi,
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0
})]",
@@ -2061,7 +2061,7 @@ async fn left_semi_join() -> Result<()> {
let expected = if repartition_joins {
vec![
"SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: [t1_id@0 ASC NULLS LAST]",
+ " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned, join_type=LeftSemi,
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0
})]",
@@ -2076,7 +2076,7 @@ async fn left_semi_join() -> Result<()> {
]
} else {
vec![
- "SortExec: [t1_id@0 ASC NULLS LAST]",
+ "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=LeftSemi,
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0
})]",
@@ -2256,7 +2256,7 @@ async fn right_semi_join() -> Result<()> {
let physical_plan = dataframe.create_physical_plan().await?;
let expected = if repartition_joins {
vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: [t1_id@0 ASC NULLS LAST]",
+ " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned,
join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name:
\"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\",
index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
@@ -2271,7 +2271,7 @@ async fn right_semi_join() -> Result<()> {
]
} else {
vec![
- "SortExec: [t1_id@0 ASC NULLS LAST]",
+ "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi,
on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0
})], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op:
NotEq, right: Column { name: \"t1_name\", index: 0 } }",
@@ -2302,7 +2302,7 @@ async fn right_semi_join() -> Result<()> {
let physical_plan = dataframe.create_physical_plan().await?;
let expected = if repartition_joins {
vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
- " SortExec: [t1_id@0 ASC NULLS LAST]",
+ " SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=Partitioned,
join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name:
\"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\",
index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
@@ -2317,7 +2317,7 @@ async fn right_semi_join() -> Result<()> {
]
} else {
vec![
- "SortExec: [t1_id@0 ASC NULLS LAST]",
+ "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name, t1_int@2 as t1_int]",
" CoalesceBatchesExec: target_batch_size=4096",
" HashJoinExec: mode=CollectLeft, join_type=RightSemi,
on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0
})], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op:
NotEq, right: Column { name: \"t1_name\", index: 1 } }",
diff --git a/datafusion/core/tests/sql/window.rs
b/datafusion/core/tests/sql/window.rs
index 3fc5b9569..99b9743f0 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1299,7 +1299,7 @@ async fn test_window_agg_sort() -> Result<()> {
"ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum2]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
+ " SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
]
};
@@ -1331,7 +1331,7 @@ async fn
over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]",
" BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9):
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9):
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+ " SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
]
};
@@ -1359,13 +1359,13 @@ async fn
over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
// 3 SortExec are added
let expected = {
vec![
- "SortExec: [c2@0 ASC NULLS LAST]",
+ "SortExec: expr=[c2@0 ASC NULLS LAST]",
" ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9),
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNB [...]
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(NULL)) }]",
" BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9):
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
+ " SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9):
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+ " SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS
LAST]",
]
};
@@ -1401,11 +1401,11 @@ async fn test_window_partition_by_order_by() ->
Result<()> {
vec![
"ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1))]",
" BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name:
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
- " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2), input_partitions=2",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4):
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}]",
- " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS
LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2),
input_partitions=2",
" RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1",
@@ -1444,7 +1444,7 @@ async fn test_window_agg_sort_reversed_plan() ->
Result<()> {
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: [c9@0 DESC]",
+ " SortExec: expr=[c9@0 DESC]",
]
};
@@ -1499,7 +1499,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() ->
Result<()> {
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name:
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name:
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32,
nullable: true, dict_id [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name:
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name:
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32,
nullable: true, dict_ [...]
- " SortExec: [c9@0 DESC]",
+ " SortExec: expr=[c9@0 DESC]",
]
};
@@ -1549,9 +1549,9 @@ async fn test_window_agg_sort_non_reversed_plan() ->
Result<()> {
"ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@0 ASC NULLS LAST]",
+ " SortExec: expr=[c9@0 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
- " SortExec: [c9@0 DESC]"
+ " SortExec: expr=[c9@0 DESC]"
]
};
@@ -1602,10 +1602,10 @@ async fn
test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
"ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9)
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS
LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5
FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9
DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1
PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: [c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC
NULLS LAST]",
+ " SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS
LAST,c2@1 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: [c9@2 DESC,c1@0 DESC]",
+ " SortExec: expr=[c9@2 DESC,c1@0 DESC]",
]
};
@@ -1690,15 +1690,15 @@ async fn test_window_agg_complex_plan() -> Result<()> {
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_bound: Preced [...]
" BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+ " SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@2 ASC NULLS LAST,c1@0 ASC]",
+ " SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field {
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
" WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) },
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Range, start_b [...]
" WindowAggExec: wdw=[SUM(null_cases.c1):
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(10)), end_bound:
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start [...]
" WindowAggExec: wdw=[SUM(null_cases.c1):
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(10)), end_bound:
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name:
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta
[...]
" BoundedWindowAggExec:
wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)),
end_bound: CurrentRow }]",
- " SortExec: [c3@2 DESC,c1@0 ASC NULLS
LAST]",
+ " SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS
LAST]",
]
};
@@ -1738,7 +1738,7 @@ async fn
test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]",
]
};
@@ -1792,7 +1792,7 @@ async fn test_window_agg_sort_partitionby_reversed_plan()
-> Result<()> {
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1))
}]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]",
]
};
@@ -1845,7 +1845,7 @@ async fn
test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
" GlobalLimitExec: skip=0, fetch=5",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST,
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ord [...]
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST,
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
\"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 +
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST,
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBO [...]
- " SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0
ASC NULLS LAST]",
+ " SortExec: expr=[CAST(c3@1 AS Int16) + c4@2 DESC,c9@3
DESC,c2@0 ASC NULLS LAST]",
]
};
@@ -1956,7 +1956,7 @@ async fn
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
" GlobalLimitExec: skip=0, fetch=5",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
- " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
+ " SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
]
};
@@ -2005,7 +2005,7 @@ async fn test_window_agg_global_sort() -> Result<()> {
"SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
" ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@1 as rn1]",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
- " SortExec: [c1@0 ASC NULLS LAST]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
@@ -2040,11 +2040,11 @@ async fn
test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
// Only 1 SortExec was added
let expected = {
vec![
- "SortExec: [c1@0 ASC NULLS LAST]",
+ "SortExec: expr=[c1@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@1 as rn1]",
" BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
- " SortExec: [c1@0 ASC NULLS LAST]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
@@ -2082,13 +2082,13 @@ async fn
test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()>
// Only 1 SortExec was added
let expected = {
vec![
- "SortExec: [c1@0 ASC NULLS LAST]",
+ "SortExec: expr=[c1@0 ASC NULLS LAST]",
" ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1,
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5))
}]",
" SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
- " SortExec: [c9@1 ASC NULLS LAST]",
+ " SortExec: expr=[c9@1 ASC NULLS LAST]",
" BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9):
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3))
}]",
- " SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+ " SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS
LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }], 2), input_partitions=2",
" RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=1",
@@ -2126,7 +2126,7 @@ async fn test_window_agg_with_global_limit() ->
Result<()> {
" AggregateExec: mode=Final, gby=[],
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
" AggregateExec: mode=Partial, gby=[],
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
" GlobalLimitExec: skip=0, fetch=1",
- " SortExec: [c13@0 ASC NULLS LAST]",
+ " SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
" ProjectionExec: expr=[c13@0 as c13]",
]
};
@@ -2320,7 +2320,7 @@ mod tests {
vec![
"ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2
as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7
as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1,
sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2,
minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3,
cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
" GlobalLimitExec: skip=0, fetch=5",
- " SortExec: [inc_col@24 DESC]",
+ " SortExec: fetch=5, expr=[inc_col@24 DESC]",
" ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING@14 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts
ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@15 as sum2,
SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as sum3, MIN(annotated_data.inc_col)
ORDER BY [annotated_data.ts ASC NULLS L [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) },
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) },
SUM(annotated_data.desc_col): Ok(Field { name:
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), [...]
@@ -2395,7 +2395,7 @@ mod tests {
vec![
"ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as
lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2,
rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1,
dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as
lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1,
lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1,
leadr2@23 as leadr2]",
" GlobalLimitExec: skip=0, fetch=5",
- " SortExec: [ts@24 DESC]",
+ " SortExec: fetch=5, expr=[ts@24 DESC]",
" ProjectionExec:
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1,
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2,
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1,
LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1))
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_order [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10))
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ord [...]
@@ -2454,7 +2454,7 @@ mod tests {
vec![
"ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2
as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1,
count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]",
" GlobalLimitExec: skip=0, fetch=5",
- " SortExec: [inc_col@10 ASC NULLS LAST]",
+ " SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]",
" ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5
FOLLOWING@7 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts
DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2,
MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1,
MIN(annotated_data.inc_col) ORDER BY [annotate [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) },
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), fr [...]
" BoundedWindowAggExec:
wdw=[SUM(annotated_data.inc_col): Ok(Field { name:
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) },
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), [...]
@@ -2508,7 +2508,7 @@ mod tests {
vec![
"ProjectionExec: expr=[first_value1@0 as first_value1,
first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as
last_value2, nth_value1@4 as nth_value1]",
" GlobalLimitExec: skip=0, fetch=5",
- " SortExec: [inc_col@5 ASC NULLS LAST]",
+ " SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]",
" ProjectionExec:
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1,
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS
FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2,
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST]
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, LAS [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_orde [...]
" BoundedWindowAggExec:
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound:
Following(UInt64(3)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name:
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true,
dict_id: 0, dict_is_or [...]
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index d10946e5d..f155dc86a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -262,7 +262,7 @@ Sort: d.b ASC NULLS LAST
EmptyRelation
physical_plan
SortPreservingMergeExec: [b@0 ASC NULLS LAST]
- SortExec: [b@0 ASC NULLS LAST]
+ SortExec: expr=[b@0 ASC NULLS LAST]
ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a]
AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a)]
CoalesceBatchesExec: target_batch_size=8192
@@ -348,13 +348,13 @@ Sort: d.b ASC NULLS LAST
EmptyRelation
physical_plan
SortPreservingMergeExec: [b@0 ASC NULLS LAST]
- SortExec: [b@0 ASC NULLS LAST]
+ SortExec: expr=[b@0 ASC NULLS LAST]
ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as
MAX(d.seq)]
AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a),
MAX(d.seq)]
AggregateExec: mode=Partial, gby=[b@2 as b], aggr=[MAX(d.a),
MAX(d.seq)]
ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0
as a, b@1 as b]
BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name:
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
- SortExec: [b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
+ SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([Column { name: "b",
index: 1 }], 4), input_partitions=4
UnionExec