This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf32cf  Implement readable explain plans for physical plans (#337)
9cf32cf is described below

commit 9cf32cf2cda8472b87130142c4eee1126d4d9cbe
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri May 14 15:01:15 2021 -0400

    Implement readable explain plans for physical plans (#337)
    
    * Implement readable explain plans for physical plans
    
    * Add apache copyright to display.rs
    
    * Set concurrency explicitly in test and make it windows friendly
    
    * fix doc example test
    
    * fmt!
---
 datafusion/src/logical_plan/display.rs             |  13 +-
 datafusion/src/logical_plan/plan.rs                |   4 +-
 datafusion/src/physical_plan/coalesce_batches.rs   |  19 ++-
 datafusion/src/physical_plan/cross_join.rs         |  19 ++-
 datafusion/src/physical_plan/csv.rs                |  32 ++++-
 datafusion/src/physical_plan/display.rs            |  90 ++++++++++++
 .../src/physical_plan/distinct_expressions.rs      |   4 +
 datafusion/src/physical_plan/empty.rs              |  17 ++-
 datafusion/src/physical_plan/explain.rs            |  19 ++-
 .../src/physical_plan/expressions/average.rs       |   4 +
 datafusion/src/physical_plan/expressions/count.rs  |   4 +
 .../src/physical_plan/expressions/min_max.rs       |   8 ++
 datafusion/src/physical_plan/expressions/mod.rs    |  13 ++
 datafusion/src/physical_plan/expressions/sum.rs    |   4 +
 datafusion/src/physical_plan/filter.rs             |  16 ++-
 datafusion/src/physical_plan/hash_aggregate.rs     |  39 +++++-
 datafusion/src/physical_plan/hash_join.rs          |  21 ++-
 datafusion/src/physical_plan/limit.rs              |  28 +++-
 datafusion/src/physical_plan/memory.rs             |  24 +++-
 datafusion/src/physical_plan/merge.rs              |  15 +-
 datafusion/src/physical_plan/mod.rs                | 154 ++++++++++++++++++++-
 datafusion/src/physical_plan/parquet.rs            |  28 +++-
 datafusion/src/physical_plan/planner.rs            |   7 +-
 datafusion/src/physical_plan/projection.rs         |  29 +++-
 datafusion/src/physical_plan/repartition.rs        |  14 +-
 datafusion/src/physical_plan/sort.rs               |  15 +-
 datafusion/src/physical_plan/udaf.rs               |   4 +
 datafusion/tests/custom_sources.rs                 |  17 ++-
 datafusion/tests/sql.rs                            |  48 ++++++-
 datafusion/tests/user_defined_plan.rs              |  22 ++-
 30 files changed, 683 insertions(+), 48 deletions(-)

diff --git a/datafusion/src/logical_plan/display.rs 
b/datafusion/src/logical_plan/display.rs
index 76749b5..f285534 100644
--- a/datafusion/src/logical_plan/display.rs
+++ b/datafusion/src/logical_plan/display.rs
@@ -29,7 +29,8 @@ pub struct IndentVisitor<'a, 'b> {
     f: &'a mut fmt::Formatter<'b>,
     /// If true, includes summarized schema information
     with_schema: bool,
-    indent: u32,
+    /// The current indent
+    indent: usize,
 }
 
 impl<'a, 'b> IndentVisitor<'a, 'b> {
@@ -42,13 +43,6 @@ impl<'a, 'b> IndentVisitor<'a, 'b> {
             indent: 0,
         }
     }
-
-    fn write_indent(&mut self) -> fmt::Result {
-        for _ in 0..self.indent {
-            write!(self.f, "  ")?;
-        }
-        Ok(())
-    }
 }
 
 impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
@@ -58,8 +52,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
         if self.indent > 0 {
             writeln!(self.f)?;
         }
-        self.write_indent()?;
-
+        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
         write!(self.f, "{}", plan.display())?;
         if self.with_schema {
             write!(
diff --git a/datafusion/src/logical_plan/plan.rs 
b/datafusion/src/logical_plan/plan.rs
index 13509d1..8b9aac9 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -356,13 +356,15 @@ pub enum Partitioning {
 /// after all children have been visited.
 ////
 /// To use, define a struct that implements this trait and then invoke
-/// "LogicalPlan::accept".
+/// [`LogicalPlan::accept`].
 ///
 /// For example, for a logical plan like:
 ///
+/// ```text
 /// Projection: #id
 ///    Filter: #state Eq Utf8(\"CO\")\
 ///       CsvScan: employee.csv projection=Some([0, 3])";
+/// ```
 ///
 /// The sequence of visit operations would be:
 /// ```text
diff --git a/datafusion/src/physical_plan/coalesce_batches.rs 
b/datafusion/src/physical_plan/coalesce_batches.rs
index b91e0b6..e25412d 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -25,7 +25,8 @@ use std::task::{Context, Poll};
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
-    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
 };
 
 use arrow::compute::kernels::concat::concat;
@@ -114,6 +115,22 @@ impl ExecutionPlan for CoalesceBatchesExec {
             is_closed: false,
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "CoalesceBatchesExec: target_batch_size={}",
+                    self.target_batch_size
+                )
+            }
+        }
+    }
 }
 
 struct CoalesceBatchesStream {
diff --git a/datafusion/src/physical_plan/cross_join.rs 
b/datafusion/src/physical_plan/cross_join.rs
index 4372352..f6f5da4 100644
--- a/datafusion/src/physical_plan/cross_join.rs
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -21,7 +21,6 @@
 use futures::{lock::Mutex, StreamExt};
 use std::{any::Any, sync::Arc, task::Poll};
 
-use crate::physical_plan::memory::MemoryStream;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
@@ -36,8 +35,10 @@ use crate::{
 use async_trait::async_trait;
 use std::time::Instant;
 
-use super::{ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream};
-use crate::physical_plan::coalesce_batches::concat_batches;
+use super::{
+    coalesce_batches::concat_batches, memory::MemoryStream, DisplayFormatType,
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
 use log::debug;
 
 /// Data of the left side
@@ -192,6 +193,18 @@ impl ExecutionPlan for CrossJoinExec {
             join_time: 0,
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "CrossJoinExec")
+            }
+        }
+    }
 }
 
 /// A stream that issues [RecordBatch]es as they arrive from the right  of the 
join.
diff --git a/datafusion/src/physical_plan/csv.rs 
b/datafusion/src/physical_plan/csv.rs
index 9ab8177..96b24cc 100644
--- a/datafusion/src/physical_plan/csv.rs
+++ b/datafusion/src/physical_plan/csv.rs
@@ -18,8 +18,7 @@
 //! Execution plan for reading CSV files
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::{common, Partitioning};
+use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, 
Partitioning};
 use arrow::csv;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -135,6 +134,19 @@ impl std::fmt::Debug for Source {
     }
 }
 
+impl std::fmt::Display for Source {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Source::PartitionedFiles { path, filenames } => {
+                write!(f, "Path({}: [{}])", path, filenames.join(","))
+            }
+            Source::Reader(_) => {
+                write!(f, "Reader(...)")
+            }
+        }
+    }
+}
+
 impl Clone for Source {
     fn clone(&self) -> Self {
         match self {
@@ -405,6 +417,22 @@ impl ExecutionPlan for CsvExec {
             }
         }
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "CsvExec: source={}, has_header={}",
+                    self.source, self.has_header
+                )
+            }
+        }
+    }
 }
 
 /// Iterator over batches
diff --git a/datafusion/src/physical_plan/display.rs 
b/datafusion/src/physical_plan/display.rs
new file mode 100644
index 0000000..bfc3cd9
--- /dev/null
+++ b/datafusion/src/physical_plan/display.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implementation of physical plan display. See
+//! [`crate::physical_plan::displayable`] for examples of how to
+//! format
+
+use std::fmt;
+
+use super::{accept, ExecutionPlan, ExecutionPlanVisitor};
+
+/// Options for controlling how each [`ExecutionPlan`] should format itself
+#[derive(Debug, Clone, Copy)]
+pub enum DisplayFormatType {
+    /// Default, compact format. Example: `FilterExec: c12 < 10.0`
+    Default,
+}
+
+/// Wraps an `ExecutionPlan` with various ways to display this plan
+pub struct DisplayableExecutionPlan<'a> {
+    inner: &'a dyn ExecutionPlan,
+}
+
+impl<'a> DisplayableExecutionPlan<'a> {
+    /// Create a wrapper around an [`ExecutionPlan`] which can be
+    /// pretty printed in a variety of ways
+    pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
+        Self { inner }
+    }
+
+    /// Return a `format`able structure that produces a single line
+    /// per node.
+    ///
+    /// ```text
+    /// ProjectionExec: expr=[a]
+    ///   CoalesceBatchesExec: target_batch_size=4096
+    ///     FilterExec: a < 5
+    ///       RepartitionExec: partitioning=RoundRobinBatch(16)
+    ///         CsvExec: source=...
+    /// ```
+    pub fn indent(&self) -> impl fmt::Display + 'a {
+        struct Wrapper<'a>(&'a dyn ExecutionPlan);
+        impl<'a> fmt::Display for Wrapper<'a> {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                let t = DisplayFormatType::Default;
+                let mut visitor = IndentVisitor { t, f, indent: 0 };
+                accept(self.0, &mut visitor)
+            }
+        }
+        Wrapper(self.inner)
+    }
+}
+
+/// Formats plans with a single line per node.
+struct IndentVisitor<'a, 'b> {
+    /// How to format each node
+    t: DisplayFormatType,
+    /// Write to this formatter
+    f: &'a mut fmt::Formatter<'b>,
+    /// The current indentation depth (each level is rendered as two spaces)
+    indent: usize,
+}
+
+impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
+    type Error = fmt::Error;
+    fn pre_visit(
+        &mut self,
+        plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
+        plan.fmt_as(self.t, self.f)?;
+        writeln!(self.f)?;
+        self.indent += 1;
+        Ok(true)
+    }
+}
diff --git a/datafusion/src/physical_plan/distinct_expressions.rs 
b/datafusion/src/physical_plan/distinct_expressions.rs
index 927f16f..f3513c2 100644
--- a/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/datafusion/src/physical_plan/distinct_expressions.rs
@@ -120,6 +120,10 @@ impl AggregateExpr for DistinctCount {
             count_data_type: self.data_type.clone(),
         }))
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/src/physical_plan/empty.rs 
b/datafusion/src/physical_plan/empty.rs
index 3011b28..391a695 100644
--- a/datafusion/src/physical_plan/empty.rs
+++ b/datafusion/src/physical_plan/empty.rs
@@ -21,8 +21,9 @@ use std::any::Any;
 use std::sync::Arc;
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::memory::MemoryStream;
-use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
+use crate::physical_plan::{
+    memory::MemoryStream, DisplayFormatType, Distribution, ExecutionPlan, 
Partitioning,
+};
 use arrow::array::NullArray;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -120,6 +121,18 @@ impl ExecutionPlan for EmptyExec {
             None,
         )?))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "EmptyExec: produce_one_row={}", 
self.produce_one_row)
+            }
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/src/physical_plan/explain.rs 
b/datafusion/src/physical_plan/explain.rs
index 26d2c94..3c5ef1a 100644
--- a/datafusion/src/physical_plan/explain.rs
+++ b/datafusion/src/physical_plan/explain.rs
@@ -20,15 +20,14 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::error::{DataFusionError, Result};
 use crate::{
+    error::{DataFusionError, Result},
     logical_plan::StringifiedPlan,
-    physical_plan::{common::SizedRecordBatchStream, ExecutionPlan},
+    physical_plan::Partitioning,
+    physical_plan::{common::SizedRecordBatchStream, DisplayFormatType, 
ExecutionPlan},
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, 
record_batch::RecordBatch};
 
-use crate::physical_plan::Partitioning;
-
 use super::SendableRecordBatchStream;
 use async_trait::async_trait;
 
@@ -122,4 +121,16 @@ impl ExecutionPlan for ExplainExec {
             vec![Arc::new(record_batch)],
         )))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "ExplainExec")
+            }
+        }
+    }
 }
diff --git a/datafusion/src/physical_plan/expressions/average.rs 
b/datafusion/src/physical_plan/expressions/average.rs
index 3864412..6a63320 100644
--- a/datafusion/src/physical_plan/expressions/average.rs
+++ b/datafusion/src/physical_plan/expressions/average.rs
@@ -109,6 +109,10 @@ impl AggregateExpr for Avg {
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
         vec![self.expr.clone()]
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
 
 /// An accumulator to compute the average
diff --git a/datafusion/src/physical_plan/expressions/count.rs 
b/datafusion/src/physical_plan/expressions/count.rs
index 2245981..4a3fbe4 100644
--- a/datafusion/src/physical_plan/expressions/count.rs
+++ b/datafusion/src/physical_plan/expressions/count.rs
@@ -83,6 +83,10 @@ impl AggregateExpr for Count {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(CountAccumulator::new()))
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs 
b/datafusion/src/physical_plan/expressions/min_max.rs
index 5ed1461..ea917d3 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -88,6 +88,10 @@ impl AggregateExpr for Max {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
 
 // Statically-typed version of min/max(array) -> ScalarValue for string types.
@@ -387,6 +391,10 @@ impl AggregateExpr for Min {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/src/physical_plan/expressions/mod.rs 
b/datafusion/src/physical_plan/expressions/mod.rs
index 6e25220..4d57c39 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -74,6 +74,19 @@ pub struct PhysicalSortExpr {
     pub options: SortOptions,
 }
 
+impl std::fmt::Display for PhysicalSortExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let opts_string = match (self.options.descending, 
self.options.nulls_first) {
+            (true, true) => "DESC",
+            (true, false) => "DESC NULLS LAST",
+            (false, true) => "ASC",
+            (false, false) => "ASC NULLS LAST",
+        };
+
+        write!(f, "{} {}", self.expr, opts_string)
+    }
+}
+
 impl PhysicalSortExpr {
     /// evaluate the sort expression into SortColumn that can be passed into 
arrow sort kernel
     pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> 
Result<SortColumn> {
diff --git a/datafusion/src/physical_plan/expressions/sum.rs 
b/datafusion/src/physical_plan/expressions/sum.rs
index 6f50894..7bbbf99 100644
--- a/datafusion/src/physical_plan/expressions/sum.rs
+++ b/datafusion/src/physical_plan/expressions/sum.rs
@@ -104,6 +104,10 @@ impl AggregateExpr for Sum {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(SumAccumulator::try_new(&self.data_type)?))
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/src/physical_plan/filter.rs 
b/datafusion/src/physical_plan/filter.rs
index 61af78d..bc2b17a 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -25,7 +25,9 @@ use std::task::{Context, Poll};
 
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+};
 use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
@@ -119,6 +121,18 @@ impl ExecutionPlan for FilterExec {
             input: self.input.execute(partition).await?,
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "FilterExec: {}", self.predicate)
+            }
+        }
+    }
 }
 
 /// The FilterExec streams wraps the input iterator and applies the predicate 
expression to
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs 
b/datafusion/src/physical_plan/hash_aggregate.rs
index fad4fa5..3059e2f 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -28,8 +28,10 @@ use futures::{
 };
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Accumulator, AggregateExpr, SQLMetric};
-use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, 
PhysicalExpr};
+use crate::physical_plan::{
+    Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
+    Partitioning, PhysicalExpr, SQLMetric,
+};
 
 use arrow::{
     array::{Array, UInt32Builder},
@@ -257,6 +259,39 @@ impl ExecutionPlan for HashAggregateExec {
         metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
         metrics
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "HashAggregateExec: mode={:?}", self.mode)?;
+                let g: Vec<String> = self
+                    .group_expr
+                    .iter()
+                    .map(|(e, alias)| {
+                        let e = e.to_string();
+                        if &e != alias {
+                            format!("{} as {}", e, alias)
+                        } else {
+                            e
+                        }
+                    })
+                    .collect();
+                write!(f, ", gby=[{}]", g.join(", "))?;
+
+                let a: Vec<String> = self
+                    .aggr_expr
+                    .iter()
+                    .map(|agg| agg.name().to_string())
+                    .collect();
+                write!(f, ", aggr=[{}]", a.join(", "))?;
+            }
+        }
+        Ok(())
+    }
 }
 
 /*
diff --git a/datafusion/src/physical_plan/hash_join.rs 
b/datafusion/src/physical_plan/hash_join.rs
index 2682623..0bf5a28 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -58,7 +58,10 @@ use super::{
 };
 use crate::error::{DataFusionError, Result};
 
-use super::{ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream};
+use super::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
 use crate::physical_plan::coalesce_batches::concat_batches;
 use log::debug;
 
@@ -393,6 +396,22 @@ impl ExecutionPlan for HashJoinExec {
             is_exhausted: false,
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "HashJoinExec: mode={:?}, join_type={:?}, on={:?}",
+                    self.mode, self.join_type, self.on
+                )
+            }
+        }
+    }
 }
 
 /// Updates `hash` with new entries from [RecordBatch] evaluated against the 
expressions `on`,
diff --git a/datafusion/src/physical_plan/limit.rs 
b/datafusion/src/physical_plan/limit.rs
index c091196..c56dbe1 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -26,7 +26,9 @@ use futures::stream::Stream;
 use futures::stream::StreamExt;
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+};
 use arrow::array::ArrayRef;
 use arrow::compute::limit;
 use arrow::datatypes::SchemaRef;
@@ -121,6 +123,18 @@ impl ExecutionPlan for GlobalLimitExec {
         let stream = self.input.execute(0).await?;
         Ok(Box::pin(LimitStream::new(stream, self.limit)))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "GlobalLimitExec: limit={}", self.limit)
+            }
+        }
+    }
 }
 
 /// LocalLimitExec applies a limit to a single partition
@@ -187,6 +201,18 @@ impl ExecutionPlan for LocalLimitExec {
         let stream = self.input.execute(partition).await?;
         Ok(Box::pin(LimitStream::new(stream, self.limit)))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "LocalLimitExec: limit={}", self.limit)
+            }
+        }
+    }
 }
 
 /// Truncate a RecordBatch to maximum of n rows
diff --git a/datafusion/src/physical_plan/memory.rs 
b/datafusion/src/physical_plan/memory.rs
index 9022077..85d8aee 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -22,7 +22,10 @@ use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use super::{ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream};
+use super::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
 use crate::error::{DataFusionError, Result};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
@@ -88,6 +91,25 @@ impl ExecutionPlan for MemoryExec {
             self.projection.clone(),
         )?))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let partitions: Vec<_> =
+                    self.partitions.iter().map(|b| b.len()).collect();
+                write!(
+                    f,
+                    "MemoryExec: partitions={}, partition_sizes={:?}",
+                    partitions.len(),
+                    partitions
+                )
+            }
+        }
+    }
 }
 
 impl MemoryExec {
diff --git a/datafusion/src/physical_plan/merge.rs 
b/datafusion/src/physical_plan/merge.rs
index c66532b..c65227c 100644
--- a/datafusion/src/physical_plan/merge.rs
+++ b/datafusion/src/physical_plan/merge.rs
@@ -36,8 +36,7 @@ use arrow::{
 
 use super::RecordBatchStream;
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::Partitioning;
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 
 use super::SendableRecordBatchStream;
 use pin_project_lite::pin_project;
@@ -151,6 +150,18 @@ impl ExecutionPlan for MergeExec {
             }
         }
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "MergeExec")
+            }
+        }
+    }
 }
 
 pin_project! {
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index a8f6f0c..6ab9570 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,7 +17,7 @@
 
 //! Traits for physical query plan, supporting parallel execution for 
partitioned relations.
 
-use std::fmt::{Debug, Display};
+use std::fmt::{self, Debug, Display};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
@@ -31,9 +31,10 @@ use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
 
 use async_trait::async_trait;
+pub use display::DisplayFormatType;
 use futures::stream::Stream;
 
-use self::merge::MergeExec;
+use self::{display::DisplayableExecutionPlan, merge::MergeExec};
 use hashbrown::HashMap;
 
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
@@ -120,7 +121,16 @@ pub trait PhysicalPlanner {
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
 
-/// Partition-aware execution plan for a relation
+/// `ExecutionPlan` represents nodes in the DataFusion Physical Plan.
+///
+/// Each `ExecutionPlan` is Partition-aware and is responsible for
+/// creating the actual `async` [`SendableRecordBatchStream`]s
+/// of [`RecordBatch`] that incrementally compute the operator's
+/// output from its input partition.
+///
+/// [`ExecutionPlan`] can be displayed in a simplified form using the
+/// return value from [`displayable`] in addition to the (normally
+/// quite verbose) `Debug` output.
 #[async_trait]
 pub trait ExecutionPlan: Debug + Send + Sync {
     /// Returns the execution plan as [`Any`](std::any::Any) so that it can be
@@ -152,6 +162,137 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         HashMap::new()
     }
+
+    /// Format this `ExecutionPlan` to `f` in the specified type.
+    ///
+    /// Should not include a newline
+    ///
+    /// Note this function prints a placeholder by default to preserve
+    /// backwards compatibility.
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+        write!(f, "ExecutionPlan(PlaceHolder)")
+    }
+}
+
+/// Return a [wrapper](DisplayableExecutionPlan) around an
+/// [`ExecutionPlan`] which can be displayed in various easier to
+/// understand ways.
+///
+/// ```
+/// use datafusion::prelude::*;
+/// use datafusion::physical_plan::displayable;
+///
+/// // Hard code concurrency as it appears in the RepartitionExec output
+/// let config = ExecutionConfig::new()
+///     .with_concurrency(3);
+/// let mut ctx = ExecutionContext::with_config(config);
+///
+/// // register a table
+/// ctx.register_csv("example", "tests/example.csv", 
CsvReadOptions::new()).unwrap();
+///
+/// // create a plan to run a SQL query
+/// let plan = ctx
+///    .create_logical_plan("SELECT a FROM example WHERE a < 5")
+///    .unwrap();
+/// let plan = ctx.optimize(&plan).unwrap();
+/// let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+///
+/// // Format using display string
+/// let displayable_plan = displayable(physical_plan.as_ref());
+/// let plan_string = format!("{}", displayable_plan.indent());
+///
+/// assert_eq!("ProjectionExec: expr=[a]\
+///            \n  CoalesceBatchesExec: target_batch_size=4096\
+///            \n    FilterExec: a < 5\
+///            \n      RepartitionExec: partitioning=RoundRobinBatch(3)\
+///            \n        CsvExec: source=Path(tests/example.csv: 
[tests/example.csv]), has_header=true",
+///             plan_string.trim());
+/// ```
+///
+pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
+    DisplayableExecutionPlan::new(plan)
+}
+
+/// Visit all children of this plan, according to the order defined on 
`ExecutionPlanVisitor`.
+// Note that this would be really nice if it were a method on
+// ExecutionPlan, but it can not be because it takes a generic
+// parameter and `ExecutionPlan` is a trait
+pub fn accept<V: ExecutionPlanVisitor>(
+    plan: &dyn ExecutionPlan,
+    visitor: &mut V,
+) -> std::result::Result<(), V::Error> {
+    visitor.pre_visit(plan)?;
+    for child in plan.children() {
+        visit_execution_plan(child.as_ref(), visitor)?;
+    }
+    visitor.post_visit(plan)?;
+    Ok(())
+}
+
+/// Trait that implements the [Visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
+/// depth first walk of `ExecutionPlan` nodes. `pre_visit` is called
+/// before any children are visited, and then `post_visit` is called
+/// after all children have been visited.
+///
+/// To use, define a struct that implements this trait and then invoke
+/// [`accept`].
+///
+/// For example, for an execution plan that looks like:
+///
+/// ```text
+/// ProjectionExec: #id
+///    FilterExec: state = CO
+///       CsvExec:
+/// ```
+///
+/// The sequence of visit operations would be:
+/// ```text
+/// visitor.pre_visit(ProjectionExec)
+/// visitor.pre_visit(FilterExec)
+/// visitor.pre_visit(CsvExec)
+/// visitor.post_visit(CsvExec)
+/// visitor.post_visit(FilterExec)
+/// visitor.post_visit(ProjectionExec)
+/// ```
+pub trait ExecutionPlanVisitor {
+    /// The type of error returned by this visitor
+    type Error;
+
+    /// Invoked on an `ExecutionPlan` plan before any of its child
+    /// inputs have been visited. If Ok(true) is returned, the
+    /// recursion continues. If Err(..) or Ok(false) are returned, the
+    /// recursion stops immediately and the error, if any, is returned
+    /// to `accept`
+    fn pre_visit(
+        &mut self,
+        plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error>;
+
+    /// Invoked on an `ExecutionPlan` plan *after* all of its child
+    /// inputs have been visited. The return value is handled the same
+    /// as the return value of `pre_visit`. The provided default
+    /// implementation returns `Ok(true)`.
+    fn post_visit(
+        &mut self,
+        _plan: &dyn ExecutionPlan,
+    ) -> std::result::Result<bool, Self::Error> {
+        Ok(true)
+    }
+}
+
+/// Recursively calls `pre_visit` and `post_visit` for this node and
+/// all of its children, as described on [`ExecutionPlanVisitor`]
+pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
+    plan: &dyn ExecutionPlan,
+    visitor: &mut V,
+) -> std::result::Result<(), V::Error> {
+    visitor.pre_visit(plan)?;
+    for child in plan.children() {
+        visit_execution_plan(child.as_ref(), visitor)?;
+    }
+    visitor.post_visit(plan)?;
+    Ok(())
 }
 
 /// Execute the [ExecutionPlan] and collect the results in memory
@@ -290,6 +431,12 @@ pub trait AggregateExpr: Send + Sync + Debug {
     /// expressions that are passed to the Accumulator.
     /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Human readable name such as `"MIN(c2)"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "AggregateExpr: default name"
+    }
 }
 
 /// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
@@ -351,6 +498,7 @@ pub mod cross_join;
 pub mod crypto_expressions;
 pub mod csv;
 pub mod datetime_expressions;
+pub mod display;
 pub mod distinct_expressions;
 pub mod empty;
 pub mod explain;
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index dee0fc8..dd5e77b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -27,7 +27,7 @@ use super::{
     planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream,
 };
-use crate::physical_plan::{common, ExecutionPlan, Partitioning};
+use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
@@ -864,6 +864,32 @@ impl ExecutionPlan for ParquetExec {
             inner: ReceiverStream::new(response_rx),
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let files: Vec<_> = self
+                    .partitions
+                    .iter()
+                    .map(|pp| pp.filenames.iter())
+                    .flatten()
+                    .map(|s| s.as_str())
+                    .collect();
+
+                write!(
+                    f,
+                    "ParquetExec: batch_size={}, limit={:?}, partitions=[{}]",
+                    self.batch_size,
+                    self.limit,
+                    files.join(", ")
+                )
+            }
+        }
+    }
 }
 
 fn send_result(
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 664e4dc..d11e8e9 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -23,7 +23,6 @@ use super::{
     aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary,
     functions, hash_join::PartitionMode, udaf, union::UnionExec,
 };
-use crate::error::{DataFusionError, Result};
 use crate::execution::context::ExecutionContextState;
 use crate::logical_plan::{
     DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
@@ -45,6 +44,10 @@ use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalP
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
 use crate::variable::VarType;
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::displayable,
+};
 use arrow::compute::can_cast_types;
 
 use arrow::compute::SortOptions;
@@ -383,7 +386,7 @@ impl DefaultPhysicalPlanner {
                 if *verbose {
                     stringified_plans.push(StringifiedPlan::new(
                         PlanType::PhysicalPlan,
-                        format!("{:#?}", input),
+                        displayable(input.as_ref()).indent().to_string(),
                     ));
                 }
                 Ok(Arc::new(ExplainExec::new(
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index a881beb..c0d78ff 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -26,7 +26,9 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
+};
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
@@ -130,6 +132,31 @@ impl ExecutionPlan for ProjectionExec {
             input: self.input.execute(partition).await?,
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self
+                    .expr
+                    .iter()
+                    .map(|(e, alias)| {
+                        let e = e.to_string();
+                        if &e != alias {
+                            format!("{} as {}", e, alias)
+                        } else {
+                            e
+                        }
+                    })
+                    .collect();
+
+                write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
+            }
+        }
+    }
 }
 
 fn batch_project(
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 7243550..2599690 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
 use std::{any::Any, collections::HashMap, vec};
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{ExecutionPlan, Partitioning};
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
@@ -235,6 +235,18 @@ impl ExecutionPlan for RepartitionExec {
             input: UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1),
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "RepartitionExec: partitioning={:?}", self.partitioning)
+            }
+        }
+    }
 }
 
 impl RepartitionExec {
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 010e406..8229060 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -41,7 +41,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
-    common, Distribution, ExecutionPlan, Partitioning, SQLMetric,
+    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SQLMetric,
 };
 
 /// Sort execution plan
@@ -145,6 +145,19 @@ impl ExecutionPlan for SortExec {
         )))
     }
 
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortExec: [{}]", expr.join(","))
+            }
+        }
+    }
+
     fn metrics(&self) -> HashMap<String, SQLMetric> {
         let mut metrics = HashMap::new();
         metrics.insert("outputRows".to_owned(), (*self.output_rows).clone());
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index 3dc6aa4..f7515d3 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -165,4 +165,8 @@ impl AggregateExpr for AggregateFunctionExpr {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         (self.fun.accumulator)()
     }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
 }
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index a00dd6a..b39f47b 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -20,11 +20,14 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use datafusion::error::{DataFusionError, Result};
 use datafusion::{
     datasource::{datasource::Statistics, TableProvider},
     physical_plan::collect,
 };
+use datafusion::{
+    error::{DataFusionError, Result},
+    physical_plan::DisplayFormatType,
+};
 
 use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{col, Expr, LogicalPlan, LogicalPlanBuilder};
@@ -128,6 +131,18 @@ impl ExecutionPlan for CustomExecutionPlan {
     async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "CustomExecutionPlan: projection={:#?}", self.projection)
+            }
+        }
+    }
 }
 
 impl TableProvider for CustomTableProvider {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index c80ffe4..0b9cc2a 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -31,9 +31,8 @@ use arrow::{
     util::display::array_value_to_string,
 };
 
-use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::LogicalPlan;
-use datafusion::prelude::create_udf;
+use datafusion::prelude::*;
 use datafusion::{
     datasource::{csv::CsvReadOptions, MemTable},
     physical_plan::collect,
@@ -42,6 +41,7 @@ use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::ColumnarValue,
 };
+use datafusion::{execution::context::ExecutionContext, physical_plan::displayable};
 
 #[tokio::test]
 async fn nyc() -> Result<()> {
@@ -2932,3 +2932,47 @@ async fn test_cast_expressions_error() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_physical_plan_display_indent() {
+    // Hard code concurrency as it appears in the RepartitionExec output
+    let config = ExecutionConfig::new().with_concurrency(3);
+    let mut ctx = ExecutionContext::with_config(config);
+    register_aggregate_csv(&mut ctx).unwrap();
+    let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
+         FROM aggregate_test_100 \
+         WHERE c12 < 10 \
+         GROUP BY c1 \
+         ORDER BY the_min DESC \
+         LIMIT 10";
+    let plan = ctx.create_logical_plan(&sql).unwrap();
+    let plan = ctx.optimize(&plan).unwrap();
+
+    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let expected = vec![
+    "GlobalLimitExec: limit=10",
+    "  SortExec: [the_min DESC]",
+    "    ProjectionExec: expr=[c1, MAX(c12), MIN(c12) as the_min]",
+    "      HashAggregateExec: mode=Final, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+    "        MergeExec",
+    "          HashAggregateExec: mode=Partial, gby=[c1], aggr=[MAX(c12), MIN(c12)]",
+    "            CoalesceBatchesExec: target_batch_size=4096",
+    "              FilterExec: c12 < CAST(10 AS Float64)",
+    "                RepartitionExec: partitioning=RoundRobinBatch(3)",
+    "                  CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
+    ];
+
+    let data_path = arrow::util::test_util::arrow_test_data();
+    let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
+        .trim()
+        .lines()
+        // normalize paths
+        .map(|s| s.replace(&data_path, "ARROW_TEST_DATA"))
+        .collect::<Vec<_>>();
+
+    assert_eq!(
+        expected, actual,
+        "expected:\n{:#?}\nactual:\n\n{:#?}\n",
+        expected, actual
+    );
+}
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 5e38c57..8914c05 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -75,8 +75,8 @@ use datafusion::{
     optimizer::{optimizer::OptimizerRule, utils::optimize_children},
     physical_plan::{
         planner::{DefaultPhysicalPlanner, ExtensionPlanner},
-        Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream,
-        SendableRecordBatchStream,
+        DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalPlanner,
+        RecordBatchStream, SendableRecordBatchStream,
     },
     prelude::{ExecutionConfig, ExecutionContext},
 };
@@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> {
     let mut ctx = setup_table(make_topk_context()).await?;
 
     let expected = vec![
-        "| logical_plan after topk                 | TopK: k=3                                      |",
-        "|                                         |   Projection: #customer_id, #revenue           |",
-        "|                                         |     TableScan: sales projection=Some([0, 1])   |",
+        "| logical_plan after topk                 | TopK: k=3                                                                            |",
+        "|                                         |   Projection: #customer_id, #revenue                                                 |",
+        "|                                         |     TableScan: sales projection=Some([0, 1])                                         |",
     ].join("\n");
 
     let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
@@ -397,6 +397,18 @@ impl ExecutionPlan for TopKExec {
             state: BTreeMap::new(),
         }))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "TopKExec: k={}", self.k)
+            }
+        }
+    }
 }
 
 // A very specialized TopK implementation

Reply via email to