This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new df41267df Improve parquet partition_file output display (#4467)
df41267df is described below
commit df41267df3eafce93d33cd29e74d9415ebef92a1
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Dec 4 03:19:31 2022 -0500
Improve parquet partition_file output display (#4467)
* Improve parquet partition_file output display
* Fix: update doc test
* Document format
* Update avro test
* Make display clearer by adding explicit groups
* fix doc test
---
.../core/src/physical_optimizer/enforcement.rs | 104 ++++++++++-----------
.../core/src/physical_optimizer/repartition.rs | 36 +++----
.../core/src/physical_plan/file_format/mod.rs | 83 +++++++++++++---
datafusion/core/src/physical_plan/mod.rs | 2 +-
datafusion/core/tests/sql/avro.rs | 2 +-
datafusion/core/tests/sql/explain_analyze.rs | 8 +-
datafusion/core/tests/sql/json.rs | 2 +-
7 files changed, 149 insertions(+), 88 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs
b/datafusion/core/src/physical_optimizer/enforcement.rs
index 6f6c504d0..cc9070ccb 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1289,12 +1289,12 @@ mod tests {
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
@@ -1302,12 +1302,12 @@ mod tests {
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -1345,12 +1345,12 @@ mod tests {
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ =>
@@ -1359,12 +1359,12 @@ mod tests {
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 6 }], 10)",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -1414,11 +1414,11 @@ mod tests {
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index:
0 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
1 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index:
2 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1436,11 +1436,11 @@ mod tests {
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index:
0 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
1 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index:
2 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1487,11 +1487,11 @@ mod tests {
"ProjectionExec: expr=[c@2 as c1]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: \"a\", index: 0 }, Column { name: \"b\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\", index:
0 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
1 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\", index:
2 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join);
@@ -1524,11 +1524,11 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\", index:
0 }], 10)",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"a2\", index:
0 }], 10)",
"AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join);
Ok(())
@@ -1573,11 +1573,11 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[b1@1 as b1, a1@0 as
a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index:
0 }, Column { name: \"a1\", index: 1 }], 10)",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
0 }, Column { name: \"a\", index: 1 }], 10)",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join);
Ok(())
@@ -1679,16 +1679,16 @@ mod tests {
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name:
\"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\",
index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index:
1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }],
10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: \"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name:
\"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name: \"a\",
index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
1 }, Column { name: \"c\", index: 2 }, Column { name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index:
1 }, Column { name: \"c1\", index: 2 }, Column { name: \"a1\", index: 0 }],
10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, filter_top_join);
Ok(())
@@ -1799,16 +1799,16 @@ mod tests {
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as
C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column
{ name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name:
\"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"c\",
index: 2 }, Column { name: \"c1\", index: 2 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\",
index: 0 }, Column { name: \"b\", index: 1 }, Column { name: \"c\", index: 2
}], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\",
index: 0 }, Column { name: \"b1\", index: 1 }, Column { name: \"c1\", index: 2
}], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column
{ name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name:
\"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\",
index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\",
index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0
}], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0
}], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, reordered);
@@ -1918,16 +1918,16 @@ mod tests {
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as
C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column
{ name: \"a\", index: 0 }, Column { name: \"a1\", index: 0 }), (Column { name:
\"b\", index: 1 }, Column { name: \"b1\", index: 1 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"a\",
index: 0 }, Column { name: \"b\", index: 1 }], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"a1\",
index: 0 }, Column { name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column
{ name: \"c\", index: 2 }, Column { name: \"c1\", index: 2 }), (Column { name:
\"b\", index: 1 }, Column { name: \"b1\", index: 1 }), (Column { name: \"a\",
index: 0 }, Column { name: \"a1\", index: 0 })]",
"RepartitionExec: partitioning=Hash([Column { name: \"c\",
index: 2 }, Column { name: \"b\", index: 1 }, Column { name: \"a\", index: 0
}], 10)",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 2 }, Column { name: \"b1\", index: 1 }, Column { name: \"a1\", index: 0
}], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c,
d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, reordered);
@@ -1992,14 +1992,14 @@ mod tests {
join_plan.as_str(),
"SortExec: [a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs
_ => vec![
@@ -2009,14 +2009,14 @@ mod tests {
join_plan.as_str(),
"SortExec: [a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column { name:
\"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -2045,14 +2045,14 @@ mod tests {
join_plan.as_str(),
"SortExec: [a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
// Should include 4 RepartitionExecs and 4 SortExecs
_ => vec![
@@ -2062,14 +2062,14 @@ mod tests {
join_plan.as_str(),
"SortExec: [a@0 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"a\", index: 0 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"SortExec: [b1@1 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"b1\", index: 1 }], 10)",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
"SortExec: [c@2 ASC]",
"RepartitionExec: partitioning=Hash([Column {
name: \"c\", index: 2 }], 10)",
- "ParquetExec: limit=None, partitions=[x],
projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join);
@@ -2136,13 +2136,13 @@ mod tests {
"AggregateExec: mode=FinalPartitioned, gby=[b1@1 as b1, a1@0 as
a1], aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b1\", index:
0 }, Column { name: \"a1\", index: 1 }], 10)",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
"SortExec: [b2@1 ASC,a2@0 ASC]",
"ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
"RepartitionExec: partitioning=Hash([Column { name: \"b\", index:
0 }, Column { name: \"a\", index: 1 }], 10)",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "ParquetExec: limit=None, partitions=[x], projection=[a, b, c, d,
e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join);
Ok(())
@@ -2171,7 +2171,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
"CoalesceBatchesExec: target_batch_size=4096",
- "ParquetExec: limit=None, partitions=[x], output_ordering=[a@0
ASC], projection=[a, b, c, d, e]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[a@0 ASC], projection=[a, b, c, d, e]",
];
assert_optimized!(expected, exec);
Ok(())
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index e22da6e22..1b4f53946 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -386,7 +386,7 @@ mod tests {
"AggregateExec: mode=Final, gby=[], aggr=[]",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -402,7 +402,7 @@ mod tests {
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -419,7 +419,7 @@ mod tests {
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require
sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -436,7 +436,7 @@ mod tests {
"FilterExec: c1@0",
// nothing sorts the data, so the local limit doesn't require
sorted data either
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -452,7 +452,7 @@ mod tests {
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: [c1@0 ASC]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -470,7 +470,7 @@ mod tests {
// data is sorted so can't repartition here even though
// filter would benefit from parallelism, the answers might be
wrong
"SortExec: [c1@0 ASC]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -493,7 +493,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -518,7 +518,7 @@ mod tests {
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -534,11 +534,11 @@ mod tests {
let expected = &[
"UnionExec",
// Expect no repartition of ParquetExec
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -552,7 +552,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
// Expect no repartition of SortPreservingMergeExec
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -568,7 +568,7 @@ mod tests {
// Expect no repartition of SortPreservingMergeExec
// even though there is a projection exec between it
"ProjectionExec: expr=[c1@0 as c1]",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -585,7 +585,7 @@ mod tests {
"SortExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -602,7 +602,7 @@ mod tests {
"SortExec: [c1@0 ASC]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
@@ -623,7 +623,7 @@ mod tests {
"FilterExec: c1@0",
// repartition is lowest down
"RepartitionExec: partitioning=RoundRobinBatch(10)",
- "ParquetExec: limit=None, partitions=[x], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[c1]",
];
assert_optimized!(expected, plan);
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index f2ccc16bc..bcdc3221d 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -178,22 +178,39 @@ impl FileScanConfig {
}
/// A wrapper to customize partitioned file display
+///
+/// Prints in the format:
+/// ```text
+/// {NUM_GROUPS groups: [[file1, file2, ...], [fileN, fileM, ...], ...]}
+/// ```
#[derive(Debug)]
struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
impl<'a> Display for FileGroupsDisplay<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
- let parts: Vec<_> = self
- .0
- .iter()
- .map(|pp| {
- pp.iter()
- .map(|pf| pf.object_meta.location.as_ref())
- .collect::<Vec<_>>()
- .join(", ")
- })
- .collect();
- write!(f, "[{}]", parts.join(", "))
+ let mut first_group = true;
+ let groups = if self.0.len() == 1 { "group" } else { "groups" };
+ write!(f, "{{{} {}: [", self.0.len(), groups)?;
+ for group in self.0 {
+ if !first_group {
+ write!(f, ", ")?;
+ }
+ first_group = false;
+ write!(f, "[")?;
+
+ let mut first_file = true;
+ for pf in group {
+ if !first_file {
+ write!(f, ", ")?;
+ }
+ first_file = false;
+
+ write!(f, "{}", pf.object_meta.location.as_ref())?;
+ }
+ write!(f, "]")?;
+ }
+ write!(f, "]}}")?;
+ Ok(())
}
}
@@ -521,6 +538,8 @@ pub(crate) fn get_output_ordering(
#[cfg(test)]
mod tests {
+ use chrono::Utc;
+
use crate::{
test::{build_table_i32, columns},
test_util::aggr_test_schema,
@@ -796,4 +815,46 @@ mod tests {
output_ordering: None,
}
}
+
+ #[test]
+ fn file_groups_display_empty() {
+ let expected = "{0 groups: []}";
+ assert_eq!(&FileGroupsDisplay(&[]).to_string(), expected);
+ }
+
+ #[test]
+ fn file_groups_display_one() {
+ let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
+
+ let expected = "{1 group: [[foo, bar]]}";
+ assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+ }
+
+ #[test]
+ fn file_groups_display_many() {
+ let files = [
+ vec![partitioned_file("foo"), partitioned_file("bar")],
+ vec![partitioned_file("baz")],
+ vec![],
+ ];
+
+ let expected = "{3 groups: [[foo, bar], [baz], []]}";
+ assert_eq!(&FileGroupsDisplay(&files).to_string(), expected);
+ }
+
+ /// create a PartitionedFile for testing
+ fn partitioned_file(path: &str) -> PartitionedFile {
+ let object_meta = ObjectMeta {
+ location: object_store::path::Path::parse(path).unwrap(),
+ last_modified: Utc::now(),
+ size: 42,
+ };
+
+ PartitionedFile {
+ object_meta,
+ partition_values: vec![],
+ range: None,
+ extensions: None,
+ }
+ }
}
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index 7c3ef4c7e..2b4cd63c6 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -307,7 +307,7 @@ pub fn with_new_children_if_necessary(
/// \n CoalesceBatchesExec: target_batch_size=4096\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
-/// \n CsvExec: files=[WORKING_DIR/tests/example.csv],
has_header=true, limit=None, projection=[a]",
+/// \n CsvExec: files={1 group:
[[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None,
projection=[a]",
/// plan_string.trim());
///
/// let one_line = format!("{}", displayable_plan.one_line());
diff --git a/datafusion/core/tests/sql/avro.rs
b/datafusion/core/tests/sql/avro.rs
index c57fc820f..38f4ae7dd 100644
--- a/datafusion/core/tests/sql/avro.rs
+++ b/datafusion/core/tests/sql/avro.rs
@@ -154,7 +154,7 @@ async fn avro_explain() {
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
- \n AvroExec:
files=[ARROW_TEST_DATA/avro/alltypes_plain.avro], limit=None\
+ \n AvroExec: files={1 group:
[[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
\n",
],
];
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index 5479bf529..bcbbead56 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -696,7 +696,7 @@ async fn test_physical_plan_display_indent() {
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec:
partitioning=RoundRobinBatch(9000)",
- " CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1, c12]",
+ " CsvExec: files={1 group:
[[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None,
projection=[c1, c12]",
];
let normalizer = ExplainNormalizer::new();
@@ -739,12 +739,12 @@ async fn
test_physical_plan_display_indent_multi_children() {
" RepartitionExec: partitioning=Hash([Column { name: \"c1\",
index: 0 }], 9000)",
" ProjectionExec: expr=[c1@0 as c1]",
" RepartitionExec: partitioning=RoundRobinBatch(9000)",
- " CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1]",
+ " CsvExec: files={1 group:
[[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None,
projection=[c1]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c2\",
index: 0 }], 9000)",
" ProjectionExec: expr=[c1@0 as c2]",
" RepartitionExec: partitioning=RoundRobinBatch(9000)",
- " CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1, c2]",
+ " CsvExec: files={1 group:
[[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None,
projection=[c1, c2]",
];
let normalizer = ExplainNormalizer::new();
@@ -787,7 +787,7 @@ async fn csv_explain() {
\n CoalesceBatchesExec: target_batch_size=4096\
\n FilterExec: c2@1 > 10\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
- \n CsvExec:
files=[ARROW_TEST_DATA/csv/aggregate_test_100.csv], has_header=true,
limit=None, projection=[c1, c2]\
+ \n CsvExec: files={1 group:
[[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None,
projection=[c1, c2]\
\n",
]];
assert_eq!(expected, actual);
diff --git a/datafusion/core/tests/sql/json.rs
b/datafusion/core/tests/sql/json.rs
index 1f52625e1..e66fcb655 100644
--- a/datafusion/core/tests/sql/json.rs
+++ b/datafusion/core/tests/sql/json.rs
@@ -97,7 +97,7 @@ async fn json_explain() {
\n CoalescePartitionsExec\
\n AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]\
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
- \n JsonExec: limit=None,
files=[WORKING_DIR/tests/jsons/2.json]\n",
+ \n JsonExec: limit=None, files={1 group:
[[WORKING_DIR/tests/jsons/2.json]]}\n",
],
];
assert_eq!(expected, actual);