This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 85be1bc553 Minor: improve Display of output ordering of 
`StreamTableExec` (#9225)
85be1bc553 is described below

commit 85be1bc5538661b58ca90fb1e042d8d840c9f693
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Feb 15 17:07:01 2024 +0300

    Minor: improve Display of output ordering of `StreamTableExec` (#9225)
    
    * Initial commit
    
    * Update plan
---
 .../core/src/datasource/physical_plan/mod.rs       | 23 ++----------------
 datafusion/physical-plan/src/display.rs            | 28 +++++++++++++++++++++-
 datafusion/physical-plan/src/streaming.rs          | 17 ++++---------
 datafusion/sqllogictest/test_files/window.slt      |  2 +-
 4 files changed, 34 insertions(+), 36 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 11eb9e7867..d654653999 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -59,7 +59,7 @@ use crate::{
         listing::{FileRange, PartitionedFile},
         object_store::ObjectStoreUrl,
     },
-    physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay},
+    physical_plan::display::{display_orderings, ProjectSchemaDisplay},
 };
 
 use arrow::{
@@ -129,26 +129,7 @@ impl DisplayAs for FileScanConfig {
             write!(f, ", limit={limit}")?;
         }
 
-        if let Some(ordering) = orderings.first() {
-            if !ordering.is_empty() {
-                let start = if orderings.len() == 1 {
-                    ", output_ordering="
-                } else {
-                    ", output_orderings=["
-                };
-                write!(f, "{}", start)?;
-                for (idx, ordering) in
-                    orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
-                {
-                    match idx {
-                        0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
-                        _ => write!(f, ", {}", 
OutputOrderingDisplay(ordering))?,
-                    }
-                }
-                let end = if orderings.len() == 1 { "" } else { "]" };
-                write!(f, "{}", end)?;
-            }
-        }
+        display_orderings(f, &orderings)?;
 
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/display.rs 
b/datafusion/physical-plan/src/display.rs
index 19c2847b09..ff106dceb9 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -19,12 +19,13 @@
 //! [`crate::displayable`] for examples of how to format
 
 use std::fmt;
+use std::fmt::Formatter;
 
 use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
 
 use arrow_schema::SchemaRef;
 use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan};
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 
 /// Options for controlling how each [`ExecutionPlan`] should format itself
 #[derive(Debug, Clone, Copy)]
@@ -437,6 +438,31 @@ impl<'a> fmt::Display for OutputOrderingDisplay<'a> {
     }
 }
 
+pub fn display_orderings(f: &mut Formatter, orderings: &[LexOrdering]) -> 
fmt::Result {
+    if let Some(ordering) = orderings.first() {
+        if !ordering.is_empty() {
+            let start = if orderings.len() == 1 {
+                ", output_ordering="
+            } else {
+                ", output_orderings=["
+            };
+            write!(f, "{}", start)?;
+            for (idx, ordering) in
+                orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
+            {
+                match idx {
+                    0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
+                    _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?,
+                }
+            }
+            let end = if orderings.len() == 1 { "" } else { "]" };
+            write!(f, "{}", end)?;
+        }
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use std::fmt::Write;
diff --git a/datafusion/physical-plan/src/streaming.rs 
b/datafusion/physical-plan/src/streaming.rs
index 59819c6921..8976820928 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use super::{DisplayAs, DisplayFormatType};
-use crate::display::{OutputOrderingDisplay, ProjectSchemaDisplay};
+use crate::display::{display_orderings, ProjectSchemaDisplay};
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
 
@@ -149,18 +149,9 @@ impl DisplayAs for StreamingTableExec {
                     write!(f, ", infinite_source=true")?;
                 }
 
-                self.projected_output_ordering
-                    .first()
-                    .map_or(Ok(()), |ordering| {
-                        if !ordering.is_empty() {
-                            write!(
-                                f,
-                                ", output_ordering={}",
-                                OutputOrderingDisplay(ordering)
-                            )?;
-                        }
-                        Ok(())
-                    })
+                display_orderings(f, &self.projected_output_ordering)?;
+
+                Ok(())
             }
         }
     }
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index fa4445d4cd..5a610c16bc 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3582,7 +3582,7 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
 ------CoalesceBatchesExec: target_batch_size=4096
 --------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, 
preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, 
d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS 
LAST]
+------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, 
d], infinite_source=true, output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS 
LAST], [c@3 ASC NULLS LAST]]
 
 # CTAS with NTILE function
 statement ok

Reply via email to