This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new df41267df Improve parquet partition_file output display (#4467)
df41267df is described below

commit df41267df3eafce93d33cd29e74d9415ebef92a1
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Dec 4 03:19:31 2022 -0500

    Improve parquet partition_file output display (#4467)
    
    * Improve parquet partition_file output display
    
    * Fix: update doc test
    
    * Document format
    
    * Update avro test
    
    * Make display clearer by adding explicit groups
    
    * fix doc test
---
 .../core/src/physical_optimizer/enforcement.rs     | 104 ++++++++++-----------
 .../core/src/physical_optimizer/repartition.rs     |  36 +++----
 .../core/src/physical_plan/file_format/mod.rs      |  83 +++++++++++++---
 datafusion/core/src/physical_plan/mod.rs           |   2 +-
 datafusion/core/tests/sql/avro.rs                  |   2 +-
 datafusion/core/tests/sql/explain_analyze.rs       |   8 +-
 datafusion/core/tests/sql/json.rs                  |   2 +-
 7 files changed, 149 insertions(+), 88 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs 
b/datafusion/core/src/physical_optimizer/enforcement.rs
index 6f6c504d0..cc9070ccb 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1289,12 +1289,12 @@ mod tests {
                             top_join_plan.as_str(),
                             join_plan.as_str(),
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10)",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs
                         _ => vec![
@@ -1302,12 +1302,12 @@ mod tests {
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
                             join_plan.as_str(),
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10)",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                         ],
                     };
                     assert_optimized!(expected, top_join);
@@ -1345,12 +1345,12 @@ mod tests {
                                 top_join_plan.as_str(),
                                 join_plan.as_str(),
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
-                                "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                                "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10)",
                                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
-                                "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                                "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10)",
-                                "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                                "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             ],
                         // Should include 4 RepartitionExecs
                         _ =>
@@ -1359,12 +1359,12 @@ mod tests {
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 6 }], 10)",
                                 join_plan.as_str(),
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
-                                "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                                "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10)",
                                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
-                                "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                                "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                                 "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10)",
-                                "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                                "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             ],
                     };
                     assert_optimized!(expected, top_join);
@@ -1414,11 +1414,11 @@ mod tests {
             "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 
0 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
1 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 
2 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, top_join);
 
@@ -1436,11 +1436,11 @@ mod tests {
             "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 
0 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
1 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 
2 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
 
         assert_optimized!(expected, top_join);
@@ -1487,11 +1487,11 @@ mod tests {
             "ProjectionExec: expr=[c@2 as c1]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a\", index: 
0 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
1 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"c\", index: 
2 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
 
         assert_optimized!(expected, top_join);
@@ -1524,11 +1524,11 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 
0 }], 10)",
             "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 
0 }], 10)",
             "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, join);
         Ok(())
@@ -1573,11 +1573,11 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[b1@1 as b1, a1@0 as 
a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 
0 }, Column { name: \"a1\", index: 1 }], 10)",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
0 }, Column { name: \"a\", index: 1 }], 10)",
             "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, join);
         Ok(())
@@ -1679,16 +1679,16 @@ mod tests {
             "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: 
\"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", 
index: 0 }, Column { name: \"a1\", index: 0 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 
1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 
10)",
             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: 
\"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\", 
index: 0 }, Column { name: \"a1\", index: 0 })]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 
1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }], 
10)",
             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, filter_top_join);
         Ok(())
@@ -1799,16 +1799,16 @@ mod tests {
                 "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as 
C]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column 
{ name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: 
\"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\", 
index: 2 }, Column { name: \"c1\", index: 2 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a\", 
index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2 
}], 10)",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a1\", 
index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2 
}], 10)",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column 
{ name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: 
\"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", 
index: 0 }, Column { name: \"a1\", index: 0 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c\", 
index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 
}], 10)",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c1\", 
index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 
}], 10)",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             ];
 
             assert_plan_txt!(expected, reordered);
@@ -1918,16 +1918,16 @@ mod tests {
                 "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as 
C]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column 
{ name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name: 
\"b\", index: 1 }, Column { name: \"b1\", index: 1 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a\", 
index: 0 }, Column { name: \"b\", index: 1 }], 10)",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"a1\", 
index: 0 }, Column { name: \"b1\", index: 1 }], 10)",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column 
{ name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: 
\"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\", 
index: 0 }, Column { name: \"a1\", index: 0 })]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c\", 
index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0 
}], 10)",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 "RepartitionExec: partitioning=Hash([Column { name: \"c1\", 
index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0 
}], 10)",
                 "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
-                "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, 
d, e]",
+                "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             ];
 
             assert_plan_txt!(expected, reordered);
@@ -1992,14 +1992,14 @@ mod tests {
                         join_plan.as_str(),
                         "SortExec: [a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"a\", index: 0 }], 10)",
-                        "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         "SortExec: [b1@1 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"b1\", index: 1 }], 10)",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         "SortExec: [c@2 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"c\", index: 2 }], 10)",
-                        "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                     ],
                 // Should include 4 RepartitionExecs
                 _ => vec![
@@ -2009,14 +2009,14 @@ mod tests {
                         join_plan.as_str(),
                         "SortExec: [a@0 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"a\", index: 0 }], 10)",
-                        "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         "SortExec: [b1@1 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"b1\", index: 1 }], 10)",
                         "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as 
c1, d@3 as d1, e@4 as e1]",
-                        "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                         "SortExec: [c@2 ASC]",
                         "RepartitionExec: partitioning=Hash([Column { name: 
\"c\", index: 2 }], 10)",
-                        "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                        "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
                 ],
             };
             assert_optimized!(expected, top_join);
@@ -2045,14 +2045,14 @@ mod tests {
                             join_plan.as_str(),
                             "SortExec: [a@0 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [b1@1 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10)",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [c@2 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                         ],
                         // Should include 4 RepartitionExecs and 4 SortExecs
                         _ => vec![
@@ -2062,14 +2062,14 @@ mod tests {
                             join_plan.as_str(),
                             "SortExec: [a@0 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"a\", index: 0 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [b1@1 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"b1\", index: 1 }], 10)",
                             "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                             "SortExec: [c@2 ASC]",
                             "RepartitionExec: partitioning=Hash([Column { 
name: \"c\", index: 2 }], 10)",
-                            "ParquetExec: limit=None, partitions=[x], 
projection=[a, b, c, d, e]",
+                            "ParquetExec: limit=None, partitions={1 group: 
[[x]]}, projection=[a, b, c, d, e]",
                         ],
                     };
                     assert_optimized!(expected, top_join);
@@ -2136,13 +2136,13 @@ mod tests {
             "AggregateExec: mode=FinalPartitioned, gby=[b1@1 as b1, a1@0 as 
a1], aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b1\", index: 
0 }, Column { name: \"a1\", index: 1 }], 10)",
             "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
             "SortExec: [b2@1 ASC,a2@0 ASC]",
             "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
             "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], 
aggr=[]",
             "RepartitionExec: partitioning=Hash([Column { name: \"b\", index: 
0 }, Column { name: \"a\", index: 1 }], 10)",
             "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
-            "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d, 
e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, join);
         Ok(())
@@ -2171,7 +2171,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [a@0 ASC]",
             "CoalesceBatchesExec: target_batch_size=4096",
-            "ParquetExec: limit=None, partitions=[x], output_ordering=[a@0 
ASC], projection=[a, b, c, d, e]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
         ];
         assert_optimized!(expected, exec);
         Ok(())
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index e22da6e22..1b4f53946 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -386,7 +386,7 @@ mod tests {
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -402,7 +402,7 @@ mod tests {
             "AggregateExec: mode=Partial, gby=[], aggr=[]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -419,7 +419,7 @@ mod tests {
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require 
sorted data either
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -436,7 +436,7 @@ mod tests {
             "FilterExec: c1@0",
             // nothing sorts the data, so the local limit doesn't require 
sorted data either
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -452,7 +452,7 @@ mod tests {
             "LocalLimitExec: fetch=100",
             // data is sorted so can't repartition here
             "SortExec: [c1@0 ASC]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -470,7 +470,7 @@ mod tests {
             // data is sorted so can't repartition here even though
             // filter would benefit from parallelism, the answers might be 
wrong
             "SortExec: [c1@0 ASC]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -493,7 +493,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -518,7 +518,7 @@ mod tests {
             "GlobalLimitExec: skip=0, fetch=100",
             "LocalLimitExec: fetch=100",
             // Expect no repartition to happen for local limit
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -534,11 +534,11 @@ mod tests {
         let expected = &[
             "UnionExec",
             // Expect no repartition of ParquetExec
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -552,7 +552,7 @@ mod tests {
         let expected = &[
             "SortPreservingMergeExec: [c1@0 ASC]",
             // Expect no repartition of SortPreservingMergeExec
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -568,7 +568,7 @@ mod tests {
             // Expect no repartition of SortPreservingMergeExec
             // even though there is a projection exec between it
             "ProjectionExec: expr=[c1@0 as c1]",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -585,7 +585,7 @@ mod tests {
             "SortExec: [c1@0 ASC]",
             "ProjectionExec: expr=[c1@0 as c1]",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -602,7 +602,7 @@ mod tests {
             "SortExec: [c1@0 ASC]",
             "FilterExec: c1@0",
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
@@ -623,7 +623,7 @@ mod tests {
             "FilterExec: c1@0",
             // repartition is lowest down
             "RepartitionExec: partitioning=RoundRobinBatch(10)",
-            "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+            "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
         ];
 
         assert_optimized!(expected, plan);
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index f2ccc16bc..bcdc3221d 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -178,22 +178,39 @@ impl FileScanConfig {
 }
 
 /// A wrapper to customize partitioned file display
+///
+/// Prints in the format:
+/// ```text
+/// {NUM_GROUPS groups: [[file1, file2,...], [fileN, fileM, ...], ...]}
+/// ```
 #[derive(Debug)]
 struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
 
 impl<'a> Display for FileGroupsDisplay<'a> {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
-        let parts: Vec<_> = self
-            .0
-            .iter()
-            .map(|pp| {
-                pp.iter()
-                    .map(|pf| pf.object_meta.location.as_ref())
-                    .collect::<Vec<_>>()
-                    .join(", ")
-            })
-            .collect();
-        write!(f, "[{}]", parts.join(", "))
+        let mut first_group = true;
+        let groups = if self.0.len() == 1 { "group" } else { "groups" };
+        write!(f, "{{{} {}: [", self.0.len(), groups)?;
+        for group in self.0 {
+            if !first_group {
+                write!(f, ", ")?;
+            }
+            first_group = false;
+            write!(f, "[")?;
+
+            let mut first_file = true;
+            for pf in group {
+                if !first_file {
+                    write!(f, ", ")?;
+                }
+                first_file = false;
+
+                write!(f, "{}", pf.object_meta.location.as_ref())?;
+            }
+            write!(f, "]")?;
+        }
+        write!(f, "]}}")?;
+        Ok(())
     }
 }
 
@@ -521,6 +538,8 @@ pub(crate) fn get_output_ordering(
 
 #[cfg(test)]
 mod tests {
+    use chrono::Utc;
+
     use crate::{
         test::{build_table_i32, columns},
         test_util::aggr_test_schema,
@@ -796,4 +815,46 @@ mod tests {
             output_ordering: None,
         }
     }
+
+    #[test]
+    fn file_groups_display_empty() {
+        let expected = "{0 groups: []}";
+        assert_eq!(&FileGroupsDisplay(&[]).to_string(), expected);
+    }
+
+    #[test]
+    fn file_groups_display_one() {
+        let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
+
+        let expected = "{1 group: [[foo, bar]]}";
+        assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+    }
+
+    #[test]
+    fn file_groups_display_many() {
+        let files = [
+            vec![partitioned_file("foo"), partitioned_file("bar")],
+            vec![partitioned_file("baz")],
+            vec![],
+        ];
+
+        let expected = "{3 groups: [[foo, bar], [baz], []]}";
+        assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+    }
+
+    /// create a PartitionedFile for testing
+    fn partitioned_file(path: &str) -> PartitionedFile {
+        let object_meta = ObjectMeta {
+            location: object_store::path::Path::parse(path).unwrap(),
+            last_modified: Utc::now(),
+            size: 42,
+        };
+
+        PartitionedFile {
+            object_meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        }
+    }
 }
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 7c3ef4c7e..2b4cd63c6 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -307,7 +307,7 @@ pub fn with_new_children_if_necessary(
 ///              \n  CoalesceBatchesExec: target_batch_size=4096\
 ///              \n    FilterExec: a@0 < 5\
 ///              \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
-///              \n        CsvExec: files=[WORKING_DIR/tests/example.csv], has_header=true, limit=None, projection=[a]",
+///              \n        CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]",
 ///               plan_string.trim());
 ///
 ///   let one_line = format!("{}", displayable_plan.one_line());
diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs
index c57fc820f..38f4ae7dd 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -154,7 +154,7 @@ async fn avro_explain() {
             \n    CoalescePartitionsExec\
             \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
-            \n          AvroExec: files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\
+            \n          AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
             \n",
         ],
     ];
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 5479bf529..bcbbead56 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -696,7 +696,7 @@ async fn test_physical_plan_display_indent() {
         "                CoalesceBatchesExec: target_batch_size=4096",
         "                  FilterExec: c12@1 < 10",
         "                    RepartitionExec: partitioning=RoundRobinBatch(9000)",
-        "                      CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c12]",
+        "                      CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c12]",
     ];
 
     let normalizer = ExplainNormalizer::new();
@@ -739,12 +739,12 @@ async fn test_physical_plan_display_indent_multi_children() {
         "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000)",
         "          ProjectionExec: expr=[c1@0 as c1]",
         "            RepartitionExec: partitioning=RoundRobinBatch(9000)",
-        "              CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1]",
+        "              CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
         "      CoalesceBatchesExec: target_batch_size=4096",
         "        RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)",
         "          ProjectionExec: expr=[c1@0 as c2]",
         "            RepartitionExec: partitioning=RoundRobinBatch(9000)",
-        "              CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]",
+        "              CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]",
     ];
 
     let normalizer = ExplainNormalizer::new();
@@ -787,7 +787,7 @@ async fn csv_explain() {
               \n  CoalesceBatchesExec: target_batch_size=4096\
               \n    FilterExec: c2@1 > 10\
               \n      RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
-              \n        CsvExec: files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true, limit=None, projection=[c1, c2]\
+              \n        CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]\
               \n",
         ]];
     assert_eq!(expected, actual);
diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs
index 1f52625e1..e66fcb655 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -97,7 +97,7 @@ async fn json_explain() {
             \n    CoalescePartitionsExec\
             \n      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
             \n        RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
-            \n          JsonExec: limit=None, files=[WORKING_DIR/tests/jsons/2.json]\n",
+            \n          JsonExec: limit=None, files={1 group: [[WORKING_DIR/tests/jsons/2.json]]}\n",
         ],
     ];
     assert_eq!(expected, actual);


Reply via email to