This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a55a3ae977 [MINOR] Adding order into StreamingTableExec (#6860)
a55a3ae977 is described below
commit a55a3ae9773fff227e2c68420711d79d02877b4e
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Jul 7 09:41:41 2023 +0300
[MINOR] Adding order into StreamingTableExec (#6860)
* Adding order into StreamingTableExec
* Update streaming.rs
---
.../core/src/datasource/physical_plan/mod.rs | 4 +-
datafusion/core/src/datasource/streaming.rs | 1 +
datafusion/core/src/physical_plan/streaming.rs | 51 ++++++++++++++++++++--
3 files changed, 51 insertions(+), 5 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 610b5ee424..6ace5e84b0 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -417,7 +417,7 @@ where
/// A wrapper to customize partitioned file display
#[derive(Debug)]
-struct ProjectSchemaDisplay<'a>(&'a SchemaRef);
+pub struct ProjectSchemaDisplay<'a>(pub &'a SchemaRef);
impl<'a> Display for ProjectSchemaDisplay<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
@@ -433,7 +433,7 @@ impl<'a> Display for ProjectSchemaDisplay<'a> {
/// A wrapper to customize output ordering display.
#[derive(Debug)]
-struct OutputOrderingDisplay<'a>(&'a [PhysicalSortExpr]);
+pub struct OutputOrderingDisplay<'a>(pub &'a [PhysicalSortExpr]);
impl<'a> Display for OutputOrderingDisplay<'a> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
index 1a3cd92916..97962ab9b7 100644
--- a/datafusion/core/src/datasource/streaming.rs
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -97,6 +97,7 @@ impl TableProvider for StreamingTable {
self.schema.clone(),
self.partitions.clone(),
projection,
+ None,
self.infinite,
)?))
}
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
index 887592f295..22ae2af205 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -25,11 +25,14 @@ use async_trait::async_trait;
use futures::stream::StreamExt;
use datafusion_common::{DataFusionError, Result, Statistics};
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use log::debug;
+use crate::datasource::physical_plan::{OutputOrderingDisplay, ProjectSchemaDisplay};
use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use crate::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
use datafusion_execution::TaskContext;
use super::{DisplayAs, DisplayFormatType};
@@ -48,6 +51,7 @@ pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<Arc<[usize]>>,
projected_schema: SchemaRef,
+ projected_output_ordering: Option<LexOrdering>,
infinite: bool,
}
@@ -57,6 +61,7 @@ impl StreamingTableExec {
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<&Vec<usize>>,
+ projected_output_ordering: Option<LexOrdering>,
infinite: bool,
) -> Result<Self> {
for x in partitions.iter() {
@@ -81,6 +86,7 @@ impl StreamingTableExec {
partitions,
projected_schema,
projection: projection.cloned().map(Into::into),
+ projected_output_ordering,
infinite,
})
}
@@ -125,7 +131,7 @@ impl ExecutionPlan for StreamingTableExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ self.projected_output_ordering.as_deref()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -158,6 +164,45 @@ impl ExecutionPlan for StreamingTableExec {
})
}
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "StreamingTableExec: partition_sizes={:?}",
+ self.partitions.len(),
+ )?;
+ if !self.projected_schema.fields().is_empty() {
+ write!(
+ f,
+ ", projection={}",
+ ProjectSchemaDisplay(&self.projected_schema)
+ )?;
+ }
+ if self.infinite {
+ write!(f, ", infinite_source=true")?;
+ }
+
+ self.projected_output_ordering
+ .as_deref()
+ .map_or(Ok(()), |ordering| {
+ if !ordering.is_empty() {
+ write!(
+ f,
+ ", output_ordering={}",
+ OutputOrderingDisplay(ordering)
+ )?;
+ }
+ Ok(())
+ })
+ }
+ }
+ }
+
fn statistics(&self) -> Statistics {
Default::default()
}