This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a55a3ae977 [MINOR] Adding order into StreamingTableExec (#6860)
a55a3ae977 is described below

commit a55a3ae9773fff227e2c68420711d79d02877b4e
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Jul 7 09:41:41 2023 +0300

    [MINOR] Adding order into StreamingTableExec (#6860)
    
    * Adding order into StreamingTableExec
    
    * Update streaming.rs
---
 .../core/src/datasource/physical_plan/mod.rs       |  4 +-
 datafusion/core/src/datasource/streaming.rs        |  1 +
 datafusion/core/src/physical_plan/streaming.rs     | 51 ++++++++++++++++++++--
 3 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 610b5ee424..6ace5e84b0 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -417,7 +417,7 @@ where
 
 /// A wrapper to customize partitioned file display
 #[derive(Debug)]
-struct ProjectSchemaDisplay<'a>(&'a SchemaRef);
+pub struct ProjectSchemaDisplay<'a>(pub &'a SchemaRef);
 
 impl<'a> Display for ProjectSchemaDisplay<'a> {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
@@ -433,7 +433,7 @@ impl<'a> Display for ProjectSchemaDisplay<'a> {
 
 /// A wrapper to customize output ordering display.
 #[derive(Debug)]
-struct OutputOrderingDisplay<'a>(&'a [PhysicalSortExpr]);
+pub struct OutputOrderingDisplay<'a>(pub &'a [PhysicalSortExpr]);
 
 impl<'a> Display for OutputOrderingDisplay<'a> {
     fn fmt(&self, f: &mut Formatter) -> FmtResult {
diff --git a/datafusion/core/src/datasource/streaming.rs 
b/datafusion/core/src/datasource/streaming.rs
index 1a3cd92916..97962ab9b7 100644
--- a/datafusion/core/src/datasource/streaming.rs
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -97,6 +97,7 @@ impl TableProvider for StreamingTable {
             self.schema.clone(),
             self.partitions.clone(),
             projection,
+            None,
             self.infinite,
         )?))
     }
diff --git a/datafusion/core/src/physical_plan/streaming.rs 
b/datafusion/core/src/physical_plan/streaming.rs
index 887592f295..22ae2af205 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -25,11 +25,14 @@ use async_trait::async_trait;
 use futures::stream::StreamExt;
 
 use datafusion_common::{DataFusionError, Result, Statistics};
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use log::debug;
 
+use crate::datasource::physical_plan::{OutputOrderingDisplay, 
ProjectSchemaDisplay};
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::{ExecutionPlan, Partitioning, 
SendableRecordBatchStream};
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
 use datafusion_execution::TaskContext;
 
 use super::{DisplayAs, DisplayFormatType};
@@ -48,6 +51,7 @@ pub struct StreamingTableExec {
     partitions: Vec<Arc<dyn PartitionStream>>,
     projection: Option<Arc<[usize]>>,
     projected_schema: SchemaRef,
+    projected_output_ordering: Option<LexOrdering>,
     infinite: bool,
 }
 
@@ -57,6 +61,7 @@ impl StreamingTableExec {
         schema: SchemaRef,
         partitions: Vec<Arc<dyn PartitionStream>>,
         projection: Option<&Vec<usize>>,
+        projected_output_ordering: Option<LexOrdering>,
         infinite: bool,
     ) -> Result<Self> {
         for x in partitions.iter() {
@@ -81,6 +86,7 @@ impl StreamingTableExec {
             partitions,
             projected_schema,
             projection: projection.cloned().map(Into::into),
+            projected_output_ordering,
             infinite,
         })
     }
@@ -125,7 +131,7 @@ impl ExecutionPlan for StreamingTableExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        self.projected_output_ordering.as_deref()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -158,6 +164,45 @@ impl ExecutionPlan for StreamingTableExec {
         })
     }
 
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "StreamingTableExec: partition_sizes={:?}",
+                    self.partitions.len(),
+                )?;
+                if !self.projected_schema.fields().is_empty() {
+                    write!(
+                        f,
+                        ", projection={}",
+                        ProjectSchemaDisplay(&self.projected_schema)
+                    )?;
+                }
+                if self.infinite {
+                    write!(f, ", infinite_source=true")?;
+                }
+
+                self.projected_output_ordering
+                    .as_deref()
+                    .map_or(Ok(()), |ordering| {
+                        if !ordering.is_empty() {
+                            write!(
+                                f,
+                                ", output_ordering={}",
+                                OutputOrderingDisplay(ordering)
+                            )?;
+                        }
+                        Ok(())
+                    })
+            }
+        }
+    }
+
     fn statistics(&self) -> Statistics {
         Default::default()
     }

Reply via email to