This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c4b9995d9b fix: add `order_requirement` & `dist_requirement` to
`OutputRequirementExec` display (#16726)
c4b9995d9b is described below
commit c4b9995d9be601907556da2ca538a452e0e909c4
Author: Loakesh Indiran <[email protected]>
AuthorDate: Wed Jul 16 12:42:11 2025 +0530
fix: add `order_requirement` & `dist_requirement` to
`OutputRequirementExec` display (#16726)
* add: `order_requirement` & `dist_requirement` to `OutputRequirementExec`
plan display
* chore: cargo clippy
* add: get `order_by` and `dist_cols`
* fix: fmt order_by as (col, direction)
* fix: fmt
* fix: expr fmt
* chore: cargo clippy
* test: fix test for output requirement
* test: fix sqllogictest
* fix: fmt
* refactor: use first() to borrow ordering requirements
---
.../tests/physical_optimizer/enforce_sorting.rs | 4 +--
.../physical_optimizer/projection_pushdown.rs | 4 +--
.../physical-optimizer/src/output_requirements.rs | 29 ++++++++++++++++++++--
datafusion/sqllogictest/test_files/explain.slt | 6 ++---
4 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index e14166a922..e31a30cc08 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -983,7 +983,7 @@ async fn
test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
));
let expected_input = [
- "OutputRequirementExec",
+ "OutputRequirementExec: order_by=[(non_nullable_col@1, asc)],
dist_by=SinglePartition",
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],
mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],
mode=[Sorted]",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST],
preserve_partitioning=[false]",
@@ -998,7 +998,7 @@ async fn
test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
// " DataSourceExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], file_type=parquet",
// ];
let expected_optimized = [
- "OutputRequirementExec",
+ "OutputRequirementExec: order_by=[(non_nullable_col@1, asc)],
dist_by=SinglePartition",
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],
mode=[Sorted]",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST],
preserve_partitioning=[false]",
" BoundedWindowAggExec: wdw=[count: Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],
mode=[Sorted]",
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 6964965a64..b5300c68d1 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -729,7 +729,7 @@ fn test_output_req_after_projection() -> Result<()> {
actual,
@r"
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
- OutputRequirementExec
+ OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)],
dist_by=HashPartitioned[[a@0, b@1]])
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false
"
);
@@ -745,7 +745,7 @@ fn test_output_req_after_projection() -> Result<()> {
assert_snapshot!(
actual,
@r"
- OutputRequirementExec
+ OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)],
dist_by=HashPartitioned[[new_a@1, b@2]])
ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false
"
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs
b/datafusion/physical-optimizer/src/output_requirements.rs
index 044d27811b..d8ff2914dc 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -138,10 +138,35 @@ impl DisplayAs for OutputRequirementExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "OutputRequirementExec")
+ let order_cols = self
+ .order_requirement
+ .as_ref()
+ .map(|reqs| reqs.first())
+ .map(|lex| {
+ let pairs: Vec<String> = lex
+ .iter()
+ .map(|req| {
+ let direction = req
+ .options
+ .as_ref()
+ .map(
+ |opt| if opt.descending { "desc" }
else { "asc" },
+ )
+ .unwrap_or("unspecified");
+ format!("({}, {direction})", req.expr)
+ })
+ .collect();
+ format!("[{}]", pairs.join(", "))
+ })
+ .unwrap_or_else(|| "[]".to_string());
+
+ write!(
+ f,
+ "OutputRequirementExec: order_by={}, dist_by={}",
+ order_cols, self.dist_requirement
+ )
}
DisplayFormatType::TreeRender => {
- // TODO: collect info
write!(f, "")
}
}
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index 50575a3aba..4d61b254f5 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -226,7 +226,7 @@ initial_physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/dat
initial_physical_plan_with_stats DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent,
[(Col[0]:),(Col[1]:),(Col[2]:)]]
initial_physical_plan_with_schema DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N]
physical_plan after OutputRequirements
-01)OutputRequirementExec
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
physical_plan after join_selection SAME TEXT AS ABOVE
@@ -303,7 +303,7 @@ initial_physical_plan_with_schema
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N,
tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N,
float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N,
string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N,
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N,
double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryVie [...]
physical_plan after OutputRequirements
-01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified,
statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8),
Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
@@ -347,7 +347,7 @@ initial_physical_plan_with_schema
01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N,
tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N,
float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N,
string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N,
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N,
double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryVie [...]
physical_plan after OutputRequirements
-01)OutputRequirementExec
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified
02)--GlobalLimitExec: skip=0, fetch=10
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]