This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9cf32cf Implement readable explain plans for physical plans (#337)
9cf32cf is described below
commit 9cf32cf2cda8472b87130142c4eee1126d4d9cbe
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri May 14 15:01:15 2021 -0400
Implement readable explain plans for physical plans (#337)
* Implement readable explain plans for physical plans
* Add apache copyright to display.rs
* Set concurrency explicitly in test and make it windows friendly
* fix doc example test
* fmt!
---
datafusion/src/logical_plan/display.rs | 13 +-
datafusion/src/logical_plan/plan.rs | 4 +-
datafusion/src/physical_plan/coalesce_batches.rs | 19 ++-
datafusion/src/physical_plan/cross_join.rs | 19 ++-
datafusion/src/physical_plan/csv.rs | 32 ++++-
datafusion/src/physical_plan/display.rs | 90 ++++++++++++
.../src/physical_plan/distinct_expressions.rs | 4 +
datafusion/src/physical_plan/empty.rs | 17 ++-
datafusion/src/physical_plan/explain.rs | 19 ++-
.../src/physical_plan/expressions/average.rs | 4 +
datafusion/src/physical_plan/expressions/count.rs | 4 +
.../src/physical_plan/expressions/min_max.rs | 8 ++
datafusion/src/physical_plan/expressions/mod.rs | 13 ++
datafusion/src/physical_plan/expressions/sum.rs | 4 +
datafusion/src/physical_plan/filter.rs | 16 ++-
datafusion/src/physical_plan/hash_aggregate.rs | 39 +++++-
datafusion/src/physical_plan/hash_join.rs | 21 ++-
datafusion/src/physical_plan/limit.rs | 28 +++-
datafusion/src/physical_plan/memory.rs | 24 +++-
datafusion/src/physical_plan/merge.rs | 15 +-
datafusion/src/physical_plan/mod.rs | 154 ++++++++++++++++++++-
datafusion/src/physical_plan/parquet.rs | 28 +++-
datafusion/src/physical_plan/planner.rs | 7 +-
datafusion/src/physical_plan/projection.rs | 29 +++-
datafusion/src/physical_plan/repartition.rs | 14 +-
datafusion/src/physical_plan/sort.rs | 15 +-
datafusion/src/physical_plan/udaf.rs | 4 +
datafusion/tests/custom_sources.rs | 17 ++-
datafusion/tests/sql.rs | 48 ++++++-
datafusion/tests/user_defined_plan.rs | 22 ++-
30 files changed, 683 insertions(+), 48 deletions(-)
diff --git a/datafusion/src/logical_plan/display.rs
b/datafusion/src/logical_plan/display.rs
index 76749b5..f285534 100644
--- a/datafusion/src/logical_plan/display.rs
+++ b/datafusion/src/logical_plan/display.rs
@@ -29,7 +29,8 @@ pub struct IndentVisitor<'a, 'b> {
f: &'a mut fmt::Formatter<'b>,
/// If true, includes summarized schema information
with_schema: bool,
- indent: u32,
+ /// The current indent
+ indent: usize,
}
impl<'a, 'b> IndentVisitor<'a, 'b> {
@@ -42,13 +43,6 @@ impl<'a, 'b> IndentVisitor<'a, 'b> {
indent: 0,
}
}
-
- fn write_indent(&mut self) -> fmt::Result {
- for _ in 0..self.indent {
- write!(self.f, " ")?;
- }
- Ok(())
- }
}
impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
@@ -58,8 +52,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
if self.indent > 0 {
writeln!(self.f)?;
}
- self.write_indent()?;
-
+ write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
write!(self.f, "{}", plan.display())?;
if self.with_schema {
write!(
diff --git a/datafusion/src/logical_plan/plan.rs
b/datafusion/src/logical_plan/plan.rs
index 13509d1..8b9aac9 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -356,13 +356,15 @@ pub enum Partitioning {
/// after all children have been visited.
////
/// To use, define a struct that implements this trait and then invoke
-/// "LogicalPlan::accept".
+/// [`LogicalPlan::accept`].
///
/// For example, for a logical plan like:
///
+/// ```text
/// Projection: #id
/// Filter: #state Eq Utf8(\"CO\")\
/// CsvScan: employee.csv projection=Some([0, 3])";
+/// ```
///
/// The sequence of visit operations would be:
/// ```text
diff --git a/datafusion/src/physical_plan/coalesce_batches.rs
b/datafusion/src/physical_plan/coalesce_batches.rs
index b91e0b6..e25412d 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -25,7 +25,8 @@ use std::task::{Context, Poll};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
- ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream,
};
use arrow::compute::kernels::concat::concat;
@@ -114,6 +115,22 @@ impl ExecutionPlan for CoalesceBatchesExec {
is_closed: false,
}))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "CoalesceBatchesExec: target_batch_size={}",
+ self.target_batch_size
+ )
+ }
+ }
+ }
}
struct CoalesceBatchesStream {
diff --git a/datafusion/src/physical_plan/cross_join.rs
b/datafusion/src/physical_plan/cross_join.rs
index 4372352..f6f5da4 100644
--- a/datafusion/src/physical_plan/cross_join.rs
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -21,7 +21,6 @@
use futures::{lock::Mutex, StreamExt};
use std::{any::Any, sync::Arc, task::Poll};
-use crate::physical_plan::memory::MemoryStream;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
@@ -36,8 +35,10 @@ use crate::{
use async_trait::async_trait;
use std::time::Instant;
-use super::{ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream};
-use crate::physical_plan::coalesce_batches::concat_batches;
+use super::{
+ coalesce_batches::concat_batches, memory::MemoryStream, DisplayFormatType,
+ ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
use log::debug;
/// Data of the left side
@@ -192,6 +193,18 @@ impl ExecutionPlan for CrossJoinExec {
join_time: 0,
}))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "CrossJoinExec")
+ }
+ }
+ }
}
/// A stream that issues [RecordBatch]es as they arrive from the right of the
join.
diff --git a/datafusion/src/physical_plan/csv.rs
b/datafusion/src/physical_plan/csv.rs
index 9ab8177..96b24cc 100644
--- a/datafusion/src/physical_plan/csv.rs
+++ b/datafusion/src/physical_plan/csv.rs
@@ -18,8 +18,7 @@
//! Execution plan for reading CSV files
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::{common, Partitioning};
+use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan,
Partitioning};
use arrow::csv;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
@@ -135,6 +134,19 @@ impl std::fmt::Debug for Source {
}
}
+impl std::fmt::Display for Source {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Source::PartitionedFiles { path, filenames } => {
+ write!(f, "Path({}: [{}])", path, filenames.join(","))
+ }
+ Source::Reader(_) => {
+ write!(f, "Reader(...)")
+ }
+ }
+ }
+}
+
impl Clone for Source {
fn clone(&self) -> Self {
match self {
@@ -405,6 +417,22 @@ impl ExecutionPlan for CsvExec {
}
}
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "CsvExec: source={}, has_header={}",
+ self.source, self.has_header
+ )
+ }
+ }
+ }
}
/// Iterator over batches
diff --git a/datafusion/src/physical_plan/display.rs
b/datafusion/src/physical_plan/display.rs
new file mode 100644
index 0000000..bfc3cd9
--- /dev/null
+++ b/datafusion/src/physical_plan/display.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implementation of physical plan display. See
+//! [`crate::physical_plan::displayable`] for examples of how to
+//! format
+
+use std::fmt;
+
+use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
+
+/// Options for controlling how each [`ExecutionPlan`] should format itself
+#[derive(Debug, Clone, Copy)]
+pub enum DisplayFormatType {
+ /// Default, compact format. Example: `FilterExec: c12 < 10.0`
+ Default,
+}
+
+/// Wraps an `ExecutionPlan` with various ways to display this plan
+pub struct DisplayableExecutionPlan<'a> {
+ inner: &'a dyn ExecutionPlan,
+}
+
+impl<'a> DisplayableExecutionPlan<'a> {
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
+    /// pretty printed in a variety of ways
+    pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
+        Self { inner }
+    }
+
+    /// Return a `format`able structure that produces a single line
+    /// per node, each child indented two spaces deeper than its parent.
+    ///
+    /// ```text
+    /// ProjectionExec: expr=[a]
+    ///   CoalesceBatchesExec: target_batch_size=4096
+    ///     FilterExec: a < 5
+    ///       RepartitionExec: partitioning=RoundRobinBatch(16)
+    ///         CsvExec: source=...
+    /// ```
+    pub fn indent(&self) -> impl fmt::Display + 'a {
+        struct Wrapper<'a>(&'a dyn ExecutionPlan);
+        impl<'a> fmt::Display for Wrapper<'a> {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                let t = DisplayFormatType::Default;
+                let mut visitor = IndentVisitor { t, f, indent: 0 };
+                accept(self.0, &mut visitor)
+            }
+        }
+        Wrapper(self.inner)
+    }
+}
+
+/// Formats plans with a single line per node.
+struct IndentVisitor<'a, 'b> {
+    /// How to format each node
+    t: DisplayFormatType,
+    /// Write to this formatter
+    f: &'a mut fmt::Formatter<'b>,
+    /// Depth of the next node to be written (two spaces per level)
+    indent: usize,
+}
+
+impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
+    type Error = fmt::Error;
+
+    /// Writes `plan` on its own line at the current depth, then moves
+    /// one level deeper for its children.
+    fn pre_visit(
+        &mut self,
+        plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
+        plan.fmt_as(self.t, self.f)?;
+        writeln!(self.f)?;
+        self.indent += 1;
+        Ok(true)
+    }
+
+    /// Restores the depth once all children of `plan` have been written,
+    /// so that siblings (e.g. both inputs of a join) line up with each
+    /// other instead of drifting further right.
+    fn post_visit(
+        &mut self,
+        _plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        // Cannot underflow: post_visit only follows this node's pre_visit,
+        // which incremented the depth.
+        self.indent -= 1;
+        Ok(true)
+    }
+}
diff --git a/datafusion/src/physical_plan/distinct_expressions.rs
b/datafusion/src/physical_plan/distinct_expressions.rs
index 927f16f..f3513c2 100644
--- a/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/datafusion/src/physical_plan/distinct_expressions.rs
@@ -120,6 +120,10 @@ impl AggregateExpr for DistinctCount {
count_data_type: self.data_type.clone(),
}))
}
+
+ fn name(&self) -> &str {
+ &self.name
+ }
}
#[derive(Debug)]
diff --git a/datafusion/src/physical_plan/empty.rs
b/datafusion/src/physical_plan/empty.rs
index 3011b28..391a695 100644
--- a/datafusion/src/physical_plan/empty.rs
+++ b/datafusion/src/physical_plan/empty.rs
@@ -21,8 +21,9 @@ use std::any::Any;
use std::sync::Arc;
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::memory::MemoryStream;
-use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
+use crate::physical_plan::{
+ memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning,
+};
use arrow::array::NullArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -120,6 +121,18 @@ impl ExecutionPlan for EmptyExec {
None,
)?))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "EmptyExec: produce_one_row={}",
self.produce_one_row)
+ }
+ }
+ }
}
#[cfg(test)]
diff --git a/datafusion/src/physical_plan/explain.rs
b/datafusion/src/physical_plan/explain.rs
index 26d2c94..3c5ef1a 100644
--- a/datafusion/src/physical_plan/explain.rs
+++ b/datafusion/src/physical_plan/explain.rs
@@ -20,15 +20,14 @@
use std::any::Any;
use std::sync::Arc;
-use crate::error::{DataFusionError, Result};
use crate::{
+ error::{DataFusionError, Result},
logical_plan::StringifiedPlan,
- physical_plan::{common::SizedRecordBatchStream, ExecutionPlan},
+ physical_plan::Partitioning,
+ physical_plan::{common::SizedRecordBatchStream, DisplayFormatType,
ExecutionPlan},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef,
record_batch::RecordBatch};
-use crate::physical_plan::Partitioning;
-
use super::SendableRecordBatchStream;
use async_trait::async_trait;
@@ -122,4 +121,16 @@ impl ExecutionPlan for ExplainExec {
vec![Arc::new(record_batch)],
)))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "ExplainExec")
+ }
+ }
+ }
}
diff --git a/datafusion/src/physical_plan/expressions/average.rs
b/datafusion/src/physical_plan/expressions/average.rs
index 3864412..6a63320 100644
--- a/datafusion/src/physical_plan/expressions/average.rs
+++ b/datafusion/src/physical_plan/expressions/average.rs
@@ -109,6 +109,10 @@ impl AggregateExpr for Avg {
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}
+
+ fn name(&self) -> &str {
+ &self.name
+ }
}
/// An accumulator to compute the average
diff --git a/datafusion/src/physical_plan/expressions/count.rs
b/datafusion/src/physical_plan/expressions/count.rs
index 2245981..4a3fbe4 100644
--- a/datafusion/src/physical_plan/expressions/count.rs
+++ b/datafusion/src/physical_plan/expressions/count.rs
@@ -83,6 +83,10 @@ impl AggregateExpr for Count {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(CountAccumulator::new()))
}
+
+ fn name(&self) -> &str {
+ &self.name
+ }
}
#[derive(Debug)]
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs
b/datafusion/src/physical_plan/expressions/min_max.rs
index 5ed1461..ea917d3 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -88,6 +88,10 @@ impl AggregateExpr for Max {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
}
+
+ fn name(&self) -> &str {
+ &self.name
+ }
}
// Statically-typed version of min/max(array) -> ScalarValue for string types.
@@ -387,6 +391,10 @@ impl AggregateExpr for Min {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
}
+
+ fn name(&self) -> &str {
+ &self.name
+ }
}
#[derive(Debug)]
diff --git a/datafusion/src/physical_plan/expressions/mod.rs
b/datafusion/src/physical_plan/expressions/mod.rs
index 6e25220..4d57c39 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -74,6 +74,19 @@ pub struct PhysicalSortExpr {
pub options: SortOptions,
}
+impl std::fmt::Display for PhysicalSortExpr {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let opts_string = match (self.options.descending,
self.options.nulls_first) {
+ (true, true) => "DESC",
+ (true, false) => "DESC NULLS LAST",
+ (false, true) => "ASC",
+ (false, false) => "ASC NULLS LAST",
+ };
+
+ write!(f, "{} {}", self.expr, opts_string)
+ }
+}
+
impl PhysicalSortExpr {
/// evaluate the sort expression into SortColumn that can be passed into
arrow sort kernel
pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) ->
Result<SortColumn> {
diff --git a/datafusion/src/physical_plan/expressions/sum.rs
b/datafusion/src/physical_plan/expressions/sum.rs
index 6f50894..7bbbf99 100644
--- a/datafusion/src/physical_plan/expressions/sum.rs
+++ b/datafusion/src/physical_plan/expressions/sum.rs
@@ -104,6 +104,10 @@ impl AggregateExpr for Sum {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(SumAccumulator::try_new(&self.data_type)?))
}
+
+ fn name(&self) -> &str {
+ &self.name
+ }
}
#[derive(Debug)]
diff --git a/datafusion/src/physical_plan/filter.rs
b/datafusion/src/physical_plan/filter.rs
index 61af78d..bc2b17a 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -25,7 +25,9 @@ use std::task::{Context, Poll};
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
+use crate::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+};
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
@@ -119,6 +121,18 @@ impl ExecutionPlan for FilterExec {
input: self.input.execute(partition).await?,
}))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "FilterExec: {}", self.predicate)
+ }
+ }
+ }
}
/// The FilterExec streams wraps the input iterator and applies the predicate
expression to
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs
b/datafusion/src/physical_plan/hash_aggregate.rs
index fad4fa5..3059e2f 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -28,8 +28,10 @@ use futures::{
};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric};
-use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning,
PhysicalExpr};
+use crate::physical_plan::{
+ Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
+ Partitioning, PhysicalExpr, SQLMetric,
+};
use arrow::{
array::{Array, UInt32Builder},
@@ -257,6 +259,39 @@ impl ExecutionPlan for HashAggregateExec {
metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
metrics
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "HashAggregateExec: mode={:?}", self.mode)?;
+ let g: Vec<String> = self
+ .group_expr
+ .iter()
+ .map(|(e, alias)| {
+ let e = e.to_string();
+ if &e != alias {
+ format!("{} as {}", e, alias)
+ } else {
+ e
+ }
+ })
+ .collect();
+ write!(f, ", gby=[{}]", g.join(", "))?;
+
+ let a: Vec<String> = self
+ .aggr_expr
+ .iter()
+ .map(|agg| agg.name().to_string())
+ .collect();
+ write!(f, ", aggr=[{}]", a.join(", "))?;
+ }
+ }
+ Ok(())
+ }
}
/*
diff --git a/datafusion/src/physical_plan/hash_join.rs
b/datafusion/src/physical_plan/hash_join.rs
index 2682623..0bf5a28 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -58,7 +58,10 @@ use super::{
};
use crate::error::{DataFusionError, Result};
-use super::{ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream};
+use super::{
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream,
+};
use crate::physical_plan::coalesce_batches::concat_batches;
use log::debug;
@@ -393,6 +396,22 @@ impl ExecutionPlan for HashJoinExec {
is_exhausted: false,
}))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "HashJoinExec: mode={:?}, join_type={:?}, on={:?}",
+ self.mode, self.join_type, self.on
+ )
+ }
+ }
+ }
}
/// Updates `hash` with new entries from [RecordBatch] evaluated against the
expressions `on`,
diff --git a/datafusion/src/physical_plan/limit.rs
b/datafusion/src/physical_plan/limit.rs
index c091196..c56dbe1 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -26,7 +26,9 @@ use futures::stream::Stream;
use futures::stream::StreamExt;
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
+use crate::physical_plan::{
+ DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+};
use arrow::array::ArrayRef;
use arrow::compute::limit;
use arrow::datatypes::SchemaRef;
@@ -121,6 +123,18 @@ impl ExecutionPlan for GlobalLimitExec {
let stream = self.input.execute(0).await?;
Ok(Box::pin(LimitStream::new(stream, self.limit)))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "GlobalLimitExec: limit={}", self.limit)
+ }
+ }
+ }
}
/// LocalLimitExec applies a limit to a single partition
@@ -187,6 +201,18 @@ impl ExecutionPlan for LocalLimitExec {
let stream = self.input.execute(partition).await?;
Ok(Box::pin(LimitStream::new(stream, self.limit)))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "LocalLimitExec: limit={}", self.limit)
+ }
+ }
+ }
}
/// Truncate a RecordBatch to maximum of n rows
diff --git a/datafusion/src/physical_plan/memory.rs
b/datafusion/src/physical_plan/memory.rs
index 9022077..85d8aee 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -22,7 +22,10 @@ use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
-use super::{ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream};
+use super::{
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream,
+};
use crate::error::{DataFusionError, Result};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
@@ -88,6 +91,25 @@ impl ExecutionPlan for MemoryExec {
self.projection.clone(),
)?))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ let partitions: Vec<_> =
+ self.partitions.iter().map(|b| b.len()).collect();
+ write!(
+ f,
+ "MemoryExec: partitions={}, partition_sizes={:?}",
+ partitions.len(),
+ partitions
+ )
+ }
+ }
+ }
}
impl MemoryExec {
diff --git a/datafusion/src/physical_plan/merge.rs
b/datafusion/src/physical_plan/merge.rs
index c66532b..c65227c 100644
--- a/datafusion/src/physical_plan/merge.rs
+++ b/datafusion/src/physical_plan/merge.rs
@@ -36,8 +36,7 @@ use arrow::{
use super::RecordBatchStream;
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::Partitioning;
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use super::SendableRecordBatchStream;
use pin_project_lite::pin_project;
@@ -151,6 +150,18 @@ impl ExecutionPlan for MergeExec {
}
}
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "MergeExec")
+ }
+ }
+ }
}
pin_project! {
diff --git a/datafusion/src/physical_plan/mod.rs
b/datafusion/src/physical_plan/mod.rs
index a8f6f0c..6ab9570 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,7 +17,7 @@
//! Traits for physical query plan, supporting parallel execution for
partitioned relations.
-use std::fmt::{Debug, Display};
+use std::fmt::{self, Debug, Display};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{any::Any, pin::Pin};
@@ -31,9 +31,10 @@ use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
+pub use display::DisplayFormatType;
use futures::stream::Stream;
-use self::merge::MergeExec;
+use self::{display::DisplayableExecutionPlan, merge::MergeExec};
use hashbrown::HashMap;
/// Trait for types that stream [arrow::record_batch::RecordBatch]
@@ -120,7 +121,16 @@ pub trait PhysicalPlanner {
) -> Result<Arc<dyn ExecutionPlan>>;
}
-/// Partition-aware execution plan for a relation
+/// `ExecutionPlan` represents nodes in the DataFusion Physical Plan.
+///
+/// Each `ExecutionPlan` is Partition-aware and is responsible for
+/// creating the actual `async` [`SendableRecordBatchStream`]s
+/// of [`RecordBatch`] that incrementally compute the operator's
+/// output from its input partition.
+///
+/// [`ExecutionPlan`] can be displayed in a simplified form using the
+/// return value from [`displayable`] in addition to the (normally
+/// quite verbose) `Debug` output.
#[async_trait]
pub trait ExecutionPlan: Debug + Send + Sync {
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
@@ -152,6 +162,137 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn metrics(&self) -> HashMap<String, SQLMetric> {
HashMap::new()
}
+
+ /// Format this `ExecutionPlan` to `f` in the specified type.
+ ///
+ /// Should not include a newline
+ ///
+ /// Note this function prints a placeholder by default to preserve
+ /// backwards compatibility.
+ fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
+ write!(f, "ExecutionPlan(PlaceHolder)")
+ }
+}
+
+/// Return a [wrapper](DisplayableExecutionPlan) around an
+/// [`ExecutionPlan`] which can be displayed in various easier to
+/// understand ways.
+///
+/// ```
+/// use datafusion::prelude::*;
+/// use datafusion::physical_plan::displayable;
+///
+/// // Hard code concurrency as it appears in the RepartitionExec output
+/// let config = ExecutionConfig::new()
+/// .with_concurrency(3);
+/// let mut ctx = ExecutionContext::with_config(config);
+///
+/// // register the a table
+/// ctx.register_csv("example", "tests/example.csv",
CsvReadOptions::new()).unwrap();
+///
+/// // create a plan to run a SQL query
+/// let plan = ctx
+/// .create_logical_plan("SELECT a FROM example WHERE a < 5")
+/// .unwrap();
+/// let plan = ctx.optimize(&plan).unwrap();
+/// let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+///
+/// // Format using display string
+/// let displayable_plan = displayable(physical_plan.as_ref());
+/// let plan_string = format!("{}", displayable_plan.indent());
+///
+/// assert_eq!("ProjectionExec: expr=[a]\
+/// \n CoalesceBatchesExec: target_batch_size=4096\
+/// \n FilterExec: a < 5\
+/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
+/// \n CsvExec: source=Path(tests/example.csv:
[tests/example.csv]), has_header=true",
+/// plan_string.trim());
+/// ```
+///
+pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
+ DisplayableExecutionPlan::new(plan)
+}
+
+/// Visit all children of this plan, according to the order defined on
+/// [`ExecutionPlanVisitor`].
+// Note that this would be really nice if it were a method on
+// ExecutionPlan, but it can not be because it takes a generic
+// parameter and `ExecutionPlan` is a trait
+pub fn accept<V: ExecutionPlanVisitor>(
+    plan: &dyn ExecutionPlan,
+    visitor: &mut V,
+) -> std::result::Result<(), V::Error> {
+    // Identical traversal to `visit_execution_plan`; delegate so the two
+    // public entry points cannot drift apart.
+    visit_execution_plan(plan, visitor)
+}
+
+/// Trait that implements the [Visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
+/// depth first walk of `ExecutionPlan` nodes. `pre_visit` is called
+/// before any children are visited, and then `post_visit` is called
+/// after all children have been visited.
+///
+/// To use, define a struct that implements this trait and then invoke
+/// [`accept`].
+///
+/// For example, for an execution plan that looks like:
+///
+/// ```text
+/// ProjectionExec: #id
+///    FilterExec: state = CO
+///       CsvExec:
+/// ```
+///
+/// The sequence of visit operations would be:
+/// ```text
+/// visitor.pre_visit(ProjectionExec)
+/// visitor.pre_visit(FilterExec)
+/// visitor.pre_visit(CsvExec)
+/// visitor.post_visit(CsvExec)
+/// visitor.post_visit(FilterExec)
+/// visitor.post_visit(ProjectionExec)
+/// ```
+pub trait ExecutionPlanVisitor {
+    /// The type of error returned by this visitor
+    type Error;
+
+    /// Invoked on an `ExecutionPlan` plan before any of its child
+    /// inputs have been visited. If Ok(true) is returned, the
+    /// recursion continues. If Err(..) or Ok(false) are returned, the
+    /// recursion stops immediately and the error, if any, is returned
+    /// to `accept`
+    fn pre_visit(
+        &mut self,
+        plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error>;
+
+    /// Invoked on an `ExecutionPlan` plan *after* all of its child
+    /// inputs have been visited. The return value is handled the same
+    /// as the return value of `pre_visit`. The provided default
+    /// implementation returns `Ok(true)`.
+    fn post_visit(
+        &mut self,
+        _plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        Ok(true)
+    }
+}
+
+/// Recursively calls `pre_visit` and `post_visit` for this node and
+/// all of its children, as described on [`ExecutionPlanVisitor`].
+///
+/// The walk stops as soon as either callback returns `Ok(false)` (no
+/// further nodes are visited) or an `Err`, which is returned.
+pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
+    plan: &dyn ExecutionPlan,
+    visitor: &mut V,
+) -> std::result::Result<(), V::Error> {
+    // The "keep going" flag only matters while recursing; callers only
+    // need to know whether an error occurred.
+    visit_node(plan, visitor).map(|_| ())
+}
+
+/// Visits `plan` and its subtree. Returns `Ok(false)` once the visitor
+/// has asked for the walk to stop, so callers up the stack skip any
+/// remaining siblings as well.
+fn visit_node<V: ExecutionPlanVisitor>(
+    plan: &dyn ExecutionPlan,
+    visitor: &mut V,
+) -> std::result::Result<bool, V::Error> {
+    if !visitor.pre_visit(plan)? {
+        return Ok(false);
+    }
+    for child in plan.children() {
+        if !visit_node(child.as_ref(), visitor)? {
+            return Ok(false);
+        }
+    }
+    visitor.post_visit(plan)
}
/// Execute the [ExecutionPlan] and collect the results in memory
@@ -290,6 +431,12 @@ pub trait AggregateExpr: Send + Sync + Debug {
/// expressions that are passed to the Accumulator.
/// Single-column aggregations such as `sum` return a single value, others
(e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+ /// Human readable name such as `"MIN(c2)"`. The default
+ /// implementation returns placeholder text.
+ fn name(&self) -> &str {
+ "AggregateExpr: default name"
+ }
}
/// An accumulator represents a stateful object that lives throughout the
evaluation of multiple rows and
@@ -351,6 +498,7 @@ pub mod cross_join;
pub mod crypto_expressions;
pub mod csv;
pub mod datetime_expressions;
+pub mod display;
pub mod distinct_expressions;
pub mod empty;
pub mod explain;
diff --git a/datafusion/src/physical_plan/parquet.rs
b/datafusion/src/physical_plan/parquet.rs
index dee0fc8..dd5e77b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -27,7 +27,7 @@ use super::{
planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr,
RecordBatchStream,
SendableRecordBatchStream,
};
-use crate::physical_plan::{common, ExecutionPlan, Partitioning};
+use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan,
Partitioning};
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
@@ -864,6 +864,32 @@ impl ExecutionPlan for ParquetExec {
inner: ReceiverStream::new(response_rx),
}))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ let files: Vec<_> = self
+ .partitions
+ .iter()
+ .map(|pp| pp.filenames.iter())
+ .flatten()
+ .map(|s| s.as_str())
+ .collect();
+
+ write!(
+ f,
+ "ParquetExec: batch_size={}, limit={:?}, partitions=[{}]",
+ self.batch_size,
+ self.limit,
+ files.join(", ")
+ )
+ }
+ }
+ }
}
fn send_result(
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index 664e4dc..d11e8e9 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,7 +23,6 @@ use super::{
aggregates, cross_join::CrossJoinExec, empty::EmptyExec,
expressions::binary,
functions, hash_join::PartitionMode, udaf, union::UnionExec,
};
-use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
DFSchema, Expr, LogicalPlan, Operator, Partitioning as
LogicalPartitioning, PlanType,
@@ -45,6 +44,10 @@ use crate::physical_plan::{AggregateExpr, ExecutionPlan,
PhysicalExpr, PhysicalP
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::variable::VarType;
+use crate::{
+ error::{DataFusionError, Result},
+ physical_plan::displayable,
+};
use arrow::compute::can_cast_types;
use arrow::compute::SortOptions;
@@ -383,7 +386,7 @@ impl DefaultPhysicalPlanner {
if *verbose {
stringified_plans.push(StringifiedPlan::new(
PlanType::PhysicalPlan,
- format!("{:#?}", input),
+ displayable(input.as_ref()).indent().to_string(),
));
}
Ok(Arc::new(ExplainExec::new(
diff --git a/datafusion/src/physical_plan/projection.rs
b/datafusion/src/physical_plan/projection.rs
index a881beb..c0d78ff 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -26,7 +26,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
+use crate::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+};
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
@@ -130,6 +132,31 @@ impl ExecutionPlan for ProjectionExec {
input: self.input.execute(partition).await?,
}))
}
+
+    /// Writes a one-line summary of this projection for plan display.
+    ///
+    /// Every projected expression is rendered via `to_string()`; when the
+    /// output column name differs from that rendering, the entry is shown
+    /// as `<expr> as <name>`, otherwise just the expression text.
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let mut rendered: Vec<String> = Vec::new();
+                for (physical_expr, name) in self.expr.iter() {
+                    let text = physical_expr.to_string();
+                    let entry = if text == *name {
+                        text
+                    } else {
+                        format!("{} as {}", text, name)
+                    };
+                    rendered.push(entry);
+                }
+
+                write!(f, "ProjectionExec: expr=[{}]", rendered.join(", "))
+            }
+        }
+    }
}
fn batch_project(
diff --git a/datafusion/src/physical_plan/repartition.rs
b/datafusion/src/physical_plan/repartition.rs
index 7243550..2599690 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
use std::{any::Any, collections::HashMap, vec};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{ExecutionPlan, Partitioning};
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::record_batch::RecordBatch;
use arrow::{array::Array, error::Result as ArrowResult};
use arrow::{compute::take, datatypes::SchemaRef};
@@ -235,6 +235,18 @@ impl ExecutionPlan for RepartitionExec {
input:
UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1),
}))
}
+
+    /// Writes a one-line summary of this node for plan display, showing the
+    /// `Debug` form of its output partitioning scheme.
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => write!(
+                f,
+                "RepartitionExec: partitioning={:?}",
+                self.partitioning
+            ),
+        }
+    }
}
impl RepartitionExec {
diff --git a/datafusion/src/physical_plan/sort.rs
b/datafusion/src/physical_plan/sort.rs
index 010e406..8229060 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -41,7 +41,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
- common, Distribution, ExecutionPlan, Partitioning, SQLMetric,
+ common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
SQLMetric,
};
/// Sort execution plan
@@ -145,6 +145,19 @@ impl ExecutionPlan for SortExec {
)))
}
+    /// Writes a one-line summary of this sort for plan display: the sort
+    /// expressions in order, e.g. `SortExec: [the_min DESC]`.
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> =
+                    self.expr.iter().map(|e| e.to_string()).collect();
+                // Separate keys with ", " so multi-key sorts read like the
+                // expression lists printed by the other operators
+                // (e.g. ProjectionExec's `expr=[a, b]`).
+                write!(f, "SortExec: [{}]", expr.join(", "))
+            }
+        }
+    }
+
fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
diff --git a/datafusion/src/physical_plan/udaf.rs
b/datafusion/src/physical_plan/udaf.rs
index 3dc6aa4..f7515d3 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -165,4 +165,8 @@ impl AggregateExpr for AggregateFunctionExpr {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
(self.fun.accumulator)()
}
+
+    /// Returns the stored `name` of this user-defined aggregate expression.
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
}
diff --git a/datafusion/tests/custom_sources.rs
b/datafusion/tests/custom_sources.rs
index a00dd6a..b39f47b 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -20,11 +20,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use datafusion::error::{DataFusionError, Result};
use datafusion::{
datasource::{datasource::Statistics, TableProvider},
physical_plan::collect,
};
+use datafusion::{
+ error::{DataFusionError, Result},
+ physical_plan::DisplayFormatType,
+};
use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::{col, Expr, LogicalPlan, LogicalPlanBuilder};
@@ -128,6 +131,18 @@ impl ExecutionPlan for CustomExecutionPlan {
async fn execute(&self, _partition: usize) ->
Result<SendableRecordBatchStream> {
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
}
+
+    /// Writes a one-line summary of this test plan for plan display.
+    ///
+    /// Uses compact `{:?}` rather than pretty `{:#?}` formatting: the pretty
+    /// form emits newlines for compound values (such as an optional list of
+    /// column indices), which would break the one-node-per-line layout of
+    /// indented plan output.
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "CustomExecutionPlan: projection={:?}", self.projection)
+            }
+        }
+    }
}
impl TableProvider for CustomTableProvider {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index c80ffe4..0b9cc2a 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -31,9 +31,8 @@ use arrow::{
util::display::array_value_to_string,
};
-use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::LogicalPlan;
-use datafusion::prelude::create_udf;
+use datafusion::prelude::*;
use datafusion::{
datasource::{csv::CsvReadOptions, MemTable},
physical_plan::collect,
@@ -42,6 +41,7 @@ use datafusion::{
error::{DataFusionError, Result},
physical_plan::ColumnarValue,
};
+use datafusion::{execution::context::ExecutionContext,
physical_plan::displayable};
#[tokio::test]
async fn nyc() -> Result<()> {
@@ -2932,3 +2932,47 @@ async fn test_cast_expressions_error() -> Result<()> {
Ok(())
}
+
+// Verifies that `displayable(..).indent()` renders a physical plan with one
+// operator per line, each child indented beneath its parent.
+#[tokio::test]
+async fn test_physical_plan_display_indent() -> Result<()> {
+    // Hard code concurrency as it appears in the RepartitionExec output
+    let config = ExecutionConfig::new().with_concurrency(3);
+    let mut ctx = ExecutionContext::with_config(config);
+    register_aggregate_csv(&mut ctx)?;
+    let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
+ FROM aggregate_test_100 \
+ WHERE c12 < 10 \
+ GROUP BY c1 \
+ ORDER BY the_min DESC \
+ LIMIT 10";
+    let plan = ctx.create_logical_plan(sql)?;
+    let plan = ctx.optimize(&plan)?;
+
+    let physical_plan = ctx.create_physical_plan(&plan)?;
+ let expected = vec![
+ "GlobalLimitExec: limit=10",
+ " SortExec: [the_min DESC]",
+ " ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
+ " HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+ " MergeExec",
+ " HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12),
MIN(c12)]",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " FilterExec: c12 < CAST(10 AS Float64)",
+ " RepartitionExec: partitioning=RoundRobinBatch(3)",
+ " CsvExec:
source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv:
[ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
+ ];
+
+    // The absolute test-data directory differs per machine, so it is replaced
+    // with a fixed placeholder before comparing.
+    let data_path = arrow::util::test_util::arrow_test_data();
+    let actual = displayable(physical_plan.as_ref())
+        .indent()
+        .to_string()
+        .trim()
+        .lines()
+        // normalize paths
+        .map(|s| s.replace(&data_path, "ARROW_TEST_DATA"))
+        .collect::<Vec<_>>();
+
+    assert_eq!(
+        expected, actual,
+        "expected:\n{:#?}\nactual:\n\n{:#?}\n",
+        expected, actual
+    );
+    Ok(())
+}
diff --git a/datafusion/tests/user_defined_plan.rs
b/datafusion/tests/user_defined_plan.rs
index 5e38c57..8914c05 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -75,8 +75,8 @@ use datafusion::{
optimizer::{optimizer::OptimizerRule, utils::optimize_children},
physical_plan::{
planner::{DefaultPhysicalPlanner, ExtensionPlanner},
- Distribution, ExecutionPlan, Partitioning, PhysicalPlanner,
RecordBatchStream,
- SendableRecordBatchStream,
+ DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
PhysicalPlanner,
+ RecordBatchStream, SendableRecordBatchStream,
},
prelude::{ExecutionConfig, ExecutionContext},
};
@@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> {
let mut ctx = setup_table(make_topk_context()).await?;
let expected = vec![
- "| logical_plan after topk | TopK: k=3
|",
- "| | Projection:
#customer_id, #revenue |",
- "| | TableScan: sales
projection=Some([0, 1]) |",
+ "| logical_plan after topk | TopK: k=3
|",
+ "| | Projection:
#customer_id, #revenue |",
+ "| | TableScan: sales
projection=Some([0, 1]) |",
].join("\n");
let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
@@ -397,6 +397,18 @@ impl ExecutionPlan for TopKExec {
state: BTreeMap::new(),
}))
}
+
+    /// Writes a one-line summary of this node for plan display, showing `k`.
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => write!(f, "TopKExec: k={}", self.k),
+        }
+    }
}
// A very specialized TopK implementation