This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d610a9fc77 Set DisplayAs to be a supertrait of ExecutionPlan (#6835)
d610a9fc77 is described below
commit d610a9fc771d4c20b269d4702c0d6a2b6618d713
Author: Kirill Zaborsky <[email protected]>
AuthorDate: Thu Jul 6 23:34:55 2023 +0300
Set DisplayAs to be a supertrait of ExecutionPlan (#6835)
Follow-up to https://github.com/apache/arrow-datafusion/pull/6711
---
datafusion-examples/examples/custom_datasource.rs | 11 +-
.../src/datasource/physical_plan/arrow_file.rs | 20 +--
.../core/src/datasource/physical_plan/avro.rs | 20 +--
.../core/src/datasource/physical_plan/csv.rs | 22 +--
.../core/src/datasource/physical_plan/json.rs | 20 +--
.../core/src/datasource/physical_plan/parquet.rs | 54 +++----
.../core/src/physical_optimizer/repartition.rs | 20 +--
.../core/src/physical_plan/aggregates/mod.rs | 166 +++++++++++---------
datafusion/core/src/physical_plan/analyze.rs | 28 ++--
.../core/src/physical_plan/coalesce_batches.rs | 35 +++--
.../core/src/physical_plan/coalesce_partitions.rs | 28 ++--
datafusion/core/src/physical_plan/empty.rs | 28 ++--
datafusion/core/src/physical_plan/explain.rs | 27 ++--
datafusion/core/src/physical_plan/filter.rs | 30 ++--
datafusion/core/src/physical_plan/insert.rs | 28 ++--
.../core/src/physical_plan/joins/cross_join.rs | 27 ++--
.../core/src/physical_plan/joins/hash_join.rs | 47 +++---
.../src/physical_plan/joins/nested_loop_join.rs | 38 ++---
.../src/physical_plan/joins/sort_merge_join.rs | 43 +++---
.../src/physical_plan/joins/symmetric_hash_join.rs | 47 +++---
datafusion/core/src/physical_plan/limit.rs | 63 ++++----
datafusion/core/src/physical_plan/memory.rs | 42 ++---
datafusion/core/src/physical_plan/mod.rs | 12 +-
datafusion/core/src/physical_plan/projection.rs | 54 +++----
.../core/src/physical_plan/repartition/mod.rs | 40 ++---
datafusion/core/src/physical_plan/sorts/sort.rs | 42 ++---
.../physical_plan/sorts/sort_preserving_merge.rs | 39 ++---
datafusion/core/src/physical_plan/streaming.rs | 16 ++
datafusion/core/src/physical_plan/union.rs | 53 ++++---
datafusion/core/src/physical_plan/unnest.rs | 28 ++--
datafusion/core/src/physical_plan/values.rs | 28 ++--
.../windows/bounded_window_agg_exec.rs | 60 ++++----
.../src/physical_plan/windows/window_agg_exec.rs | 56 +++----
datafusion/core/src/physical_planner.rs | 20 +--
datafusion/core/src/test/exec.rs | 169 +++++++++++----------
datafusion/core/src/test_util/mod.rs | 37 ++---
datafusion/core/tests/custom_sources.rs | 30 ++--
.../provider_filter_pushdown.rs | 29 ++--
.../core/tests/custom_sources_cases/statistics.rs | 38 ++---
.../core/tests/user_defined/user_defined_plan.rs | 31 ++--
40 files changed, 880 insertions(+), 746 deletions(-)
diff --git a/datafusion-examples/examples/custom_datasource.rs
b/datafusion-examples/examples/custom_datasource.rs
index c426d9611c..a24573c860 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -27,13 +27,14 @@ use datafusion::execution::context::{SessionState,
TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
- project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics,
+ project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
+ SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
-use std::fmt::{Debug, Formatter};
+use std::fmt::{self, Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::timeout;
@@ -204,6 +205,12 @@ impl CustomExec {
}
}
+impl DisplayAs for CustomExec {
+ fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) ->
std::fmt::Result {
+ write!(f, "CustomExec")
+ }
+}
+
impl ExecutionPlan for CustomExec {
fn as_any(&self) -> &dyn Any {
self
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 0003ffea8c..b497ef7f2b 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -68,6 +68,17 @@ impl ArrowExec {
}
}
+impl DisplayAs for ArrowExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "ArrowExec: ")?;
+ self.base_config.fmt_as(t, f)
+ }
+}
+
impl ExecutionPlan for ArrowExec {
fn as_any(&self) -> &dyn Any {
self
@@ -132,15 +143,6 @@ impl ExecutionPlan for ArrowExec {
Some(self.metrics.clone_inner())
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- write!(f, "ArrowExec: ")?;
- self.base_config.fmt_as(t, f)
- }
-
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs
b/datafusion/core/src/datasource/physical_plan/avro.rs
index df82ccca29..ec2f4db263 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -65,6 +65,17 @@ impl AvroExec {
}
}
+impl DisplayAs for AvroExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "AvroExec: ")?;
+ self.base_config.fmt_as(t, f)
+ }
+}
+
impl ExecutionPlan for AvroExec {
fn as_any(&self) -> &dyn Any {
self
@@ -141,15 +152,6 @@ impl ExecutionPlan for AvroExec {
Ok(Box::pin(stream))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- write!(f, "AvroExec: ")?;
- self.base_config.fmt_as(t, f)
- }
-
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index eba51615cd..e27da2b41e 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -98,6 +98,18 @@ impl CsvExec {
}
}
+impl DisplayAs for CsvExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "CsvExec: ")?;
+ self.base_config.fmt_as(t, f)?;
+ write!(f, ", has_header={}", self.has_header)
+ }
+}
+
impl ExecutionPlan for CsvExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -171,16 +183,6 @@ impl ExecutionPlan for CsvExec {
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- write!(f, "CsvExec: ")?;
- self.base_config.fmt_as(t, f)?;
- write!(f, ", has_header={}", self.has_header)
- }
-
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index 64f7077660..e9082b8084 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -84,6 +84,17 @@ impl NdJsonExec {
}
}
+impl DisplayAs for NdJsonExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "JsonExec: ")?;
+ self.base_config.fmt_as(t, f)
+ }
+}
+
impl ExecutionPlan for NdJsonExec {
fn as_any(&self) -> &dyn Any {
self
@@ -149,15 +160,6 @@ impl ExecutionPlan for NdJsonExec {
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- write!(f, "JsonExec: ")?;
- self.base_config.fmt_as(t, f)
- }
-
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 96e5ce9fa0..d8df3941f5 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -325,6 +325,34 @@ impl ParquetExec {
}
}
+impl DisplayAs for ParquetExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let predicate_string = self
+ .predicate
+ .as_ref()
+ .map(|p| format!(", predicate={p}"))
+ .unwrap_or_default();
+
+ let pruning_predicate_string = self
+ .pruning_predicate
+ .as_ref()
+ .map(|pre| format!(", pruning_predicate={}",
pre.predicate_expr()))
+ .unwrap_or_default();
+
+ write!(f, "ParquetExec: ")?;
+ self.base_config.fmt_as(t, f)?;
+ write!(f, "{}{}", predicate_string, pruning_predicate_string,)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ParquetExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -413,32 +441,6 @@ impl ExecutionPlan for ParquetExec {
Ok(Box::pin(stream))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let predicate_string = self
- .predicate
- .as_ref()
- .map(|p| format!(", predicate={p}"))
- .unwrap_or_default();
-
- let pruning_predicate_string = self
- .pruning_predicate
- .as_ref()
- .map(|pre| format!(", pruning_predicate={}",
pre.predicate_expr()))
- .unwrap_or_default();
-
- write!(f, "ParquetExec: ")?;
- self.base_config.fmt_as(t, f)?;
- write!(f, "{}{}", predicate_string, pruning_predicate_string,)
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs
b/datafusion/core/src/physical_optimizer/repartition.rs
index 33ebcec6a6..f941912586 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -338,7 +338,7 @@ mod tests {
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
- use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
+ use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType,
Statistics};
use datafusion_physical_expr::PhysicalSortRequirement;
fn schema() -> SchemaRef {
@@ -1136,6 +1136,16 @@ mod tests {
}
}
+ impl DisplayAs for SortRequiredExec {
+ fn fmt_as(
+ &self,
+ _t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "SortRequiredExec")
+ }
+ }
+
impl ExecutionPlan for SortRequiredExec {
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -1184,13 +1194,5 @@ mod tests {
fn statistics(&self) -> Statistics {
self.input.statistics()
}
-
- fn fmt_as(
- &self,
- _t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- write!(f, "SortRequiredExec")
- }
}
}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 343f7628b7..4bf5f66445 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -58,6 +58,8 @@ use datafusion_physical_expr::utils::{
get_finer_ordering, ordering_satisfy_requirement_concrete,
};
+use super::DisplayAs;
+
/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
@@ -719,6 +721,80 @@ impl AggregateExec {
}
}
+impl DisplayAs for AggregateExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "AggregateExec: mode={:?}", self.mode)?;
+ let g: Vec<String> = if self.group_by.groups.len() == 1 {
+ self.group_by
+ .expr
+ .iter()
+ .map(|(e, alias)| {
+ let e = e.to_string();
+ if &e != alias {
+ format!("{e} as {alias}")
+ } else {
+ e
+ }
+ })
+ .collect()
+ } else {
+ self.group_by
+ .groups
+ .iter()
+ .map(|group| {
+ let terms = group
+ .iter()
+ .enumerate()
+ .map(|(idx, is_null)| {
+ if *is_null {
+ let (e, alias) =
&self.group_by.null_expr[idx];
+ let e = e.to_string();
+ if &e != alias {
+ format!("{e} as {alias}")
+ } else {
+ e
+ }
+ } else {
+ let (e, alias) =
&self.group_by.expr[idx];
+ let e = e.to_string();
+ if &e != alias {
+ format!("{e} as {alias}")
+ } else {
+ e
+ }
+ }
+ })
+ .collect::<Vec<String>>()
+ .join(", ");
+ format!("({terms})")
+ })
+ .collect()
+ };
+
+ write!(f, ", gby=[{}]", g.join(", "))?;
+
+ let a: Vec<String> = self
+ .aggr_expr
+ .iter()
+ .map(|agg| agg.name().to_string())
+ .collect();
+ write!(f, ", aggr=[{}]", a.join(", "))?;
+
+ if let Some(aggregation_ordering) = &self.aggregation_ordering
{
+ write!(f, ", ordering_mode={:?}",
aggregation_ordering.mode)?;
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
impl ExecutionPlan for AggregateExec {
/// Return a reference to Any that can be used for down-casting
fn as_any(&self) -> &dyn Any {
@@ -838,78 +914,6 @@ impl ExecutionPlan for AggregateExec {
Some(self.metrics.clone_inner())
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "AggregateExec: mode={:?}", self.mode)?;
- let g: Vec<String> = if self.group_by.groups.len() == 1 {
- self.group_by
- .expr
- .iter()
- .map(|(e, alias)| {
- let e = e.to_string();
- if &e != alias {
- format!("{e} as {alias}")
- } else {
- e
- }
- })
- .collect()
- } else {
- self.group_by
- .groups
- .iter()
- .map(|group| {
- let terms = group
- .iter()
- .enumerate()
- .map(|(idx, is_null)| {
- if *is_null {
- let (e, alias) =
&self.group_by.null_expr[idx];
- let e = e.to_string();
- if &e != alias {
- format!("{e} as {alias}")
- } else {
- e
- }
- } else {
- let (e, alias) =
&self.group_by.expr[idx];
- let e = e.to_string();
- if &e != alias {
- format!("{e} as {alias}")
- } else {
- e
- }
- }
- })
- .collect::<Vec<String>>()
- .join(", ");
- format!("({terms})")
- })
- .collect()
- };
-
- write!(f, ", gby=[{}]", g.join(", "))?;
-
- let a: Vec<String> = self
- .aggr_expr
- .iter()
- .map(|agg| agg.name().to_string())
- .collect();
- write!(f, ", aggr=[{}]", a.join(", "))?;
-
- if let Some(aggregation_ordering) = &self.aggregation_ordering
{
- write!(f, ", ordering_mode={:?}",
aggregation_ordering.mode)?;
- }
- }
- }
- Ok(())
- }
-
fn statistics(&self) -> Statistics {
// TODO stats: group expressions:
// - once expressions will be able to compute their own stats, use it
here
@@ -1245,8 +1249,8 @@ mod tests {
};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::{
- ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
- Statistics,
+ DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionContext;
@@ -1579,6 +1583,20 @@ mod tests {
pub yield_first: bool,
}
+ impl DisplayAs for TestYieldingExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "TestYieldingExec")
+ }
+ }
+ }
+ }
+
impl ExecutionPlan for TestYieldingExec {
fn as_any(&self) -> &dyn Any {
self
diff --git a/datafusion/core/src/physical_plan/analyze.rs
b/datafusion/core/src/physical_plan/analyze.rs
index 90b7f4c02f..bba1c37e6d 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;
use super::expressions::PhysicalSortExpr;
use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
-use super::{Distribution, SendableRecordBatchStream};
+use super::{DisplayAs, Distribution, SendableRecordBatchStream};
use datafusion_execution::TaskContext;
/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
@@ -56,6 +56,20 @@ impl AnalyzeExec {
}
}
+impl DisplayAs for AnalyzeExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "AnalyzeExec verbose={}", self.verbose)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for AnalyzeExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -158,18 +172,6 @@ impl ExecutionPlan for AnalyzeExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "AnalyzeExec verbose={}", self.verbose)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// Statistics an an ANALYZE plan are not relevant
Statistics::default()
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 0064f353e9..9454a36af7 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -38,6 +38,7 @@ use log::trace;
use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, MetricsSet};
+use super::DisplayAs;
use super::{metrics::ExecutionPlanMetricsSet, Statistics};
/// CoalesceBatchesExec combines small batches into larger batches for more
efficient use of
@@ -73,6 +74,24 @@ impl CoalesceBatchesExec {
}
}
+impl DisplayAs for CoalesceBatchesExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "CoalesceBatchesExec: target_batch_size={}",
+ self.target_batch_size
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for CoalesceBatchesExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -141,22 +160,6 @@ impl ExecutionPlan for CoalesceBatchesExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "CoalesceBatchesExec: target_batch_size={}",
- self.target_batch_size
- )
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index d04ffa576a..82f74a62bb 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
-use super::Statistics;
+use super::{DisplayAs, Statistics};
use crate::physical_plan::{
DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning,
};
@@ -60,6 +60,20 @@ impl CoalescePartitionsExec {
}
}
+impl DisplayAs for CoalescePartitionsExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CoalescePartitionsExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for CoalescePartitionsExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -146,18 +160,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "CoalescePartitionsExec")
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/empty.rs
b/datafusion/core/src/physical_plan/empty.rs
index 4d1b1cffc0..17bfa0af3a 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -30,7 +30,7 @@ use datafusion_common::{DataFusionError, Result};
use log::trace;
use super::expressions::PhysicalSortExpr;
-use super::{common, SendableRecordBatchStream, Statistics};
+use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use datafusion_execution::TaskContext;
@@ -94,6 +94,20 @@ impl EmptyExec {
}
}
+impl DisplayAs for EmptyExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "EmptyExec: produce_one_row={}",
self.produce_one_row)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for EmptyExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -148,18 +162,6 @@ impl ExecutionPlan for EmptyExec {
)?))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "EmptyExec: produce_one_row={}",
self.produce_one_row)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
let batch = self
.data()
diff --git a/datafusion/core/src/physical_plan/explain.rs
b/datafusion/core/src/physical_plan/explain.rs
index 22b0817c33..d732e70f8a 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -28,6 +28,7 @@ use crate::physical_plan::{DisplayFormatType, ExecutionPlan,
Partitioning, Stati
use arrow::{array::StringBuilder, datatypes::SchemaRef,
record_batch::RecordBatch};
use log::trace;
+use super::DisplayAs;
use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_execution::TaskContext;
@@ -70,6 +71,20 @@ impl ExplainExec {
}
}
+impl DisplayAs for ExplainExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "ExplainExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ExplainExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -156,18 +171,6 @@ impl ExecutionPlan for ExplainExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "ExplainExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// Statistics an EXPLAIN plan are not relevant
Statistics::default()
diff --git a/datafusion/core/src/physical_plan/filter.rs
b/datafusion/core/src/physical_plan/filter.rs
index d56f582028..d2cfc3ce8a 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -24,7 +24,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use super::expressions::PhysicalSortExpr;
-use super::{ColumnStatistics, RecordBatchStream, SendableRecordBatchStream,
Statistics};
+use super::{
+ ColumnStatistics, DisplayAs, RecordBatchStream, SendableRecordBatchStream,
Statistics,
+};
use crate::physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
Column, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
Partitioning,
@@ -85,6 +87,20 @@ impl FilterExec {
}
}
+impl DisplayAs for FilterExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "FilterExec: {}", self.predicate)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for FilterExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -157,18 +173,6 @@ impl ExecutionPlan for FilterExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "FilterExec: {}", self.predicate)
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/insert.rs
b/datafusion/core/src/physical_plan/insert.rs
index 8bcb9bab83..8766b62e9a 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -138,6 +138,21 @@ impl InsertExec {
}
}
+impl DisplayAs for InsertExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "InsertExec: sink=")?;
+ self.sink.fmt_as(t, f)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for InsertExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -232,19 +247,6 @@ impl ExecutionPlan for InsertExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "InsertExec: sink=")?;
- self.sink.fmt_as(t, f)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
Statistics::default()
}
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs
b/datafusion/core/src/physical_plan/joins/cross_join.rs
index f752d134a1..1ecbfbea95 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -26,6 +26,7 @@ use arrow::datatypes::{Fields, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
@@ -139,6 +140,20 @@ async fn load_left_input(
Ok((merged_batch, reservation))
}
+impl DisplayAs for CrossJoinExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CrossJoinExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for CrossJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -243,18 +258,6 @@ impl ExecutionPlan for CrossJoinExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "CrossJoinExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
stats_cartesian_product(
self.left.statistics(),
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index a3c553c9b3..d258e7529a 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -60,6 +60,7 @@ use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices,
get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
};
+use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
@@ -204,6 +205,30 @@ impl HashJoinExec {
}
}
+impl DisplayAs for HashJoinExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let display_filter = self.filter.as_ref().map_or_else(
+ || "".to_string(),
+ |f| format!(", filter={}", f.expression()),
+ );
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| format!("({}, {})", c1, c2))
+ .collect::<Vec<String>>()
+ .join(", ");
+ write!(
+ f,
+ "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
+ self.mode, self.join_type, on, display_filter
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for HashJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -420,28 +445,6 @@ impl ExecutionPlan for HashJoinExec {
}))
}
- fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let display_filter = self.filter.as_ref().map_or_else(
- || "".to_string(),
- |f| format!(", filter={}", f.expression()),
- );
- let on = self
- .on
- .iter()
- .map(|(c1, c2)| format!("({}, {})", c1, c2))
- .collect::<Vec<String>>()
- .join(", ");
- write!(
- f,
- "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}",
- self.mode, self.join_type, on, display_filter
- )
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index 6586456fd2..d5de88d933 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -29,8 +29,8 @@ use crate::physical_plan::joins::utils::{
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
- DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
RecordBatchStream,
- SendableRecordBatchStream,
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ RecordBatchStream, SendableRecordBatchStream,
};
use arrow::array::{
BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array,
UInt64Builder,
@@ -120,6 +120,24 @@ impl NestedLoopJoinExec {
}
}
+impl DisplayAs for NestedLoopJoinExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let display_filter = self.filter.as_ref().map_or_else(
+ || "".to_string(),
+ |f| format!(", filter={}", f.expression()),
+ );
+ write!(
+ f,
+ "NestedLoopJoinExec: join_type={:?}{}",
+ self.join_type, display_filter
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for NestedLoopJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -249,22 +267,6 @@ impl ExecutionPlan for NestedLoopJoinExec {
}))
}
- fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let display_filter = self.filter.as_ref().map_or_else(
- || "".to_string(),
- |f| format!(", filter={}", f.expression()),
- );
- write!(
- f,
- "NestedLoopJoinExec: join_type={:?}{}",
- self.join_type, display_filter
- )
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index bc8c686670..2a628a6146 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -46,8 +46,9 @@ use crate::physical_plan::joins::utils::{
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder,
MetricsSet};
use crate::physical_plan::{
- metrics, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan,
- Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream,
Statistics,
+ metrics, DisplayAs, DisplayFormatType, Distribution, EquivalenceProperties,
+ ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
use datafusion_common::DataFusionError;
use datafusion_common::JoinType;
@@ -200,6 +201,26 @@ impl SortMergeJoinExec {
}
}
+impl DisplayAs for SortMergeJoinExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| format!("({}, {})", c1, c2))
+ .collect::<Vec<String>>()
+ .join(", ");
+ write!(
+ f,
+ "SortMergeJoin: join_type={:?}, on=[{}]",
+ self.join_type, on
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for SortMergeJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -361,24 +382,6 @@ impl ExecutionPlan for SortMergeJoinExec {
Some(self.metrics.clone_inner())
}
- fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let on = self
- .on
- .iter()
- .map(|(c1, c2)| format!("({}, {})", c1, c2))
- .collect::<Vec<String>>()
- .join(", ");
- write!(
- f,
- "SortMergeJoin: join_type={:?}, on=[{}]",
- self.join_type, on
- )
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index b46aba2fb5..dfd38a20c0 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -57,6 +57,7 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph,
Interval, IntervalB
use crate::physical_plan::common::SharedMemoryReservation;
use
crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
+use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
expressions::Column,
expressions::PhysicalSortExpr,
@@ -385,6 +386,30 @@ impl SymmetricHashJoinExec {
}
}
+impl DisplayAs for SymmetricHashJoinExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let display_filter = self.filter.as_ref().map_or_else(
+ || "".to_string(),
+ |f| format!(", filter={}", f.expression()),
+ );
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| format!("({}, {})", c1, c2))
+ .collect::<Vec<String>>()
+ .join(", ");
+ write!(
+ f,
+ "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}",
+ self.join_type, on, display_filter
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for SymmetricHashJoinExec {
fn as_any(&self) -> &dyn Any {
self
@@ -460,28 +485,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
)?))
}
- fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let display_filter = self.filter.as_ref().map_or_else(
- || "".to_string(),
- |f| format!(", filter={}", f.expression()),
- );
- let on = self
- .on
- .iter()
- .map(|(c1, c2)| format!("({}, {})", c1, c2))
- .collect::<Vec<String>>()
- .join(", ");
- write!(
- f,
- "SymmetricHashJoinExec: join_type={:?}, on=[{}]{}",
- self.join_type, on, display_filter
- )
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/limit.rs
b/datafusion/core/src/physical_plan/limit.rs
index d1948cc288..572cea006c 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -34,6 +34,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::{DataFusionError, Result};
use super::expressions::PhysicalSortExpr;
+use super::DisplayAs;
use super::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -82,6 +83,25 @@ impl GlobalLimitExec {
}
}
+impl DisplayAs for GlobalLimitExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "GlobalLimitExec: skip={}, fetch={}",
+ self.skip,
+ self.fetch.map_or("None".to_string(), |x| x.to_string())
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for GlobalLimitExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -164,23 +184,6 @@ impl ExecutionPlan for GlobalLimitExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "GlobalLimitExec: skip={}, fetch={}",
- self.skip,
- self.fetch.map_or("None".to_string(), |x| x.to_string())
- )
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
@@ -265,6 +268,20 @@ impl LocalLimitExec {
}
}
+impl DisplayAs for LocalLimitExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "LocalLimitExec: fetch={}", self.fetch)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for LocalLimitExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -331,18 +348,6 @@ impl ExecutionPlan for LocalLimitExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "LocalLimitExec: fetch={}", self.fetch)
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/memory.rs
b/datafusion/core/src/physical_plan/memory.rs
index fa456d15ee..16c22a3b2d 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -19,7 +19,7 @@
use super::expressions::PhysicalSortExpr;
use super::{
- common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning,
+ common, project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use arrow::datatypes::SchemaRef;
@@ -56,6 +56,27 @@ impl fmt::Debug for MemoryExec {
}
}
+impl DisplayAs for MemoryExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let partitions: Vec<_> =
+ self.partitions.iter().map(|b| b.len()).collect();
+ write!(
+ f,
+ "MemoryExec: partitions={}, partition_sizes={:?}",
+ partitions.len(),
+ partitions
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for MemoryExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -102,25 +123,6 @@ impl ExecutionPlan for MemoryExec {
)?))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let partitions: Vec<_> =
- self.partitions.iter().map(|b| b.len()).collect();
- write!(
- f,
- "MemoryExec: partitions={}, partition_sizes={:?}",
- partitions.len(),
- partitions
- )
- }
- }
- }
-
/// We recompute the statistics dynamically from the arrow metadata as it is pretty cheap to do so
fn statistics(&self) -> Statistics {
common::compute_record_batch_statistics(
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index 7efd5a19ee..4e1de2a6cb 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -99,7 +99,7 @@ impl Stream for EmptyRecordBatchStream {
/// [`ExecutionPlan`] can be displayed in a simplified form using the
/// return value from [`displayable`] in addition to the (normally
/// quite verbose) `Debug` output.
-pub trait ExecutionPlan: Debug + Send + Sync {
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
@@ -225,16 +225,6 @@ pub trait ExecutionPlan: Debug + Send + Sync {
None
}
- /// Format this `ExecutionPlan` to `f` in the specified type.
- ///
- /// Should not include a newline
- ///
- /// Note this function prints a placeholder by default to preserve
- /// backwards compatibility.
- fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
- write!(f, "ExecutionPlan(PlaceHolder)")
- }
-
/// Returns the global output statistics for this `ExecutionPlan` node.
fn statistics(&self) -> Statistics;
}
diff --git a/datafusion/core/src/physical_plan/projection.rs
b/datafusion/core/src/physical_plan/projection.rs
index d1e1a4f9db..c6845c7afe 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -39,7 +39,7 @@ use log::trace;
use super::expressions::{Column, PhysicalSortExpr};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
-use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream,
Statistics};
use datafusion_physical_expr::{
normalize_out_expr_with_columns_map, project_equivalence_properties,
@@ -156,6 +156,33 @@ impl ProjectionExec {
}
}
+impl DisplayAs for ProjectionExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr: Vec<String> = self
+ .expr
+ .iter()
+ .map(|(e, alias)| {
+ let e = e.to_string();
+ if &e != alias {
+ format!("{e} as {alias}")
+ } else {
+ e
+ }
+ })
+ .collect();
+
+ write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ProjectionExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -260,31 +287,6 @@ impl ExecutionPlan for ProjectionExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let expr: Vec<String> = self
- .expr
- .iter()
- .map(|(e, alias)| {
- let e = e.to_string();
- if &e != alias {
- format!("{e} as {alias}")
- } else {
- e
- }
- })
- .collect();
-
- write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 3c689e97ab..cb4d5c8988 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -43,7 +43,7 @@ use self::distributor_channels::{DistributionReceiver,
DistributionSender};
use super::common::{AbortOnDropMany, AbortOnDropSingle,
SharedMemoryReservation};
use super::expressions::PhysicalSortExpr;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream};
use crate::physical_plan::common::transpose;
use crate::physical_plan::metrics::BaselineMetrics;
@@ -320,6 +320,26 @@ impl RepartitionExec {
}
}
+impl DisplayAs for RepartitionExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "{}: partitioning={}, input_partitions={}",
+ self.name(),
+ self.partitioning,
+ self.input.output_partitioning().partition_count()
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for RepartitionExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -515,24 +535,6 @@ impl ExecutionPlan for RepartitionExec {
Some(self.metrics.clone_inner())
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "{}: partitioning={}, input_partitions={}",
- self.name(),
- self.partitioning,
- self.input.output_partitioning().partition_count()
- )
- }
- }
- }
-
fn statistics(&self) -> Statistics {
self.input.statistics()
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 205ec706b5..f660f0acf8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -27,8 +27,8 @@ use crate::physical_plan::metrics::{
use crate::physical_plan::sorts::merge::streaming_merge;
use crate::physical_plan::stream::{RecordBatchReceiverStream,
RecordBatchStreamAdapter};
use crate::physical_plan::{
- DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
Partitioning,
- SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream,
ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
};
pub use arrow::compute::SortOptions;
use arrow::compute::{concat_batches, lexsort_to_indices, take};
@@ -513,6 +513,26 @@ impl SortExec {
}
}
+impl DisplayAs for SortExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "SortExec: fetch={fetch}, expr=[{}]",
expr.join(","))
+ }
+ None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
+ }
+ }
+ }
+ }
+}
+
impl ExecutionPlan for SortExec {
fn as_any(&self) -> &dyn Any {
self
@@ -619,24 +639,6 @@ impl ExecutionPlan for SortExec {
Some(self.metrics_set.clone_inner())
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
- match self.fetch {
- Some(fetch) => {
- write!(f, "SortExec: fetch={fetch}, expr=[{}]",
expr.join(","))
- }
- None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
- }
- }
- }
- }
-
fn statistics(&self) -> Statistics {
self.input.statistics()
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 397d254162..8262a18f23 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -28,6 +28,7 @@ use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::sorts::streaming_merge;
+use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
expressions::PhysicalSortExpr, DisplayFormatType, Distribution,
ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
@@ -107,6 +108,26 @@ impl SortPreservingMergeExec {
}
}
+impl DisplayAs for SortPreservingMergeExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
+ write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?;
+ if let Some(fetch) = self.fetch {
+ write!(f, ", fetch={fetch}")?;
+ };
+
+ Ok(())
+ }
+ }
+ }
+}
+
impl ExecutionPlan for SortPreservingMergeExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -215,24 +236,6 @@ impl ExecutionPlan for SortPreservingMergeExec {
}
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let expr: Vec<String> = self.expr.iter().map(|e|
e.to_string()).collect();
- write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))?;
- if let Some(fetch) = self.fetch {
- write!(f, ", fetch={fetch}")?;
- };
-
- Ok(())
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/streaming.rs
b/datafusion/core/src/physical_plan/streaming.rs
index 48f1409e14..887592f295 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -32,6 +32,8 @@ use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::{ExecutionPlan, Partitioning,
SendableRecordBatchStream};
use datafusion_execution::TaskContext;
+use super::{DisplayAs, DisplayFormatType};
+
/// A partition that can be converted into a [`SendableRecordBatchStream`]
pub trait PartitionStream: Send + Sync {
/// Returns the schema of this partition
@@ -90,6 +92,20 @@ impl std::fmt::Debug for StreamingTableExec {
}
}
+impl DisplayAs for StreamingTableExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "StreamingTableExec")
+ }
+ }
+ }
+}
+
#[async_trait]
impl ExecutionPlan for StreamingTableExec {
fn as_any(&self) -> &dyn Any {
diff --git a/datafusion/core/src/physical_plan/union.rs
b/datafusion/core/src/physical_plan/union.rs
index ffd3f06ee8..e1f8072085 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -34,6 +34,7 @@ use futures::Stream;
use itertools::Itertools;
use log::{debug, trace, warn};
+use super::DisplayAs;
use super::{
expressions::PhysicalSortExpr,
metrics::{ExecutionPlanMetricsSet, MetricsSet},
@@ -144,6 +145,20 @@ impl UnionExec {
}
}
+impl DisplayAs for UnionExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "UnionExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for UnionExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -247,18 +262,6 @@ impl ExecutionPlan for UnionExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "UnionExec")
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
@@ -342,6 +345,20 @@ impl InterleaveExec {
}
}
+impl DisplayAs for InterleaveExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "InterleaveExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for InterleaveExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -421,18 +438,6 @@ impl ExecutionPlan for InterleaveExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "InterleaveExec")
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/unnest.rs
b/datafusion/core/src/physical_plan/unnest.rs
index 2e5c92872f..7f8d56847e 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -38,6 +38,8 @@ use crate::physical_plan::{
};
use datafusion_common::{DataFusionError, Result, ScalarValue};
+use super::DisplayAs;
+
/// Unnest the given column by joining the row with each value in the nested type.
#[derive(Debug)]
pub struct UnnestExec {
@@ -60,6 +62,20 @@ impl UnnestExec {
}
}
+impl DisplayAs for UnnestExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "UnnestExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for UnnestExec {
fn as_any(&self) -> &dyn Any {
self
@@ -126,18 +142,6 @@ impl ExecutionPlan for UnnestExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "UnnestExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
Default::default()
}
diff --git a/datafusion/core/src/physical_plan/values.rs
b/datafusion/core/src/physical_plan/values.rs
index 4099cfdb36..70e00ed034 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -18,7 +18,7 @@
//! Values execution plan
use super::expressions::PhysicalSortExpr;
-use super::{common, SendableRecordBatchStream, Statistics};
+use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
use crate::physical_plan::{
memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan,
Partitioning,
PhysicalExpr,
@@ -94,6 +94,20 @@ impl ValuesExec {
}
}
+impl DisplayAs for ValuesExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "ValuesExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ValuesExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -145,18 +159,6 @@ impl ExecutionPlan for ValuesExec {
)?))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "ValuesExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
let batch = self.data();
common::compute_record_batch_statistics(&[batch], &self.schema, None)
diff --git
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 9c86abec81..1f0da4a8e6 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -28,8 +28,8 @@ use crate::physical_plan::windows::{
calc_requirements, get_ordered_partition_by_indices,
window_ordering_equivalence,
};
use crate::physical_plan::{
- ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning,
- RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
+ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
ExecutionPlan,
+ Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
WindowExpr,
};
use datafusion_common::Result;
use datafusion_execution::TaskContext;
@@ -201,6 +201,35 @@ impl BoundedWindowAggExec {
}
}
+impl DisplayAs for BoundedWindowAggExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "BoundedWindowAggExec: ")?;
+ let g: Vec<String> = self
+ .window_expr
+ .iter()
+ .map(|e| {
+ format!(
+ "{}: {:?}, frame: {:?}",
+ e.name().to_owned(),
+ e.field(),
+ e.get_window_frame()
+ )
+ })
+ .collect();
+ let mode = &self.partition_search_mode;
+ write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?;
+ }
+ }
+ Ok(())
+ }
+}
+
impl ExecutionPlan for BoundedWindowAggExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -299,33 +328,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
Ok(stream)
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "BoundedWindowAggExec: ")?;
- let g: Vec<String> = self
- .window_expr
- .iter()
- .map(|e| {
- format!(
- "{}: {:?}, frame: {:?}",
- e.name().to_owned(),
- e.field(),
- e.get_window_frame()
- )
- })
- .collect();
- let mode = &self.partition_search_mode;
- write!(f, "wdw=[{}], mode=[{:?}]", g.join(", "), mode)?;
- }
- }
- Ok(())
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 4a0648c8f1..7e7fc22965 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -26,7 +26,7 @@ use crate::physical_plan::windows::{
calc_requirements, get_ordered_partition_by_indices,
window_ordering_equivalence,
};
use crate::physical_plan::{
- ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
@@ -121,6 +121,34 @@ impl WindowAggExec {
}
}
+impl DisplayAs for WindowAggExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "WindowAggExec: ")?;
+ let g: Vec<String> = self
+ .window_expr
+ .iter()
+ .map(|e| {
+ format!(
+ "{}: {:?}, frame: {:?}",
+ e.name().to_owned(),
+ e.field(),
+ e.get_window_frame()
+ )
+ })
+ .collect();
+ write!(f, "wdw=[{}]", g.join(", "))?;
+ }
+ }
+ Ok(())
+ }
+}
+
impl ExecutionPlan for WindowAggExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -225,32 +253,6 @@ impl ExecutionPlan for WindowAggExec {
Ok(stream)
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "WindowAggExec: ")?;
- let g: Vec<String> = self
- .window_expr
- .iter()
- .map(|e| {
- format!(
- "{}: {:?}, frame: {:?}",
- e.name().to_owned(),
- e.field(),
- e.get_window_frame()
- )
- })
- .collect();
- write!(f, "wdw=[{}]", g.join(", "))?;
- }
- }
- Ok(())
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index f00f5e0d5e..92c7604c37 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1934,10 +1934,10 @@ mod tests {
use super::*;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::MemTable;
- use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
};
+ use crate::physical_plan::{DisplayAs, SendableRecordBatchStream};
use crate::physical_planner::PhysicalPlanner;
use crate::prelude::{SessionConfig, SessionContext};
use crate::scalar::ScalarValue;
@@ -2500,6 +2500,16 @@ mod tests {
schema: SchemaRef,
}
+ impl DisplayAs for NoOpExecutionPlan {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "NoOpExecutionPlan")
+ }
+ }
+ }
+ }
+
impl ExecutionPlan for NoOpExecutionPlan {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -2537,14 +2547,6 @@ mod tests {
unimplemented!("NoOpExecutionPlan::execute");
}
- fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "NoOpExecutionPlan")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
unimplemented!("NoOpExecutionPlan::statistics");
}
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 32e7d4bcfe..63c86084ea 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -31,11 +31,11 @@ use arrow::{
};
use futures::Stream;
-use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
+use crate::physical_plan::{expressions::PhysicalSortExpr, DisplayAs};
use crate::{
error::{DataFusionError, Result},
physical_plan::stream::RecordBatchReceiverStream,
@@ -153,6 +153,20 @@ impl MockExec {
}
}
+impl DisplayAs for MockExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "MockExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for MockExec {
fn as_any(&self) -> &dyn Any {
self
@@ -225,18 +239,6 @@ impl ExecutionPlan for MockExec {
}
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "MockExec")
- }
- }
- }
-
// Panics if one of the batches is an error
fn statistics(&self) -> Statistics {
let data: Result<Vec<_>> = self
@@ -295,6 +297,20 @@ impl BarrierExec {
}
}
+impl DisplayAs for BarrierExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "BarrierExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for BarrierExec {
fn as_any(&self) -> &dyn Any {
self
@@ -352,18 +368,6 @@ impl ExecutionPlan for BarrierExec {
Ok(builder.build())
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "BarrierExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
common::compute_record_batch_statistics(&self.data, &self.schema, None)
}
@@ -392,6 +396,20 @@ impl ErrorExec {
}
}
+impl DisplayAs for ErrorExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "ErrorExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ErrorExec {
fn as_any(&self) -> &dyn Any {
self
@@ -431,18 +449,6 @@ impl ExecutionPlan for ErrorExec {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "ErrorExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
Statistics::default()
}
@@ -470,6 +476,26 @@ impl StatisticsExec {
}
}
}
+
+impl DisplayAs for StatisticsExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "StatisticsExec: col_count={}, row_count={:?}",
+ self.schema.fields().len(),
+ self.stats.num_rows,
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for StatisticsExec {
fn as_any(&self) -> &dyn Any {
self
@@ -509,23 +535,6 @@ impl ExecutionPlan for StatisticsExec {
fn statistics(&self) -> Statistics {
self.stats.clone()
}
-
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "StatisticsExec: col_count={}, row_count={:?}",
- self.schema.fields().len(),
- self.stats.num_rows,
- )
- }
- }
- }
}
/// Execution plan that emits streams that block forever.
@@ -563,6 +572,20 @@ impl BlockingExec {
}
}
+impl DisplayAs for BlockingExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "BlockingExec",)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for BlockingExec {
fn as_any(&self) -> &dyn Any {
self
@@ -605,18 +628,6 @@ impl ExecutionPlan for BlockingExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "BlockingExec",)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
unimplemented!()
}
@@ -697,6 +708,20 @@ impl PanicExec {
}
}
+impl DisplayAs for PanicExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "PanickingExec",)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for PanicExec {
fn as_any(&self) -> &dyn Any {
self
@@ -743,18 +768,6 @@ impl ExecutionPlan for PanicExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "PanickingExec",)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
unimplemented!()
}
diff --git a/datafusion/core/src/test_util/mod.rs
b/datafusion/core/src/test_util/mod.rs
index 6715a6ba98..7f73e00dca 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -33,7 +33,7 @@ use crate::execution::context::{SessionState, TaskContext};
use crate::execution::options::ReadOptions;
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
SendableRecordBatchStream,
};
use crate::prelude::{CsvReadOptions, SessionContext};
@@ -363,6 +363,25 @@ impl UnboundedExec {
}
}
}
+
+impl DisplayAs for UnboundedExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "UnboundableExec: unbounded={}",
+ self.batch_produce.is_none(),
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for UnboundedExec {
fn as_any(&self) -> &dyn Any {
self
@@ -406,22 +425,6 @@ impl ExecutionPlan for UnboundedExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "UnboundableExec: unbounded={}",
- self.batch_produce.is_none(),
- )
- }
- }
- }
-
fn statistics(&self) -> Statistics {
Statistics::default()
}
diff --git a/datafusion/core/tests/custom_sources.rs
b/datafusion/core/tests/custom_sources.rs
index 7374a056d6..0f742f7b9b 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -26,8 +26,8 @@ use datafusion::logical_expr::{
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
- project_schema, ColumnStatistics, ExecutionPlan, Partitioning,
RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ project_schema, ColumnStatistics, DisplayAs, ExecutionPlan, Partitioning,
+ RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion::scalar::ScalarValue;
use datafusion::{
@@ -101,6 +101,20 @@ impl Stream for TestCustomRecordBatchStream {
}
}
+impl DisplayAs for CustomExecutionPlan {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CustomExecutionPlan: projection={:#?}",
self.projection)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for CustomExecutionPlan {
fn as_any(&self) -> &dyn Any {
self
@@ -138,18 +152,6 @@ impl ExecutionPlan for CustomExecutionPlan {
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "CustomExecutionPlan: projection={:#?}",
self.projection)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
let batch = TEST_CUSTOM_RECORD_BATCH!().unwrap();
Statistics {
diff --git
a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 36b15f4359..b471121d1b 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -26,7 +26,8 @@ use datafusion::logical_expr::{Expr,
TableProviderFilterPushDown};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
+ Statistics,
};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
@@ -58,6 +59,20 @@ struct CustomPlan {
batches: Vec<RecordBatch>,
}
+impl DisplayAs for CustomPlan {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CustomPlan: batch_size={}", self.batches.len(),)
+ }
+ }
+ }
+}
+
impl ExecutionPlan for CustomPlan {
fn as_any(&self) -> &dyn std::any::Any {
self
@@ -97,18 +112,6 @@ impl ExecutionPlan for CustomPlan {
)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "CustomPlan: batch_size={}", self.batches.len(),)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// here we could provide more accurate statistics
// but we want to test the filter pushdown not the CBOs
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs
b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 73237fa857..167cf0e7f3 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -25,7 +25,7 @@ use datafusion::{
error::Result,
logical_expr::Expr,
physical_plan::{
- expressions::PhysicalSortExpr, project_schema, ColumnStatistics,
+ expressions::PhysicalSortExpr, project_schema, ColumnStatistics,
DisplayAs,
DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
Statistics,
},
@@ -111,6 +111,25 @@ impl TableProvider for StatisticsValidation {
}
}
+impl DisplayAs for StatisticsValidation {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "StatisticsValidation: col_count={}, row_count={:?}",
+ self.schema.fields().len(),
+ self.stats.num_rows,
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for StatisticsValidation {
fn as_any(&self) -> &dyn Any {
self
@@ -150,23 +169,6 @@ impl ExecutionPlan for StatisticsValidation {
fn statistics(&self) -> Statistics {
self.stats.clone()
}
-
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "StatisticsValidation: col_count={}, row_count={:?}",
- self.schema.fields().len(),
- self.stats.num_rows,
- )
- }
- }
- }
}
fn init_ctx(stats: Statistics, schema: Schema) -> Result<SessionContext> {
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs
b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 493d90d91a..0771ef7f79 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -80,8 +80,9 @@ use datafusion::{
},
optimizer::{optimize_children, OptimizerConfig, OptimizerRule},
physical_plan::{
- expressions::PhysicalSortExpr, DisplayFormatType, Distribution,
ExecutionPlan,
- Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
+ expressions::PhysicalSortExpr, DisplayAs, DisplayFormatType,
Distribution,
+ ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
+ Statistics,
},
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner,
PhysicalPlanner},
prelude::{SessionConfig, SessionContext},
@@ -421,6 +422,20 @@ impl Debug for TopKExec {
}
}
+impl DisplayAs for TopKExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "TopKExec: k={}", self.k)
+ }
+ }
+ }
+}
+
#[async_trait]
impl ExecutionPlan for TopKExec {
/// Return a reference to Any that can be used for downcasting
@@ -478,18 +493,6 @@ impl ExecutionPlan for TopKExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "TopKExec: k={}", self.k)
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// to improve the optimizability of this plan
// better statistics inference could be provided