This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c4b9995d9b fix: add `order_requirement` & `dist_requirement` to 
`OutputRequirementExec` display (#16726)
c4b9995d9b is described below

commit c4b9995d9be601907556da2ca538a452e0e909c4
Author: Loakesh Indiran <[email protected]>
AuthorDate: Wed Jul 16 12:42:11 2025 +0530

    fix: add `order_requirement` & `dist_requirement` to 
`OutputRequirementExec` display (#16726)
    
    * add: `order_requirement` & `dist_requirement` to `OutputRequirementExec` 
plan display
    
    * chore: cargo clippy
    
    * add: get `order_by` and `dist_cols`
    
    * fix: fmt order_by as (col, direction)
    
    * fix: fmt
    
    * fix: expr fmt
    
    * chore: cargo clippy
    
    * test: fix test for output requirement
    
    * test: fix sqllogictest
    
    * fix: fmt
    
    * refactor: use first() to borrow ordering requirements
---
 .../tests/physical_optimizer/enforce_sorting.rs    |  4 +--
 .../physical_optimizer/projection_pushdown.rs      |  4 +--
 .../physical-optimizer/src/output_requirements.rs  | 29 ++++++++++++++++++++--
 datafusion/sqllogictest/test_files/explain.slt     |  6 ++---
 4 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index e14166a922..e31a30cc08 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -983,7 +983,7 @@ async fn 
test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
     ));
 
     let expected_input = [
-        "OutputRequirementExec",
+        "OutputRequirementExec: order_by=[(non_nullable_col@1, asc)], 
dist_by=SinglePartition",
         "  BoundedWindowAggExec: wdw=[count: Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
mode=[Sorted]",
         "    BoundedWindowAggExec: wdw=[count: Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
mode=[Sorted]",
         "      SortExec: expr=[nullable_col@0 DESC NULLS LAST], 
preserve_partitioning=[false]",
@@ -998,7 +998,7 @@ async fn 
test_soft_hard_requirements_with_multiple_soft_requirements_and_output_
     //     "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], file_type=parquet",
     // ];
     let expected_optimized = [
-        "OutputRequirementExec",
+        "OutputRequirementExec: order_by=[(non_nullable_col@1, asc)], 
dist_by=SinglePartition",
         "  BoundedWindowAggExec: wdw=[count: Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
mode=[Sorted]",
         "    SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], 
preserve_partitioning=[false]",
         "      BoundedWindowAggExec: wdw=[count: Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
mode=[Sorted]",
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 6964965a64..b5300c68d1 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -729,7 +729,7 @@ fn test_output_req_after_projection() -> Result<()> {
         actual,
         @r"
     ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
-      OutputRequirementExec
+      OutputRequirementExec: order_by=[(b@1, asc), (c@2 + a@0, asc)], 
dist_by=HashPartitioned[[a@0, b@1]])
         DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false
     "
     );
@@ -745,7 +745,7 @@ fn test_output_req_after_projection() -> Result<()> {
     assert_snapshot!(
         actual,
         @r"
-    OutputRequirementExec
+    OutputRequirementExec: order_by=[(b@2, asc), (c@0 + new_a@1, asc)], 
dist_by=HashPartitioned[[new_a@1, b@2]])
       ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]
         DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false
     "
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs 
b/datafusion/physical-optimizer/src/output_requirements.rs
index 044d27811b..d8ff2914dc 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -138,10 +138,35 @@ impl DisplayAs for OutputRequirementExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "OutputRequirementExec")
+                let order_cols = self
+                    .order_requirement
+                    .as_ref()
+                    .map(|reqs| reqs.first())
+                    .map(|lex| {
+                        let pairs: Vec<String> = lex
+                            .iter()
+                            .map(|req| {
+                                let direction = req
+                                    .options
+                                    .as_ref()
+                                    .map(
+                                        |opt| if opt.descending { "desc" } 
else { "asc" },
+                                    )
+                                    .unwrap_or("unspecified");
+                                format!("({}, {direction})", req.expr)
+                            })
+                            .collect();
+                        format!("[{}]", pairs.join(", "))
+                    })
+                    .unwrap_or_else(|| "[]".to_string());
+
+                write!(
+                    f,
+                    "OutputRequirementExec: order_by={}, dist_by={}",
+                    order_cols, self.dist_requirement
+                )
             }
             DisplayFormatType::TreeRender => {
-                // TODO: collect info
                 write!(f, "")
             }
         }
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index 50575a3aba..4d61b254f5 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -226,7 +226,7 @@ initial_physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/dat
 initial_physical_plan_with_stats DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:)]]
 initial_physical_plan_with_schema DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N]
 physical_plan after OutputRequirements
-01)OutputRequirementExec
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
@@ -303,7 +303,7 @@ initial_physical_plan_with_schema
 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, 
tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, 
float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, 
string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, 
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, 
double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryVie [...]
 physical_plan after OutputRequirements
-01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified, 
statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), 
Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
@@ -347,7 +347,7 @@ initial_physical_plan_with_schema
 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, 
tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, 
float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, 
string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, 
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, 
double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryVie [...]
 physical_plan after OutputRequirements
-01)OutputRequirementExec
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified
 02)--GlobalLimitExec: skip=0, fetch=10
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to