This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3666b  Add support for EXPLAIN ANALYZE (#858)
bd3666b is described below

commit bd3666b30f4c2ed4323c59707053aa50f2ff6121
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 12 12:42:58 2021 -0400

    Add support for EXPLAIN ANALYZE (#858)
    
    * Add support for EXPLAIN ANALYZE
    
    * fix fmt
---
 ballista/rust/core/proto/ballista.proto            |   6 +
 .../rust/core/src/serde/logical_plan/from_proto.rs |   9 +-
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  41 ++++-
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  11 ++
 datafusion/src/dataframe.rs                        |   6 +-
 datafusion/src/execution/context.rs                |   2 +-
 datafusion/src/execution/dataframe_impl.rs         |   6 +-
 datafusion/src/logical_plan/builder.rs             |  36 ++--
 datafusion/src/logical_plan/plan.rs                |  16 ++
 datafusion/src/optimizer/constant_folding.rs       |   1 +
 datafusion/src/optimizer/filter_push_down.rs       |   1 +
 datafusion/src/optimizer/hash_build_probe_order.rs |   6 +
 datafusion/src/optimizer/projection_push_down.rs   |  25 +++
 datafusion/src/optimizer/utils.rs                  |   3 +-
 datafusion/src/physical_plan/analyze.rs            | 201 +++++++++++++++++++++
 datafusion/src/physical_plan/mod.rs                |   2 +
 datafusion/src/physical_plan/parquet.rs            |  34 +---
 datafusion/src/physical_plan/planner.rs            |  12 +-
 datafusion/src/physical_plan/stream.rs             |  64 +++++++
 datafusion/src/sql/planner.rs                      |  33 ++--
 datafusion/src/test/exec.rs                        |  47 +----
 datafusion/tests/sql.rs                            |  48 +++++
 22 files changed, 505 insertions(+), 105 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 2538a10..a1608c6 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -249,6 +249,7 @@ message LogicalPlanNode {
     RepartitionNode repartition = 9;
     EmptyRelationNode empty_relation = 10;
     CreateExternalTableNode create_external_table = 11;
+    AnalyzeNode analyze = 14;
     ExplainNode explain = 12;
     WindowNode window = 13;
   }
@@ -323,6 +324,11 @@ enum FileType{
   CSV = 2;
 }
 
+message AnalyzeNode {
+  LogicalPlanNode input = 1;
+  bool verbose = 2;
+}
+
 message ExplainNode{
   LogicalPlanNode input = 1;
   bool verbose = 2;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 31b8b6d..f9761a2 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -231,10 +231,17 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     has_header: create_extern_table.has_header,
                 })
             }
+            LogicalPlanType::Analyze(analyze) => {
+                let input: LogicalPlan = convert_box_required!(analyze.input)?;
+                LogicalPlanBuilder::from(input)
+                    .explain(analyze.verbose, true)?
+                    .build()
+                    .map_err(|e| e.into())
+            }
             LogicalPlanType::Explain(explain) => {
                 let input: LogicalPlan = convert_box_required!(explain.input)?;
                 LogicalPlanBuilder::from(input)
-                    .explain(explain.verbose)?
+                    .explain(explain.verbose, false)?
                     .build()
                     .map_err(|e| e.into())
             }
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs 
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index e4e4383..dbaac1d 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -662,6 +662,43 @@ mod roundtrip_tests {
     }
 
     #[test]
+    fn roundtrip_analyze() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("first_name", DataType::Utf8, false),
+            Field::new("last_name", DataType::Utf8, false),
+            Field::new("state", DataType::Utf8, false),
+            Field::new("salary", DataType::Int32, false),
+        ]);
+
+        let verbose_plan = LogicalPlanBuilder::scan_csv(
+            "employee.csv",
+            CsvReadOptions::new().schema(&schema).has_header(true),
+            Some(vec![3, 4]),
+        )
+        .and_then(|plan| plan.sort(vec![col("salary")]))
+        .and_then(|plan| plan.explain(true, true))
+        .and_then(|plan| plan.build())
+        .map_err(BallistaError::DataFusionError)?;
+
+        let plan = LogicalPlanBuilder::scan_csv(
+            "employee.csv",
+            CsvReadOptions::new().schema(&schema).has_header(true),
+            Some(vec![3, 4]),
+        )
+        .and_then(|plan| plan.sort(vec![col("salary")]))
+        .and_then(|plan| plan.explain(false, true))
+        .and_then(|plan| plan.build())
+        .map_err(BallistaError::DataFusionError)?;
+
+        roundtrip_test!(plan);
+
+        roundtrip_test!(verbose_plan);
+
+        Ok(())
+    }
+
+    #[test]
     fn roundtrip_explain() -> Result<()> {
         let schema = Schema::new(vec![
             Field::new("id", DataType::Int32, false),
@@ -677,7 +714,7 @@ mod roundtrip_tests {
             Some(vec![3, 4]),
         )
         .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(true))
+        .and_then(|plan| plan.explain(true, false))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
 
@@ -687,7 +724,7 @@ mod roundtrip_tests {
             Some(vec![3, 4]),
         )
         .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(false))
+        .and_then(|plan| plan.explain(false, false))
         .and_then(|plan| plan.build())
         .map_err(BallistaError::DataFusionError)?;
 
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 1a3834a..e1c7f53 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -931,6 +931,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     )),
                 })
             }
+            LogicalPlan::Analyze { verbose, input, .. } => {
+                let input: protobuf::LogicalPlanNode = 
input.as_ref().try_into()?;
+                Ok(protobuf::LogicalPlanNode {
+                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
+                        protobuf::AnalyzeNode {
+                            input: Some(Box::new(input)),
+                            verbose: *verbose,
+                        },
+                    ))),
+                })
+            }
             LogicalPlan::Explain { verbose, plan, .. } => {
                 let input: protobuf::LogicalPlanNode = 
plan.as_ref().try_into()?;
                 Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 1d4cffd..608f6db 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -289,6 +289,8 @@ pub trait DataFrame: Send + Sync {
 
     /// Return a DataFrame with the explanation of its plan so far.
     ///
+    /// if `analyze` is specified, runs the plan and reports metrics
+    ///
     /// ```
     /// # use datafusion::prelude::*;
     /// # use datafusion::error::Result;
@@ -296,11 +298,11 @@ pub trait DataFrame: Send + Sync {
     /// # async fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
     /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
-    /// let batches = df.limit(100)?.explain(false)?.collect().await?;
+    /// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
     /// # Ok(())
     /// # }
     /// ```
-    fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>>;
+    fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn 
DataFrame>>;
 
     /// Return a `FunctionRegistry` used to plan udf's calls
     ///
diff --git a/datafusion/src/execution/context.rs 
b/datafusion/src/execution/context.rs
index 0cf8b3b..7a54bea 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -986,7 +986,7 @@ mod tests {
 
         let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, 
None)
             .unwrap()
-            .explain(true)
+            .explain(true, false)
             .unwrap()
             .build()
             .unwrap();
diff --git a/datafusion/src/execution/dataframe_impl.rs 
b/datafusion/src/execution/dataframe_impl.rs
index 1c0094b..ddaa04e 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -183,9 +183,9 @@ impl DataFrame for DataFrameImpl {
         self.plan.schema()
     }
 
-    fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
+    fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn 
DataFrame>> {
         let plan = LogicalPlanBuilder::from(self.to_logical_plan())
-            .explain(verbose)?
+            .explain(verbose, analyze)?
             .build()?;
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
@@ -318,7 +318,7 @@ mod tests {
         let df = df
             .select_columns(&["c1", "c2", "c11"])?
             .limit(10)?
-            .explain(false)?;
+            .explain(false, false)?;
         let plan = df.to_logical_plan();
 
         // build query using SQL
diff --git a/datafusion/src/logical_plan/builder.rs 
b/datafusion/src/logical_plan/builder.rs
index 0dfc1e7..d9afe2e 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -455,18 +455,32 @@ impl LogicalPlanBuilder {
     }
 
     /// Create an expression to represent the explanation of the plan
-    pub fn explain(&self, verbose: bool) -> Result<Self> {
-        let stringified_plans =
-            vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
-
+    ///
+    /// if `analyze` is true, runs the actual plan and produces
+    /// information about metrics during run.
+    ///
+    /// if `verbose` is true, prints out additional details.
+    pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Self> {
         let schema = LogicalPlan::explain_schema();
-
-        Ok(Self::from(LogicalPlan::Explain {
-            verbose,
-            plan: Arc::new(self.plan.clone()),
-            stringified_plans,
-            schema: schema.to_dfschema_ref()?,
-        }))
+        let schema = schema.to_dfschema_ref()?;
+
+        if analyze {
+            Ok(Self::from(LogicalPlan::Analyze {
+                verbose,
+                input: Arc::new(self.plan.clone()),
+                schema,
+            }))
+        } else {
+            let stringified_plans =
+                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
+
+            Ok(Self::from(LogicalPlan::Explain {
+                verbose,
+                plan: Arc::new(self.plan.clone()),
+                stringified_plans,
+                schema,
+            }))
+        }
     }
 
     /// Build the plan
diff --git a/datafusion/src/logical_plan/plan.rs 
b/datafusion/src/logical_plan/plan.rs
index 28405fb..cb81b8d 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -213,6 +213,16 @@ pub enum LogicalPlan {
         /// The output schema of the explain (2 columns of text)
         schema: DFSchemaRef,
     },
+    /// Runs the actual plan, and then prints the physical plan
+    /// with execution metrics.
+    Analyze {
+        /// Should extra detail be included?
+        verbose: bool,
+        /// The logical plan that is being EXPLAIN ANALYZE'd
+        input: Arc<LogicalPlan>,
+        /// The output schema of the explain (2 columns of text)
+        schema: DFSchemaRef,
+    },
     /// Extension operator defined outside of DataFusion
     Extension {
         /// The runtime extension operator
@@ -239,6 +249,7 @@ impl LogicalPlan {
             LogicalPlan::Limit { input, .. } => input.schema(),
             LogicalPlan::CreateExternalTable { schema, .. } => schema,
             LogicalPlan::Explain { schema, .. } => schema,
+            LogicalPlan::Analyze { schema, .. } => schema,
             LogicalPlan::Extension { node } => node.schema(),
             LogicalPlan::Union { schema, .. } => schema,
         }
@@ -278,6 +289,7 @@ impl LogicalPlan {
             }
             LogicalPlan::Extension { node } => vec![node.schema()],
             LogicalPlan::Explain { schema, .. }
+            | LogicalPlan::Analyze { schema, .. }
             | LogicalPlan::EmptyRelation { schema, .. }
             | LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
             LogicalPlan::Limit { input, .. }
@@ -327,6 +339,7 @@ impl LogicalPlan {
             | LogicalPlan::Limit { .. }
             | LogicalPlan::CreateExternalTable { .. }
             | LogicalPlan::CrossJoin { .. }
+            | LogicalPlan::Analyze { .. }
             | LogicalPlan::Explain { .. }
             | LogicalPlan::Union { .. } => {
                 vec![]
@@ -350,6 +363,7 @@ impl LogicalPlan {
             LogicalPlan::Extension { node } => node.inputs(),
             LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
             LogicalPlan::Explain { plan, .. } => vec![plan],
+            LogicalPlan::Analyze { input: plan, .. } => vec![plan],
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
@@ -495,6 +509,7 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?,
+            LogicalPlan::Analyze { input: plan, .. } => plan.accept(visitor)?,
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
@@ -790,6 +805,7 @@ impl LogicalPlan {
                         write!(f, "CreateExternalTable: {:?}", name)
                     }
                     LogicalPlan::Explain { .. } => write!(f, "Explain"),
+                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                     LogicalPlan::Union { .. } => write!(f, "Union"),
                     LogicalPlan::Extension { ref node } => 
node.fmt_for_explain(f),
                 }
diff --git a/datafusion/src/optimizer/constant_folding.rs 
b/datafusion/src/optimizer/constant_folding.rs
index b4c4a96..31b0b7f 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -80,6 +80,7 @@ impl OptimizerRule for ConstantFolding {
             | LogicalPlan::Extension { .. }
             | LogicalPlan::Sort { .. }
             | LogicalPlan::Explain { .. }
+            | LogicalPlan::Analyze { .. }
             | LogicalPlan::Limit { .. }
             | LogicalPlan::Union { .. }
             | LogicalPlan::Join { .. }
diff --git a/datafusion/src/optimizer/filter_push_down.rs 
b/datafusion/src/optimizer/filter_push_down.rs
index 039e92d..d0990de 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -284,6 +284,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
             // push the optimization to the plan of this explain
             push_down(&state, plan)
         }
+        LogicalPlan::Analyze { .. } => push_down(&state, plan),
         LogicalPlan::Filter { input, predicate } => {
             let mut predicates = vec![];
             split_members(predicate, &mut predicates);
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs 
b/datafusion/src/optimizer/hash_build_probe_order.rs
index ecb3b40..209faf4 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -80,6 +80,11 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> 
{
             // we cannot predict how rows will be repartitioned
             None
         }
+        LogicalPlan::Analyze { .. } => {
+            // Analyze produces one row, verbose produces more
+            // but it should never be used as an input to a Join anyways
+            None
+        }
         // the following operators are special cases and not querying data
         LogicalPlan::CreateExternalTable { .. } => None,
         LogicalPlan::Explain { .. } => None,
@@ -201,6 +206,7 @@ impl OptimizerRule for HashBuildProbeOrder {
             | LogicalPlan::Sort { .. }
             | LogicalPlan::CreateExternalTable { .. }
             | LogicalPlan::Explain { .. }
+            | LogicalPlan::Analyze { .. }
             | LogicalPlan::Union { .. }
             | LogicalPlan::Extension { .. } => {
                 let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/projection_push_down.rs 
b/datafusion/src/optimizer/projection_push_down.rs
index 96c5094..7dddbff 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -356,6 +356,31 @@ fn optimize_plan(
         LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
             "Unsupported logical plan: Explain must be root of the 
plan".to_string(),
         )),
+        LogicalPlan::Analyze {
+            input,
+            verbose,
+            schema,
+        } => {
+            // make sure we keep all the columns from the input plan
+            let required_columns = input
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.qualified_column())
+                .collect::<HashSet<Column>>();
+
+            Ok(LogicalPlan::Analyze {
+                input: Arc::new(optimize_plan(
+                    optimizer,
+                    input,
+                    &required_columns,
+                    false,
+                    execution_props,
+                )?),
+                verbose: *verbose,
+                schema: schema.clone(),
+            })
+        }
         LogicalPlan::Union {
             inputs,
             schema,
diff --git a/datafusion/src/optimizer/utils.rs 
b/datafusion/src/optimizer/utils.rs
index 615f0cc..8ce6fe5 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -198,7 +198,8 @@ pub fn from_plan(
         LogicalPlan::EmptyRelation { .. }
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::CreateExternalTable { .. }
-        | LogicalPlan::Explain { .. } => Ok(plan.clone()),
+        | LogicalPlan::Explain { .. }
+        | LogicalPlan::Analyze { .. } => Ok(plan.clone()),
     }
 }
 
diff --git a/datafusion/src/physical_plan/analyze.rs 
b/datafusion/src/physical_plan/analyze.rs
new file mode 100644
index 0000000..36726ad
--- /dev/null
+++ b/datafusion/src/physical_plan/analyze.rs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the ANALYZE operator
+
+use std::sync::Arc;
+use std::{any::Any, time::Instant};
+
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::{display::DisplayableExecutionPlan, Partitioning},
+    physical_plan::{DisplayFormatType, ExecutionPlan},
+};
+use arrow::{array::StringBuilder, datatypes::SchemaRef, 
record_batch::RecordBatch};
+use futures::StreamExt;
+
+use super::{stream::RecordBatchReceiverStream, Distribution, 
SendableRecordBatchStream};
+use async_trait::async_trait;
+
+/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
+/// discards the results, and then prints out an annotated plan with metrics
+#[derive(Debug, Clone)]
+pub struct AnalyzeExec {
+    /// control how much extra to print
+    verbose: bool,
+    /// The input plan (the plan being analyzed)
+    input: Arc<dyn ExecutionPlan>,
+    /// The output schema for RecordBatches of this exec node
+    schema: SchemaRef,
+}
+
+impl AnalyzeExec {
+    /// Create a new AnalyzeExec
+    pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema: 
SchemaRef) -> Self {
+        AnalyzeExec {
+            verbose,
+            input,
+            schema,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for AnalyzeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Specifies we want the input as a single stream
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::SinglePartition
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn with_new_children(
+        &self,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.len() == 1 {
+            Ok(Arc::new(Self::new(
+                self.verbose,
+                children.pop().unwrap(),
+                self.schema.clone(),
+            )))
+        } else {
+            Err(DataFusionError::Internal(format!(
+                "Invalid child count for AnalyzeExec. Expected 1 got {}",
+                children.len()
+            )))
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "AnalyzeExec invalid partition. Expected 0, got {}",
+                partition
+            )));
+        }
+
+        // should be ensured by `SinglePartition` above
+        let input_partitions = 
self.input.output_partitioning().partition_count();
+        if input_partitions != 1 {
+            return Err(DataFusionError::Internal(format!(
+                "AnalyzeExec invalid number of input partitions. Expected 1, 
got {}",
+                input_partitions
+            )));
+        }
+
+        let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
+
+        let captured_input = self.input.clone();
+        let mut input_stream = captured_input.execute(0).await?;
+        let captured_schema = self.schema.clone();
+        let verbose = self.verbose;
+
+        // Task reads batches the input and when complete produce a
+        // RecordBatch with a report that is written to `tx` when done
+        tokio::task::spawn(async move {
+            let start = Instant::now();
+            let mut total_rows = 0;
+
+            // Note the code below ignores errors sending on tx. An
+            // error sending means the plan is being torn down and
+            // nothing is left that will handle the error (aka no one
+            // will hear us scream)
+            while let Some(b) = input_stream.next().await {
+                match b {
+                    Ok(batch) => {
+                        total_rows += batch.num_rows();
+                    }
+                    b @ Err(_) => {
+                        // try and pass on errors from input
+                        if tx.send(b).await.is_err() {
+                            // receiver hung up, stop executing (no
+                            // one will look at any further results we
+                            // send)
+                            return;
+                        }
+                    }
+                }
+            }
+            let end = Instant::now();
+
+            let mut type_builder = StringBuilder::new(1);
+            let mut plan_builder = StringBuilder::new(1);
+
+            // TODO use some sort of enum rather than strings?
+            type_builder.append_value("Plan with Metrics").unwrap();
+
+            let annotated_plan =
+                DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
+                    .indent()
+                    .to_string();
+            plan_builder.append_value(annotated_plan).unwrap();
+
+            // Verbose output
+            // TODO make this more sophisticated
+            if verbose {
+                type_builder.append_value("Output Rows").unwrap();
+                plan_builder.append_value(total_rows.to_string()).unwrap();
+
+                type_builder.append_value("Duration").unwrap();
+                plan_builder
+                    .append_value(format!("{:?}", end - start))
+                    .unwrap();
+            }
+
+            let maybe_batch = RecordBatch::try_new(
+                captured_schema,
+                vec![
+                    Arc::new(type_builder.finish()),
+                    Arc::new(plan_builder.finish()),
+                ],
+            );
+            // again ignore error
+            tx.send(maybe_batch).await.ok();
+        });
+
+        Ok(RecordBatchReceiverStream::create(&self.schema, rx))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "AnalyzeExec verbose={}", self.verbose)
+            }
+        }
+    }
+}
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index 0df6e60..8f7db72 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -639,6 +639,7 @@ pub trait Accumulator: Send + Sync + Debug {
 }
 
 pub mod aggregates;
+pub mod analyze;
 pub mod array_expressions;
 pub mod coalesce_batches;
 pub mod coalesce_partitions;
@@ -671,6 +672,7 @@ pub mod repartition;
 pub mod sort;
 pub mod sort_preserving_merge;
 pub mod source;
+pub mod stream;
 pub mod string_expressions;
 pub mod type_coercion;
 pub mod udaf;
diff --git a/datafusion/src/physical_plan/parquet.rs 
b/datafusion/src/physical_plan/parquet.rs
index ec5611f..ff8bb5b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -20,7 +20,6 @@
 use std::fmt;
 use std::fs::File;
 use std::sync::Arc;
-use std::task::{Context, Poll};
 use std::{any::Any, convert::TryInto};
 
 use crate::{
@@ -28,8 +27,7 @@ use crate::{
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
-        common, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
-        SendableRecordBatchStream,
+        common, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
     },
     scalar::ScalarValue,
 };
@@ -55,12 +53,11 @@ use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
     task,
 };
-use tokio_stream::wrappers::ReceiverStream;
 
 use crate::datasource::datasource::{ColumnStatistics, Statistics};
 use async_trait::async_trait;
-use futures::stream::{Stream, StreamExt};
 
+use super::stream::RecordBatchReceiverStream;
 use super::SQLMetric;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::Accumulator;
@@ -688,10 +685,7 @@ impl ExecutionPlan for ParquetExec {
             }
         });
 
-        Ok(Box::pin(ParquetStream {
-            schema: self.schema.clone(),
-            inner: ReceiverStream::new(response_rx),
-        }))
+        Ok(RecordBatchReceiverStream::create(&self.schema, response_rx))
     }
 
     fn fmt_as(
@@ -938,28 +932,6 @@ fn split_files(filenames: &[String], n: usize) -> 
Vec<&[String]> {
     filenames.chunks(chunk_size).collect()
 }
 
-struct ParquetStream {
-    schema: SchemaRef,
-    inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl Stream for ParquetStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
-    }
-}
-
-impl RecordBatchStream for ParquetStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/src/physical_plan/planner.rs 
b/datafusion/src/physical_plan/planner.rs
index e662821..256a43b 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -17,6 +17,7 @@
 
 //! Physical query planner
 
+use super::analyze::AnalyzeExec;
 use super::{
     aggregates, cross_join::CrossJoinExec, empty::EmptyExec, 
expressions::binary,
     functions, hash_join::PartitionMode, udaf, union::UnionExec, windows,
@@ -741,6 +742,15 @@ impl DefaultPhysicalPlanner {
             LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
                 "Unsupported logical plan: Explain must be root of the 
plan".to_string(),
             )),
+            LogicalPlan::Analyze {
+                verbose,
+                input,
+                schema,
+            } => {
+                let input = self.create_initial_plan(input, ctx_state)?;
+                let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
+            }
             LogicalPlan::Extension { node } => {
                 let physical_inputs = node
                     .inputs()
@@ -1651,7 +1661,7 @@ mod tests {
         let logical_plan =
             LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
                 .unwrap()
-                .explain(true)
+                .explain(true, false)
                 .unwrap()
                 .build()
                 .unwrap();
diff --git a/datafusion/src/physical_plan/stream.rs 
b/datafusion/src/physical_plan/stream.rs
new file mode 100644
index 0000000..0c29f87
--- /dev/null
+++ b/datafusion/src/physical_plan/stream.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream wrappers for physical operators
+
+use arrow::{
+    datatypes::SchemaRef, error::Result as ArrowResult, 
record_batch::RecordBatch,
+};
+use futures::{Stream, StreamExt};
+use tokio_stream::wrappers::ReceiverStream;
+
+use super::{RecordBatchStream, SendableRecordBatchStream};
+
+/// Adapter for a tokio [`ReceiverStream`] that implements the
+/// [`SendableRecordBatchStream`]
+/// interface
+pub struct RecordBatchReceiverStream {
+    schema: SchemaRef,
+    inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl RecordBatchReceiverStream {
+    /// Construct a new [`RecordBatchReceiverStream`] which will send
+    /// batches of the specified schema from `inner`
+    pub fn create(
+        schema: &SchemaRef,
+        rx: tokio::sync::mpsc::Receiver<ArrowResult<RecordBatch>>,
+    ) -> SendableRecordBatchStream {
+        let schema = schema.clone();
+        let inner = ReceiverStream::new(rx);
+        Box::pin(Self { schema, inner })
+    }
+}
+
+impl Stream for RecordBatchReceiverStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.inner.poll_next_unpin(cx)
+    }
+}
+
+impl RecordBatchStream for RecordBatchReceiverStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index ef2b634..29204f4 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -101,8 +101,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             Statement::Explain {
                 verbose,
                 statement,
-                analyze: _,
-            } => self.explain_statement_to_plan(*verbose, statement),
+                analyze,
+            } => self.explain_statement_to_plan(*verbose, *analyze, statement),
             Statement::Query(query) => self.query_to_plan(query),
             Statement::ShowVariable { variable } => 
self.show_variable_to_plan(variable),
             Statement::ShowColumns {
@@ -230,21 +230,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     pub fn explain_statement_to_plan(
         &self,
         verbose: bool,
+        analyze: bool,
         statement: &Statement,
     ) -> Result<LogicalPlan> {
         let plan = self.sql_statement_to_plan(statement)?;
-
-        let stringified_plans = 
vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
-
-        let schema = LogicalPlan::explain_schema();
         let plan = Arc::new(plan);
+        let schema = LogicalPlan::explain_schema();
+        let schema = schema.to_dfschema_ref()?;
 
-        Ok(LogicalPlan::Explain {
-            verbose,
-            plan,
-            stringified_plans,
-            schema: schema.to_dfschema_ref()?,
-        })
+        if analyze {
+            Ok(LogicalPlan::Analyze {
+                verbose,
+                input: plan,
+                schema,
+            })
+        } else {
+            let stringified_plans =
+                vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
+            Ok(LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            })
+        }
     }
 
     fn build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema> {
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 3971db3..247dab1 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -30,13 +30,15 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use futures::{Stream, StreamExt};
-use tokio_stream::wrappers::ReceiverStream;
+use futures::Stream;
 
-use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
 };
+use crate::{
+    error::{DataFusionError, Result},
+    physical_plan::stream::RecordBatchReceiverStream,
+};
 
 /// Index into the data that has been returned so far
 #[derive(Debug, Default, Clone)]
@@ -161,8 +163,6 @@ impl ExecutionPlan for MockExec {
     async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
         assert_eq!(partition, 0);
 
-        let schema = self.schema();
-
         // Result doesn't implement clone, so do it ourself
         let data: Vec<_> = self
             .data
@@ -188,11 +188,7 @@ impl ExecutionPlan for MockExec {
         });
 
         // returned stream simply reads off the rx stream
-        let stream = DelayedStream {
-            schema,
-            inner: ReceiverStream::new(rx),
-        };
-        Ok(Box::pin(stream))
+        Ok(RecordBatchReceiverStream::create(&self.schema, rx))
     }
 }
 
@@ -204,29 +200,6 @@ fn clone_error(e: &ArrowError) -> ArrowError {
     }
 }
 
-#[derive(Debug)]
-pub struct DelayedStream {
-    schema: SchemaRef,
-    inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl Stream for DelayedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        self.inner.poll_next_unpin(cx)
-    }
-}
-
-impl RecordBatchStream for DelayedStream {
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
 /// A Mock ExecutionPlan that does not start producing input until a
 /// barrier is called
 ///
@@ -289,8 +262,6 @@ impl ExecutionPlan for BarrierExec {
     async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
         assert!(partition < self.data.len());
 
-        let schema = self.schema();
-
         let (tx, rx) = tokio::sync::mpsc::channel(2);
 
         // task simply sends data in order after barrier is reached
@@ -308,11 +279,7 @@ impl ExecutionPlan for BarrierExec {
         });
 
         // returned stream simply reads off the rx stream
-        let stream = DelayedStream {
-            schema,
-            inner: ReceiverStream::new(rx),
-        };
-        Ok(Box::pin(stream))
+        Ok(RecordBatchReceiverStream::create(&self.schema, rx))
     }
 }
 
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 0c33bd4..2a062f6 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2141,6 +2141,54 @@ async fn csv_explain() {
 }
 
 #[tokio::test]
+async fn csv_explain_analyze() {
+    // This test uses the execute function to run an actual plan under EXPLAIN 
ANALYZE
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx).await;
+    let sql = "EXPLAIN ANALYZE SELECT count(*), c1 FROM aggregate_test_100 
group by c1";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let formatted = 
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = normalize_for_explain(&formatted);
+
+    // Only test basic plumbing and try to avoid having to change too
+    // many things
+    let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), 
metrics=[";
+    assert!(
+        formatted.contains(needle),
+        "did not find '{}' in\n{}",
+        needle,
+        formatted
+    );
+    let verbose_needle = "Output Rows       | 5";
+    assert!(
+        !formatted.contains(verbose_needle),
+        "found unexpected '{}' in\n{}",
+        verbose_needle,
+        formatted
+    );
+}
+
+#[tokio::test]
+async fn csv_explain_analyze_verbose() {
+    // This test uses the execute function to run an actual plan under EXPLAIN 
VERBOSE ANALYZE
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx).await;
+    let sql =
+        "EXPLAIN ANALYZE VERBOSE SELECT count(*), c1 FROM aggregate_test_100 
group by c1";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let formatted = 
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+    let formatted = normalize_for_explain(&formatted);
+
+    let verbose_needle = "Output Rows       | 5";
+    assert!(
+        formatted.contains(verbose_needle),
+        "did not find '{}' in\n{}",
+        verbose_needle,
+        formatted
+    );
+}
+
+#[tokio::test]
 async fn csv_explain_plans() {
     // This test verify the look of each plan in its full cycle plan creation
 

Reply via email to