This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a6b76e35 Minor: Add fetch to SortExec display (#5279)
9a6b76e35 is described below

commit 9a6b76e3506151ec8fa12a8adfe559b4dc767208
Author: Dan Harris <[email protected]>
AuthorDate: Tue Feb 14 11:25:43 2023 -0500

    Minor: Add fetch to SortExec display (#5279)
    
    * Add fetch to SortExec display
    
    * fix sqllogictests
---
 .../src/physical_optimizer/dist_enforcement.rs     | 32 +++----
 .../core/src/physical_optimizer/repartition.rs     | 18 ++--
 .../src/physical_optimizer/sort_enforcement.rs     | 98 +++++++++++-----------
 datafusion/core/src/physical_plan/sorts/sort.rs    |  7 +-
 datafusion/core/tests/sql/explain_analyze.rs       |  2 +-
 datafusion/core/tests/sql/joins.rs                 | 24 +++---
 datafusion/core/tests/sql/window.rs                | 62 +++++++-------
 .../core/tests/sqllogictests/test_files/window.slt |  6 +-
 8 files changed, 127 insertions(+), 122 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs 
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 0dbba2c31..ae56b70ba 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -1925,31 +1925,31 @@ mod tests {
                     vec![
                         top_join_plan.as_str(),
                         join_plan.as_str(),
-                        "SortExec: [a@0 ASC]",
+                        "SortExec: expr=[a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"a\", index: 0 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                        "SortExec: [b1@1 ASC]",
+                        "SortExec: expr=[b1@1 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"b1\", index: 1 }], 10), input_partitions=1",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                        "SortExec: [c@2 ASC]",
+                        "SortExec: expr=[c@2 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"c\", index: 2 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                     ],
                 // Should include 4 RepartitionExecs
                 _ => vec![
                         top_join_plan.as_str(),
-                        "SortExec: [a@0 ASC]",
+                        "SortExec: expr=[a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"a\", index: 0 }], 10), input_partitions=10",
                         join_plan.as_str(),
-                        "SortExec: [a@0 ASC]",
+                        "SortExec: expr=[a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"a\", index: 0 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                        "SortExec: [b1@1 ASC]",
+                        "SortExec: expr=[b1@1 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"b1\", index: 1 }], 10), input_partitions=1",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                        "SortExec: [c@2 ASC]",
+                        "SortExec: expr=[c@2 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"c\", index: 2 }], 10), input_partitions=1",
                         "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 ],
@@ -1978,31 +1978,31 @@ mod tests {
                         JoinType::Inner | JoinType::Right => vec![
                             top_join_plan.as_str(),
                             join_plan.as_str(),
-                            "SortExec: [a@0 ASC]",
+                            "SortExec: expr=[a@0 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
-                            "SortExec: [b1@1 ASC]",
+                            "SortExec: expr=[b1@1 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
-                            "SortExec: [c@2 ASC]",
+                            "SortExec: expr=[c@2 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs and 4 SortExecs
                         _ => vec![
                             top_join_plan.as_str(),
-                            "SortExec: [b1@6 ASC]",
+                            "SortExec: expr=[b1@6 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 6 }], 10), input_partitions=10",
                             join_plan.as_str(),
-                            "SortExec: [a@0 ASC]",
+                            "SortExec: expr=[a@0 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
-                            "SortExec: [b1@1 ASC]",
+                            "SortExec: expr=[b1@1 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10), input_partitions=1",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
-                            "SortExec: [c@2 ASC]",
+                            "SortExec: expr=[c@2 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10), input_partitions=1",
                             "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                         ],
@@ -2065,14 +2065,14 @@ mod tests {
         // Only two RepartitionExecs added
         let expected = &[
             "SortMergeJoin: join_type=Inner, on=[(Column { name: \"b3\", 
index: 1 }, Column { name: \"b2\", index: 1 }), (Column { name: \"a3\", index: 
0 }, Column { name: \"a2\", index: 0 })]",
-            "SortExec: [b3@1 ASC,a3@0 ASC]",
+            "SortExec: expr=[b3@1 ASC,a3@0 ASC]",
             "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
             "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
             "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as 
a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 
0 }, Column { name: \"a1\", index: 1 }], 10), input_partitions=1",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-            "SortExec: [b2@1 ASC,a2@0 ASC]",
+            "SortExec: expr=[b2@1 ASC,a2@0 ASC]",
             "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
0 }, Column { name: \"a\", index: 1 }], 10), input_partitions=1",
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 1026defd3..b43b4f208 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -607,7 +607,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // data is sorted so can't repartition here
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
@@ -625,7 +625,7 @@ mod tests {
             "FilterExec: c1@0",
             // data is sorted so can't repartition here even though
             // filter would benefit from parallelism, the answers might be 
wrong
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
@@ -713,7 +713,7 @@ mod tests {
         // need repartiton and resort as the data was not sorted correctly
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
@@ -813,7 +813,7 @@ mod tests {
         // needs to repartition / sort as the data was not sorted correctly
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "ProjectionExec: expr=[c1@0 as c1]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
@@ -846,7 +846,7 @@ mod tests {
 
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
@@ -863,7 +863,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             // Expect repartition on the input to the sort (as it can benefit 
from additional parallelism)
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
@@ -883,7 +883,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             // Expect repartition on the input to the sort (as it can benefit 
from additional parallelism)
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
             "FilterExec: c1@0",
             // repartition is lowest down
@@ -950,7 +950,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // data is sorted so can't repartition here
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             // Doesn't parallelize for SortExec without preserve_partitioning
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
@@ -969,7 +969,7 @@ mod tests {
             "FilterExec: c1@0",
             // data is sorted so can't repartition here even though
             // filter would benefit from parallelism, the answers might be 
wrong
-            "SortExec: [c1@0 ASC]",
+            "SortExec: expr=[c1@0 ASC]",
             // SortExec doesn't benefit from input partitioning
             "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 8dfa4199b..70880b750 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -27,8 +27,8 @@
 //! somehow get the fragment
 //!
 //! ```text
-//! SortExec: [nullable_col@0 ASC]
-//!   SortExec: [non_nullable_col@1 ASC]
+//! SortExec: expr=[nullable_col@0 ASC]
+//!   SortExec: expr=[non_nullable_col@1 ASC]
 //! ```
 //!
 //! in the physical plan. The first sort is unnecessary since its result is 
overwritten
@@ -343,12 +343,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
 }
 
 /// This function turns plans of the form
-///      "SortExec: [a@0 ASC]",
+///      "SortExec: expr=[a@0 ASC]",
 ///      "  CoalescePartitionsExec",
 ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
 /// to
 ///      "SortPreservingMergeExec: [a@0 ASC]",
-///      "  SortExec: [a@0 ASC]",
+///      "  SortExec: expr=[a@0 ASC]",
 ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
 /// by following connections from `CoalescePartitionsExec`s to `SortExec`s.
 /// By performing sorting in parallel, we can increase performance in some 
scenarios.
@@ -1053,12 +1053,12 @@ mod tests {
         let physical_plan = sort_exec(vec![sort_expr("nullable_col", 
&schema)], input);
 
         let expected_input = vec![
-            "SortExec: [nullable_col@0 ASC]",
-            "  SortExec: [non_nullable_col@1 ASC]",
+            "SortExec: expr=[nullable_col@0 ASC]",
+            "  SortExec: expr=[non_nullable_col@1 ASC]",
             "    MemoryExec: partitions=0, partition_sizes=[]",
         ];
         let expected_optimized = vec![
-            "SortExec: [nullable_col@0 ASC]",
+            "SortExec: expr=[nullable_col@0 ASC]",
             "  MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1107,9 +1107,9 @@ mod tests {
         let expected_input = vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: 
CurrentRow }]",
             "  FilterExec: NOT non_nullable_col@1",
-            "    SortExec: [non_nullable_col@1 ASC NULLS LAST]",
+            "    SortExec: expr=[non_nullable_col@1 ASC NULLS LAST]",
             "      WindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
-            "        SortExec: [non_nullable_col@1 DESC]",
+            "        SortExec: expr=[non_nullable_col@1 DESC]",
             "          MemoryExec: partitions=0, partition_sizes=[]",
         ];
 
@@ -1117,7 +1117,7 @@ mod tests {
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL) }]",
             "  FilterExec: NOT non_nullable_col@1",
             "    WindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }]",
-            "      SortExec: [non_nullable_col@1 DESC]",
+            "      SortExec: expr=[non_nullable_col@1 DESC]",
             "        MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1139,7 +1139,7 @@ mod tests {
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  SortExec: [nullable_col@0 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC]",
             "    MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1159,15 +1159,15 @@ mod tests {
         let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  SortExec: [nullable_col@0 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC]",
             "    SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "      SortExec: [nullable_col@0 ASC]",
+            "      SortExec: expr=[nullable_col@0 ASC]",
             "        MemoryExec: partitions=0, partition_sizes=[]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1196,11 +1196,11 @@ mod tests {
         let expected_input = vec![
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
             "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
-            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
-            "            SortExec: [non_nullable_col@1 ASC]",
+            "            SortExec: expr=[non_nullable_col@1 ASC]",
             "              MemoryExec: partitions=0, partition_sizes=[]",
         ];
 
@@ -1237,10 +1237,10 @@ mod tests {
         let expected_input = vec![
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
-            "          SortExec: [non_nullable_col@1 ASC]",
+            "          SortExec: expr=[non_nullable_col@1 ASC]",
             "            MemoryExec: partitions=0, partition_sizes=[]",
         ];
 
@@ -1281,20 +1281,20 @@ mod tests {
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
             "      GlobalLimitExec: skip=0, fetch=100",
             "        LocalLimitExec: fetch=100",
-            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "            ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
 
         // We should keep the bottom `SortExec`.
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
             "      UnionExec",
             "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
             "        GlobalLimitExec: skip=0, fetch=100",
             "          LocalLimitExec: fetch=100",
-            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "            SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "              ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1313,12 +1313,12 @@ mod tests {
         let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: [nullable_col@0 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC]",
             "    MemoryExec: partitions=0, partition_sizes=[]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    MemoryExec: partitions=0, partition_sizes=[]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1343,7 +1343,7 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         // should not add a sort at the output of the union, input plan should 
not be changed
@@ -1374,7 +1374,7 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         // should not add a sort at the output of the union, input plan should 
not be changed
@@ -1408,13 +1408,13 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "  UnionExec",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         // should remove unnecessary sorting from below and move it to top
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    UnionExec",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
@@ -1448,20 +1448,20 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         // should adjust sorting in the first input of the union such that it 
is not unnecessarily fine
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1493,15 +1493,15 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
-            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    UnionExec",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
@@ -1544,17 +1544,17 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 DESC NULLS 
LAST]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC 
NULLS LAST]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -1590,7 +1590,7 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
             "    SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
@@ -1602,11 +1602,11 @@ mod tests {
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
             "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
             "    SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "      SortExec: [nullable_col@0 ASC]",
+            "      SortExec: expr=[nullable_col@0 ASC]",
             "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "          ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
         ];
@@ -1647,9 +1647,9 @@ mod tests {
         let expected_input = vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: 
CurrentRow }]",
             "  UnionExec",
-            "    SortExec: [nullable_col@0 DESC NULLS LAST]",
+            "    SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
-            "    SortExec: [nullable_col@0 DESC NULLS LAST]",
+            "    SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
         ];
         let expected_optimized = vec![
@@ -1683,7 +1683,7 @@ mod tests {
         // we should be able to parallelize Sorting also (given that executors 
in between don't require)
         // single partition.
         let expected_input = vec![
-            "SortExec: [nullable_col@0 ASC]",
+            "SortExec: expr=[nullable_col@0 ASC]",
             "  FilterExec: NOT non_nullable_col@1",
             "    CoalescePartitionsExec",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
@@ -1691,7 +1691,7 @@ mod tests {
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  SortExec: [nullable_col@0 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC]",
             "    FilterExec: NOT non_nullable_col@1",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
@@ -1771,9 +1771,9 @@ mod tests {
         // Sort Parallelize rule should end Coalesce + Sort linkage when Sort 
is Global Sort
         // Also input plan is not valid as it is. We need to add SortExec 
before SortPreservingMergeExec.
         let expected_input = vec![
-            "SortExec: [nullable_col@0 ASC]",
+            "SortExec: expr=[nullable_col@0 ASC]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "    SortExec: [nullable_col@0 ASC]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        CoalescePartitionsExec",
             "          RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
@@ -1781,7 +1781,7 @@ mod tests {
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  SortExec: [nullable_col@0 ASC]",
+            "  SortExec: expr=[nullable_col@0 ASC]",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=0",
             "        MemoryExec: partitions=0, partition_sizes=[]",
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index e3263747f..9bacf9fad 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -793,7 +793,12 @@ impl ExecutionPlan for SortExec {
         match t {
             DisplayFormatType::Default => {
                 let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
-                write!(f, "SortExec: [{}]", expr.join(","))
+                match self.fetch {
+                    Some(fetch) => {
+                        write!(f, "SortExec: fetch={fetch}, expr=[{}]", 
expr.join(","))
+                    }
+                    None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
+                }
             }
         }
     }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 0f8dfb608..99f5bd2d6 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -600,7 +600,7 @@ async fn test_physical_plan_display_indent() {
     let expected = vec![
         "GlobalLimitExec: skip=0, fetch=10",
         "  SortPreservingMergeExec: [the_min@2 DESC]",
-        "    SortExec: [the_min@2 DESC]",
+        "    SortExec: fetch=10, expr=[the_min@2 DESC]",
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 
as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
         "        AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
         "          CoalesceBatchesExec: target_batch_size=4096",
diff --git a/datafusion/core/tests/sql/joins.rs 
b/datafusion/core/tests/sql/joins.rs
index bcb90d451..3c0aa8b3f 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1881,12 +1881,12 @@ async fn sort_merge_join_on_date32() -> Result<()> {
     let expected = vec![
         "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, 
c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]",
         "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 
0 }, Column { name: \"c1\", index: 0 })]",
-        "    SortExec: [c1@0 ASC]",
+        "    SortExec: expr=[c1@0 ASC]",
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", 
index: 0 }], 2), input_partitions=2",
         "          RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
         "            MemoryExec: partitions=1, partition_sizes=[1]",
-        "    SortExec: [c1@0 ASC]",
+        "    SortExec: expr=[c1@0 ASC]",
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", 
index: 0 }], 2), input_partitions=2",
         "          RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
@@ -1927,13 +1927,13 @@ async fn sort_merge_join_on_decimal() -> Result<()> {
         "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, 
c1@4 as c1, c2@5 as c2, c3@6 as c3, c4@7 as c4]",
         "  ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as 
c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]",
         "    SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3 
AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]",
-        "      SortExec: [CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
+        "      SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]",
         "        CoalesceBatchesExec: target_batch_size=4096",
         "          RepartitionExec: partitioning=Hash([Column { name: 
\"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2",
         "            ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, 
c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 
2))]",
         "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
         "                MemoryExec: partitions=1, partition_sizes=[1]",
-        "      SortExec: [c3@2 ASC]",
+        "      SortExec: expr=[c3@2 ASC]",
         "        CoalesceBatchesExec: target_batch_size=4096",
         "          RepartitionExec: partitioning=Hash([Column { name: \"c3\", 
index: 2 }], 2), input_partitions=2",
         "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
@@ -1981,7 +1981,7 @@ async fn left_semi_join() -> Result<()> {
         let expected = if repartition_joins {
             vec![
                 "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                "  SortExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -1997,7 +1997,7 @@ async fn left_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=CollectLeft, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -2061,7 +2061,7 @@ async fn left_semi_join() -> Result<()> {
         let expected = if repartition_joins {
             vec![
                 "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                "  SortExec: [t1_id@0 ASC NULLS LAST]",
+                "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                 "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "      CoalesceBatchesExec: target_batch_size=4096",
                 "        HashJoinExec: mode=Partitioned, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -2076,7 +2076,7 @@ async fn left_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=CollectLeft, join_type=LeftSemi, 
on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 
})]",
@@ -2256,7 +2256,7 @@ async fn right_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                  "  SortExec: [t1_id@0 ASC NULLS LAST]",
+                  "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                   "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                   "      CoalesceBatchesExec: target_batch_size=4096",
                   "        HashJoinExec: mode=Partitioned, 
join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: 
\"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", 
index: 1 }, op: NotEq, right: Column { name: \"t1_name\", index: 0 } }",
@@ -2271,7 +2271,7 @@ async fn right_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=CollectLeft, join_type=RightSemi, 
on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 
})], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 1 }, op: 
NotEq, right: Column { name: \"t1_name\", index: 0 } }",
@@ -2302,7 +2302,7 @@ async fn right_semi_join() -> Result<()> {
         let physical_plan = dataframe.create_physical_plan().await?;
         let expected = if repartition_joins {
             vec![ "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]",
-                  "  SortExec: [t1_id@0 ASC NULLS LAST]",
+                  "  SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                   "    ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                   "      CoalesceBatchesExec: target_batch_size=4096",
                   "        HashJoinExec: mode=Partitioned, 
join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: 
\"t1_id\", index: 0 })], filter=BinaryExpr { left: Column { name: \"t2_name\", 
index: 0 }, op: NotEq, right: Column { name: \"t1_name\", index: 1 } }",
@@ -2317,7 +2317,7 @@ async fn right_semi_join() -> Result<()> {
             ]
         } else {
             vec![
-                "SortExec: [t1_id@0 ASC NULLS LAST]",
+                "SortExec: expr=[t1_id@0 ASC NULLS LAST]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as 
t1_name, t1_int@2 as t1_int]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
                 "      HashJoinExec: mode=CollectLeft, join_type=RightSemi, 
on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 
})], filter=BinaryExpr { left: Column { name: \"t2_name\", index: 0 }, op: 
NotEq, right: Column { name: \"t1_name\", index: 1 } }",
diff --git a/datafusion/core/tests/sql/window.rs 
b/datafusion/core/tests/sql/window.rs
index 3fc5b9569..99b9743f0 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1299,7 +1299,7 @@ async fn test_window_agg_sort() -> Result<()> {
             "ProjectionExec: expr=[c9@1 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@3 as sum1, SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum2]",
             "  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field 
{ name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      SortExec: [c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
+            "      SortExec: expr=[c9@1 ASC NULLS LAST,c8@0 ASC NULLS LAST]",
         ]
     };
 
@@ -1331,7 +1331,7 @@ async fn 
over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
             "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
             "    BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): 
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "      BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): 
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "        SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
         ]
     };
 
@@ -1359,13 +1359,13 @@ async fn 
over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>
     // 3 SortExec are added
     let expected = {
         vec![
-            "SortExec: [c2@0 ASC NULLS LAST]",
+            "SortExec: expr=[c2@0 ASC NULLS LAST]",
             "  ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as 
MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9), 
MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNB [...]
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }]",
             "      BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): 
Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
+            "        SortExec: expr=[c9@1 ASC NULLS LAST,c2@0 ASC NULLS LAST]",
             "          BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): 
Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "            SortExec: expr=[c2@0 ASC NULLS LAST,c9@1 ASC NULLS 
LAST]",
         ]
     };
 
@@ -1401,11 +1401,11 @@ async fn test_window_partition_by_order_by() -> 
Result<()> {
         vec![
             "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as 
SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1))]",
             "  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: 
\"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "    SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "    SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
             "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): 
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) 
}]",
-            "            SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS 
LAST]",
             "              CoalesceBatchesExec: target_batch_size=4096",
             "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2), 
input_partitions=2",
             "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
@@ -1444,7 +1444,7 @@ async fn test_window_agg_sort_reversed_plan() -> 
Result<()> {
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}]",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: [c9@0 DESC]",
+            "        SortExec: expr=[c9@0 DESC]",
         ]
     };
 
@@ -1499,7 +1499,7 @@ async fn test_window_agg_sort_reversed_plan_builtin() -> 
Result<()> {
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: 
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: 
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, 
nullable: true, dict_id [...]
             "      BoundedWindowAggExec: 
wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: 
\"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: 
\"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, 
nullable: true, dict_ [...]
-            "        SortExec: [c9@0 DESC]",
+            "        SortExec: expr=[c9@0 DESC]",
         ]
     };
 
@@ -1549,9 +1549,9 @@ async fn test_window_agg_sort_non_reversed_plan() -> 
Result<()> {
             "ProjectionExec: expr=[c9@0 as c9, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@2 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]",
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: [c9@0 ASC NULLS LAST]",
+            "      SortExec: expr=[c9@0 ASC NULLS LAST]",
             "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          SortExec: [c9@0 DESC]"
+            "          SortExec: expr=[c9@0 DESC]"
         ]
     };
 
@@ -1602,10 +1602,10 @@ async fn 
test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
             "ProjectionExec: expr=[c9@2 as c9, SUM(aggregate_test_100.c9) 
ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS 
LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@5 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING@3 as sum2, ROW_NUMBER() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWE [...]
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "      SortExec: [c9@2 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC 
NULLS LAST]",
+            "      SortExec: expr=[c9@2 ASC NULLS LAST,c1@0 ASC NULLS 
LAST,c2@1 ASC NULLS LAST]",
             "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
             "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "            SortExec: [c9@2 DESC,c1@0 DESC]",
+            "            SortExec: expr=[c9@2 DESC,c1@0 DESC]",
         ]
     };
 
@@ -1690,15 +1690,15 @@ async fn test_window_agg_complex_plan() -> Result<()> {
             "  GlobalLimitExec: skip=0, fetch=5",
             "    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_bound: Preced [...]
             "      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            "        SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
             "          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field 
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: [c3@2 ASC NULLS LAST,c1@0 ASC]",
+            "            SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]",
             "              WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { 
name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
             "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field 
{ name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, 
SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Range, start_b [...]
             "                  WindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start [...]
             "                    WindowAggExec: wdw=[SUM(null_cases.c1): 
Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(10)), end_bound: 
Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: 
\"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, sta 
[...]
             "                      BoundedWindowAggExec: 
wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), 
end_bound: CurrentRow }]",
-            "                        SortExec: [c3@2 DESC,c1@0 ASC NULLS 
LAST]",
+            "                        SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS 
LAST]",
         ]
     };
 
@@ -1738,7 +1738,7 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()>
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]",
         ]
     };
 
@@ -1792,7 +1792,7 @@ async fn test_window_agg_sort_partitionby_reversed_plan() 
-> Result<()> {
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) 
}]",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
-            "        SortExec: [c1@0 ASC NULLS LAST,c9@1 DESC]",
+            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]",
         ]
     };
 
@@ -1845,7 +1845,7 @@ async fn 
test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
             "  GlobalLimitExec: skip=0, fetch=5",
             "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, 
aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ord [...]
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER 
BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, 
aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
\"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + 
aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBO [...]
-            "        SortExec: [CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 DESC,c2@0 
ASC NULLS LAST]",
+            "        SortExec: expr=[CAST(c3@1 AS Int16) + c4@2 DESC,c9@3 
DESC,c2@0 ASC NULLS LAST]",
         ]
     };
 
@@ -1956,7 +1956,7 @@ async fn 
test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
             "  GlobalLimitExec: skip=0, fetch=5",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
             "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
+            "        SortExec: expr=[c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]",
         ]
     };
 
@@ -2005,7 +2005,7 @@ async fn test_window_agg_global_sort() -> Result<()> {
             "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
             "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
             "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "      SortExec: [c1@0 ASC NULLS LAST]",
+            "      SortExec: expr=[c1@0 ASC NULLS LAST]",
             "        CoalesceBatchesExec: target_batch_size=8192",
             "          RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
             "            RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
@@ -2040,11 +2040,11 @@ async fn 
test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "SortExec: [c1@0 ASC NULLS LAST]",
+            "SortExec: expr=[c1@0 ASC NULLS LAST]",
             "  CoalescePartitionsExec",
             "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY 
[aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@1 as rn1]",
             "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
\"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "        SortExec: [c1@0 ASC NULLS LAST]",
+            "        SortExec: expr=[c1@0 ASC NULLS LAST]",
             "          CoalesceBatchesExec: target_batch_size=8192",
             "            RepartitionExec: partitioning=Hash([Column { name: 
\"c1\", index: 0 }], 2), input_partitions=2",
             "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
@@ -2082,13 +2082,13 @@ async fn 
test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()>
     // Only 1 SortExec was added
     let expected = {
         vec![
-            "SortExec: [c1@0 ASC NULLS LAST]",
+            "SortExec: expr=[c1@0 ASC NULLS LAST]",
             "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, 
SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
             "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) 
}]",
             "      SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
-            "        SortExec: [c9@1 ASC NULLS LAST]",
+            "        SortExec: expr=[c9@1 ASC NULLS LAST]",
             "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): 
Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) 
}]",
-            "            SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 ASC NULLS 
LAST]",
             "              CoalesceBatchesExec: target_batch_size=8192",
             "                RepartitionExec: partitioning=Hash([Column { 
name: \"c1\", index: 0 }], 2), input_partitions=2",
             "                  RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1",
@@ -2126,7 +2126,7 @@ async fn test_window_agg_with_global_limit() -> 
Result<()> {
             "  AggregateExec: mode=Final, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
             "    AggregateExec: mode=Partial, gby=[], 
aggr=[ARRAYAGG(aggregate_test_100.c13)]",
             "      GlobalLimitExec: skip=0, fetch=1",
-            "        SortExec: [c13@0 ASC NULLS LAST]",
+            "        SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
             "          ProjectionExec: expr=[c13@0 as c13]",
         ]
     };
@@ -2320,7 +2320,7 @@ mod tests {
             vec![
                 "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 
as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 
as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, 
sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, 
minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, 
cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
                 "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: [inc_col@24 DESC]",
+                "    SortExec: fetch=5, expr=[inc_col@24 DESC]",
                 "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER 
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@14 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts 
ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@15 as sum2, 
SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as sum3, MIN(annotated_data.inc_col) 
ORDER BY [annotated_data.ts ASC NULLS L [...]
                 "        BoundedWindowAggExec: 
wdw=[SUM(annotated_data.desc_col): Ok(Field { name: 
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, 
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units [...]
                 "          BoundedWindowAggExec: 
wdw=[SUM(annotated_data.inc_col): Ok(Field { name: 
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, 
SUM(annotated_data.desc_col): Ok(Field { name: 
\"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }),  [...]
@@ -2395,7 +2395,7 @@ mod tests {
             vec![
                 "ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as 
lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, 
rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, 
dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as 
lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, 
lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, 
leadr2@23 as leadr2]",
                 "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: [ts@24 DESC]",
+                "    SortExec: fetch=5, expr=[ts@24 DESC]",
                 "      ProjectionExec: 
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, 
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] 
ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] 
RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
                 "        BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) 
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_order [...]
                 "          BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) 
}, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ord [...]
@@ -2454,7 +2454,7 @@ mod tests {
             vec![
                 "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 
as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, 
count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]",
                 "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: [inc_col@10 ASC NULLS LAST]",
+                "    SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]",
                 "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER 
BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 
FOLLOWING@7 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts 
DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, 
MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, 
MIN(annotated_data.inc_col) ORDER BY [annotate [...]
                 "        BoundedWindowAggExec: 
wdw=[SUM(annotated_data.inc_col): Ok(Field { name: 
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, 
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), fr [...]
                 "          BoundedWindowAggExec: 
wdw=[SUM(annotated_data.inc_col): Ok(Field { name: 
\"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, 
MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", 
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }),  [...]
@@ -2508,7 +2508,7 @@ mod tests {
             vec![
                 "ProjectionExec: expr=[first_value1@0 as first_value1, 
first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as 
last_value2, nth_value1@4 as nth_value1]",
                 "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: [inc_col@5 ASC NULLS LAST]",
+                "    SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]",
                 "      ProjectionExec: 
expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS 
LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, 
FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS 
FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, 
LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] 
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, LAS [...]
                 "        BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_orde [...]
                 "          BoundedWindowAggExec: 
wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(3)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: 
\"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_or [...]
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index d10946e5d..f155dc86a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -262,7 +262,7 @@ Sort: d.b ASC NULLS LAST
                     EmptyRelation
 physical_plan
 SortPreservingMergeExec: [b@0 ASC NULLS LAST]
-  SortExec: [b@0 ASC NULLS LAST]
+  SortExec: expr=[b@0 ASC NULLS LAST]
     ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a]
       AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a)]
         CoalesceBatchesExec: target_batch_size=8192
@@ -348,13 +348,13 @@ Sort: d.b ASC NULLS LAST
                       EmptyRelation
 physical_plan
 SortPreservingMergeExec: [b@0 ASC NULLS LAST]
-  SortExec: [b@0 ASC NULLS LAST]
+  SortExec: expr=[b@0 ASC NULLS LAST]
     ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as 
MAX(d.seq)]
       AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[MAX(d.a), 
MAX(d.seq)]
         AggregateExec: mode=Partial, gby=[b@2 as b], aggr=[MAX(d.a), 
MAX(d.seq)]
           ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 
as a, b@1 as b]
             BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: 
"ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]
-              SortExec: [b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
+              SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST]
                 CoalesceBatchesExec: target_batch_size=8192
                   RepartitionExec: partitioning=Hash([Column { name: "b", 
index: 1 }], 4), input_partitions=4
                     UnionExec

Reply via email to