This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 85be1bc553 Minor: improve Display of output ordering of `StreamTableExec` (#9225)
85be1bc553 is described below
commit 85be1bc5538661b58ca90fb1e042d8d840c9f693
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Feb 15 17:07:01 2024 +0300
Minor: improve Display of output ordering of `StreamTableExec` (#9225)
* Initial commit
* Update plan
---
.../core/src/datasource/physical_plan/mod.rs | 23 ++----------------
datafusion/physical-plan/src/display.rs | 28 +++++++++++++++++++++-
datafusion/physical-plan/src/streaming.rs | 17 ++++---------
datafusion/sqllogictest/test_files/window.slt | 2 +-
4 files changed, 34 insertions(+), 36 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 11eb9e7867..d654653999 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -59,7 +59,7 @@ use crate::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
},
- physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay},
+ physical_plan::display::{display_orderings, ProjectSchemaDisplay},
};
use arrow::{
@@ -129,26 +129,7 @@ impl DisplayAs for FileScanConfig {
write!(f, ", limit={limit}")?;
}
- if let Some(ordering) = orderings.first() {
- if !ordering.is_empty() {
- let start = if orderings.len() == 1 {
- ", output_ordering="
- } else {
- ", output_orderings=["
- };
- write!(f, "{}", start)?;
- for (idx, ordering) in
- orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
- {
- match idx {
- 0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
- _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?,
- }
- }
- let end = if orderings.len() == 1 { "" } else { "]" };
- write!(f, "{}", end)?;
- }
- }
+ display_orderings(f, &orderings)?;
Ok(())
}
diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs
index 19c2847b09..ff106dceb9 100644
--- a/datafusion/physical-plan/src/display.rs
+++ b/datafusion/physical-plan/src/display.rs
@@ -19,12 +19,13 @@
//! [`crate::displayable`] for examples of how to format
use std::fmt;
+use std::fmt::Formatter;
use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
use arrow_schema::SchemaRef;
use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan};
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
/// Options for controlling how each [`ExecutionPlan`] should format itself
#[derive(Debug, Clone, Copy)]
@@ -437,6 +438,31 @@ impl<'a> fmt::Display for OutputOrderingDisplay<'a> {
}
}
+pub fn display_orderings(f: &mut Formatter, orderings: &[LexOrdering]) -> fmt::Result {
+ if let Some(ordering) = orderings.first() {
+ if !ordering.is_empty() {
+ let start = if orderings.len() == 1 {
+ ", output_ordering="
+ } else {
+ ", output_orderings=["
+ };
+ write!(f, "{}", start)?;
+ for (idx, ordering) in
+ orderings.iter().enumerate().filter(|(_, o)| !o.is_empty())
+ {
+ match idx {
+ 0 => write!(f, "{}", OutputOrderingDisplay(ordering))?,
+ _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?,
+ }
+ }
+ let end = if orderings.len() == 1 { "" } else { "]" };
+ write!(f, "{}", end)?;
+ }
+ }
+
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use std::fmt::Write;
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 59819c6921..8976820928 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;
use super::{DisplayAs, DisplayFormatType};
-use crate::display::{OutputOrderingDisplay, ProjectSchemaDisplay};
+use crate::display::{display_orderings, ProjectSchemaDisplay};
use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
@@ -149,18 +149,9 @@ impl DisplayAs for StreamingTableExec {
write!(f, ", infinite_source=true")?;
}
- self.projected_output_ordering
- .first()
- .map_or(Ok(()), |ordering| {
- if !ordering.is_empty() {
- write!(
- f,
- ", output_ordering={}",
- OutputOrderingDisplay(ordering)
- )?;
- }
- Ok(())
- })
+ display_orderings(f, &self.projected_output_ordering)?;
+
+ Ok(())
}
}
}
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index fa4445d4cd..5a610c16bc 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3582,7 +3582,7 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
------CoalesceBatchesExec: target_batch_size=4096
--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]
+------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]]
# CTAS with NTILE function
statement ok