This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e1bef86240 Minor: include `sort` expressions in 
`SortPreservingRepartitionExec` explain plan (#7796)
e1bef86240 is described below

commit e1bef86240a99bd9a1032a63833b255922065fd3
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Oct 13 16:42:20 2023 -0400

    Minor: include `sort` expressions in `SortPreservingRepartitionExec` 
explain plan (#7796)
    
    * Minor: include sort expressions in SortPreservingRepartitionExec plan
    
    * update plan in new test
---
 .../src/physical_optimizer/enforce_distribution.rs | 41 ++++++++++++----------
 .../core/src/physical_optimizer/enforce_sorting.rs | 27 ++++++++------
 .../replace_with_order_preserving_variants.rs      | 41 +++++++++++++---------
 datafusion/physical-expr/src/sort_expr.rs          | 21 +++++++++++
 datafusion/physical-plan/src/memory.rs             |  7 ++--
 datafusion/physical-plan/src/repartition/mod.rs    | 29 ++++++++++++---
 datafusion/physical-plan/src/sorts/sort.rs         |  7 ++--
 .../src/sorts/sort_preserving_merge.rs             |  7 ++--
 datafusion/sqllogictest/test_files/window.slt      |  8 ++---
 9 files changed, 124 insertions(+), 64 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a1f509d287..9be566f10a 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1738,8 +1738,11 @@ mod tests {
             _t: DisplayFormatType,
             f: &mut std::fmt::Formatter,
         ) -> std::fmt::Result {
-            let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
-            write!(f, "SortRequiredExec: [{}]", expr.join(","))
+            write!(
+                f,
+                "SortRequiredExec: [{}]",
+                PhysicalSortExpr::format_list(&self.expr)
+            )
         }
     }
 
@@ -3056,16 +3059,16 @@ mod tests {
                     vec![
                         top_join_plan.as_str(),
                         join_plan.as_str(),
-                        "SortPreservingRepartitionExec: 
partitioning=Hash([a@0], 10), input_partitions=10",
+                        "SortPreservingRepartitionExec: 
partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
                         "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                         "SortExec: expr=[a@0 ASC]",
                         "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                        "SortPreservingRepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10",
+                        "SortPreservingRepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
                         "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                         "SortExec: expr=[b1@1 ASC]",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
                         "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                        "SortPreservingRepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=10",
+                        "SortPreservingRepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
                         "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                         "SortExec: expr=[c@2 ASC]",
                         "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
@@ -3082,21 +3085,21 @@ mod tests {
                 _ => vec![
                     top_join_plan.as_str(),
                     // Below 4 operators are differences introduced, when join 
mode is changed
-                    "SortPreservingRepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
+                    "SortPreservingRepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10, sort_exprs=a@0 ASC",
                     "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                     "SortExec: expr=[a@0 ASC]",
                     "CoalescePartitionsExec",
                     join_plan.as_str(),
-                    "SortPreservingRepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
+                    "SortPreservingRepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10, sort_exprs=a@0 ASC",
                     "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                     "SortExec: expr=[a@0 ASC]",
                     "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e]",
-                    "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 
10), input_partitions=10",
+                    "SortPreservingRepartitionExec: partitioning=Hash([b1@1], 
10), input_partitions=10, sort_exprs=b1@1 ASC",
                     "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                     "SortExec: expr=[b1@1 ASC]",
                     "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, 
d@3 as d1, e@4 as e1]",
                     "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e]",
-                    "SortPreservingRepartitionExec: partitioning=Hash([c@2], 
10), input_partitions=10",
+                    "SortPreservingRepartitionExec: partitioning=Hash([c@2], 
10), input_partitions=10, sort_exprs=c@2 ASC",
                     "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
                     "SortExec: expr=[c@2 ASC]",
                     "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e]",
@@ -3170,16 +3173,16 @@ mod tests {
                         JoinType::Inner | JoinType::Right => vec![
                             top_join_plan.as_str(),
                             join_plan.as_str(),
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([a@0], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[a@0 ASC]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[b1@1 ASC]",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[c@2 ASC]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
@@ -3187,21 +3190,21 @@ mod tests {
                         // Should include 8 RepartitionExecs (4 of them 
preserves order) and 4 SortExecs
                         JoinType::Left | JoinType::Full => vec![
                             top_join_plan.as_str(),
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([b1@6], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([b1@6], 10), input_partitions=10, sort_exprs=b1@6 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[b1@6 ASC]",
                             "CoalescePartitionsExec",
                             join_plan.as_str(),
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([a@0], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[a@0 ASC]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[b1@1 ASC]",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
-                            "SortPreservingRepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=10",
+                            "SortPreservingRepartitionExec: 
partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
                             "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
                             "SortExec: expr=[c@2 ASC]",
                             "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
@@ -3292,7 +3295,7 @@ mod tests {
 
         let expected_first_sort_enforcement = &[
             "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
-            "SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 
10), input_partitions=10",
+            "SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 
10), input_partitions=10, sort_exprs=b3@1 ASC,a3@0 ASC",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "SortExec: expr=[b3@1 ASC,a3@0 ASC]",
             "CoalescePartitionsExec",
@@ -3303,7 +3306,7 @@ mod tests {
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]",
-            "SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 
10), input_partitions=10",
+            "SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 
10), input_partitions=10, sort_exprs=b2@1 ASC,a2@0 ASC",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "SortExec: expr=[b2@1 ASC,a2@0 ASC]",
             "CoalescePartitionsExec",
@@ -4382,7 +4385,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c@2 ASC]",
             "FilterExec: c@2 = 0",
-            "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
+            "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, sort_exprs=c@2 ASC",
             "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC]",
         ];
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index a381bbb501..92db3bbd05 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -2156,15 +2156,19 @@ mod tests {
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
         let physical_plan = sort_exec(vec![sort_expr("a", &schema)], 
coalesce_partitions);
 
-        let expected_input = ["SortExec: expr=[a@0 ASC]",
+        let expected_input = [
+            "SortExec: expr=[a@0 ASC]",
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
-        let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+        ];
+        let expected_optimized = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, sort_exprs=a@0 ASC",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
+            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+        ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
     }
@@ -2186,11 +2190,14 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
-        let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
+        ];
+        let expected_optimized = [
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, sort_exprs=a@0 ASC",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
+            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+        ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
false);
         Ok(())
     }
@@ -2259,7 +2266,7 @@ mod tests {
         let expected_input = [
             "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
             "  SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
-            "    SortPreservingRepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=10",
+            "    SortPreservingRepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=10, sort_exprs=a@0 ASC,b@1 
ASC",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        SortExec: expr=[a@0 ASC,b@1 ASC]",
             "          CsvExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], has_header=false",
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index ede95fc677..f4b3608d00 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -368,7 +368,7 @@ mod tests {
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -412,10 +412,10 @@ mod tests {
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  FilterExec: c@2 > 3",
-            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        SortPreservingMergeExec: [a@0 ASC]",
-            "          SortPreservingRepartitionExec: 
partitioning=Hash([c1@0], 8), input_partitions=8",
+            "          SortPreservingRepartitionExec: 
partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], 
has_header=true",
         ];
@@ -442,11 +442,14 @@ mod tests {
             "      FilterExec: c@2 > 3",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
-        let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+        let expected_optimized = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    FilterExec: c@2 > 3",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+
+        ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
     }
@@ -475,7 +478,7 @@ mod tests {
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
             "  CoalesceBatchesExec: target_batch_size=8192",
             "    FilterExec: c@2 > 3",
-            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8",
+            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -509,7 +512,7 @@ mod tests {
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
             "  CoalesceBatchesExec: target_batch_size=8192",
             "    FilterExec: c@2 > 3",
-            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8",
+            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "        CoalesceBatchesExec: target_batch_size=8192",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
@@ -561,21 +564,25 @@ mod tests {
         let physical_plan =
             sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
 
-        let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+        let expected_input = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
             "      CoalesceBatchesExec: target_batch_size=8192",
             "        FilterExec: c@2 > 3",
             "          RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
-        let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"
+        ];
+        let expected_optimized = [
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    CoalesceBatchesExec: target_batch_size=8192",
             "      FilterExec: c@2 > 3",
-            "        SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8",
+            "        SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
+            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
+        ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
     }
@@ -627,7 +634,7 @@ mod tests {
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -671,7 +678,7 @@ mod tests {
         let expected_optimized = [
             "SortPreservingMergeExec: [c@2 ASC]",
             "  FilterExec: c@2 > 3",
-            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=c@2 ASC",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        SortExec: expr=[c@2 ASC]",
             "          CoalescePartitionsExec",
@@ -756,7 +763,7 @@ mod tests {
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
diff --git a/datafusion/physical-expr/src/sort_expr.rs 
b/datafusion/physical-expr/src/sort_expr.rs
index 83d32dfeec..74179ba594 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -17,6 +17,7 @@
 
 //! Sort expressions
 
+use std::fmt::Display;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
@@ -89,6 +90,26 @@ impl PhysicalSortExpr {
                 .options
                 .map_or(true, |opts| self.options == opts)
     }
+
+    /// Returns a [`Display`]able list of `PhysicalSortExpr`.
+    pub fn format_list(input: &[PhysicalSortExpr]) -> impl Display + '_ {
+        struct DisplayableList<'a>(&'a [PhysicalSortExpr]);
+        impl<'a> Display for DisplayableList<'a> {
+            fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+                let mut first = true;
+                for sort_expr in self.0 {
+                    if first {
+                        first = false;
+                    } else {
+                        write!(f, ",")?;
+                    }
+                    write!(f, "{}", sort_expr)?;
+                }
+                Ok(())
+            }
+        }
+        DisplayableList(input)
+    }
 }
 
 /// Represents sort requirement associated with a plan
diff --git a/datafusion/physical-plan/src/memory.rs 
b/datafusion/physical-plan/src/memory.rs
index 4c83ff1528..d919ded8d0 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -77,9 +77,10 @@ impl DisplayAs for MemoryExec {
                     .sort_information
                     .first()
                     .map(|output_ordering| {
-                        let order_strings: Vec<_> =
-                            output_ordering.iter().map(|e| 
e.to_string()).collect();
-                        format!(", output_ordering={}", 
order_strings.join(","))
+                        format!(
+                            ", output_ordering={}",
+                            PhysicalSortExpr::format_list(output_ordering)
+                        )
                     })
                     .unwrap_or_default();
 
diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs
index 14b54dc061..bcb9c3afee 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -308,7 +308,8 @@ pub struct RepartitionExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 
-    /// Boolean flag to decide whether to preserve ordering
+    /// Boolean flag to decide whether to preserve ordering. If true, this
+    /// is a `SortPreservingRepartitionExec`; if false, a `RepartitionExec`.
     preserve_order: bool,
 }
 
@@ -370,7 +371,7 @@ impl RepartitionExec {
         self.preserve_order
     }
 
-    /// Get name of the Executor
+    /// Get name used to display this Exec
     pub fn name(&self) -> &str {
         if self.preserve_order {
             "SortPreservingRepartitionExec"
@@ -394,7 +395,16 @@ impl DisplayAs for RepartitionExec {
                     self.name(),
                     self.partitioning,
                     self.input.output_partitioning().partition_count()
-                )
+                )?;
+
+                if let Some(sort_exprs) = self.sort_exprs() {
+                    write!(
+                        f,
+                        ", sort_exprs={}",
+                        PhysicalSortExpr::format_list(sort_exprs)
+                    )?;
+                }
+                Ok(())
             }
         }
     }
@@ -576,8 +586,8 @@ impl ExecutionPlan for RepartitionExec {
                 .collect::<Vec<_>>();
             // Note that receiver size (`rx.len()`) and `num_input_partitions` 
are same.
 
-            // Get existing ordering:
-            let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
+            // Get existing ordering to use for merging
+            let sort_exprs = self.sort_exprs().unwrap_or(&[]);
 
             // Merge streams (while preserving ordering) coming from
             // input partitions to this partition:
@@ -646,6 +656,15 @@ impl RepartitionExec {
         self
     }
 
+    /// Return the sort expressions used for merging, or `None` if order is not preserved
+    fn sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.preserve_order {
+            self.input.output_ordering()
+        } else {
+            None
+        }
+    }
+
     /// Pulls data from the specified input plan, feeding it to the
     /// output partitions based on the desired partitioning
     ///
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index a56f8fec68..ffc4ef9dc3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -763,17 +763,16 @@ impl DisplayAs for SortExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
+                let expr = PhysicalSortExpr::format_list(&self.expr);
                 match self.fetch {
                     Some(fetch) => {
                         write!(
                             f,
                             // TODO should this say topk?
-                            "SortExec: fetch={fetch}, expr=[{}]",
-                            expr.join(",")
+                            "SortExec: fetch={fetch}, expr=[{expr}]",
                         )
                     }
-                    None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
+                    None => write!(f, "SortExec: expr=[{expr}]"),
                 }
             }
         }
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs 
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 5b485e0b68..597b59f776 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -118,8 +118,11 @@ impl DisplayAs for SortPreservingMergeExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let expr: Vec<String> = self.expr.iter().map(|e| 
e.to_string()).collect();
-                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?;
+                write!(
+                    f,
+                    "SortPreservingMergeExec: [{}]",
+                    PhysicalSortExpr::format_list(&self.expr)
+                )?;
                 if let Some(fetch) = self.fetch {
                     write!(f, ", fetch={fetch}")?;
                 };
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 5fb5a04c67..80e496a336 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3245,17 +3245,17 @@ physical_plan
 ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTIT [...]
 --BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] 
ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), f [...]
 ----CoalesceBatchesExec: target_batch_size=4096
-------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2), 
input_partitions=2
+------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2), 
input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b ASC NULLS LAST,c ASC NULLS 
LAST
 --------ProjectionExec: expr=[a@0 as a, d@3 as d, 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as 
SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, 
SUM(annotated_data_infinit [...]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION 
BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64, nullab [...]
 ------------CoalesceBatchesExec: target_batch_size=4096
---------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2), 
input_partitions=2
+--------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2), 
input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC 
NULLS LAST
 ----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) 
PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY 
[annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW", data_type: Int64,  [...]
 ------------------CoalesceBatchesExec: target_batch_size=4096
---------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, 
d@3], 2), input_partitions=2
+--------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, 
d@3], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS 
LAST,c@2 ASC NULLS LAST
 ----------------------BoundedWindowAggExec: 
wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, 
annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: I [...]
 ------------------------CoalesceBatchesExec: target_batch_size=4096
---------------------------SortPreservingRepartitionExec: 
partitioning=Hash([a@0, b@1], 2), input_partitions=2
+--------------------------SortPreservingRepartitionExec: 
partitioning=Hash([a@0, b@1], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS 
LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST
 ----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true
 


Reply via email to