This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dcc9a1  ARROW-9654: [Rust][DataFusion] Add `EXPLAIN <SQL>` statement
2dcc9a1 is described below

commit 2dcc9a1031a6c81cf5eb1df5835ca5fbc5415ecd
Author: alamb <[email protected]>
AuthorDate: Fri Aug 14 16:16:03 2020 -0600

    ARROW-9654: [Rust][DataFusion] Add `EXPLAIN <SQL>` statement
    
    In order to help users and developers understand what DataFusion's planner 
is doing, this PR adds an `"EXPLAIN PLAN"` feature. All other database systems 
I have worked with have such a feature (e.g. see 
[MySql](https://dev.mysql.com/doc/refman/8.0/en/explain-output.html)).
    
    Example printout (the plans printed are simply the `std::fmt::Debug` 
representation of the plan structures:)
    
    ```
    > explain SELECT status, COUNT(1) FROM http_api_requests_total WHERE path = 
'/api/v2/write' GROUP BY status;
    
+--------------+----------------------------------------------------------------------+
    | plan_type    | plan                                                       
          |
    
+--------------+----------------------------------------------------------------------+
    | logical_plan | Aggregate: groupBy=[[#status]], aggr=[[COUNT(UInt8(1))]]   
          |
    |              |   Selection: #path Eq Utf8("/api/v2/write") And #path Eq 
Utf8("foo") |
    |              |     TableScan: http_api_requests_total projection=None     
          |
    
+--------------+----------------------------------------------------------------------+
    1 rows in set. Query took 0 seconds.
    ```
    and
    
    ```
    > explain VERBOSE SELECT status, COUNT(1) FROM http_api_requests_total 
WHERE path = '/api/v2/write' GROUP BY status;
    
    
+-----------------------------------------+----------------------------------------------------------------------+
    | plan_type                               | plan                            
                                     |
    
+-----------------------------------------+----------------------------------------------------------------------+
    | logical_plan                            | Aggregate: groupBy=[[#status]], 
aggr=[[COUNT(UInt8(1))]]             |
    |                                         |   Selection: #path Eq 
Utf8("/api/v2/write") And #path Eq Utf8("foo") |
    |                                         |     TableScan: 
http_api_requests_total projection=None               |
    | logical_plan after projection_push_down | Aggregate: groupBy=[[#status]], 
aggr=[[COUNT(UInt8(1))]]             |
    |                                         |   Selection: #path Eq 
Utf8("/api/v2/write") And #path Eq Utf8("foo") |
    |                                         |     TableScan: 
http_api_requests_total projection=Some([6, 8])       |
    | logical_plan after type_coercion        | Aggregate: groupBy=[[#status]], 
aggr=[[COUNT(UInt8(1))]]             |
    |                                         |   Selection: #path Eq 
Utf8("/api/v2/write") And #path Eq Utf8("foo") |
    |                                         |     TableScan: 
http_api_requests_total projection=Some([6, 8])       |
    | physical_plan                           | HashAggregateExec {             
                                     |
    |                                         |     group_expr: [               
                                     |
    |                                         |         Column {                
                                     |
    |                                         |             name: "status",     
                                     |
    |                                         |         },                      
                                     |
    |                                         |     ],                          
                                     |
    |                                         |     aggr_expr: [                
                                     |
    |                                         |         Count {                 
                                     |
    |                                         |             expr: Literal {     
                                     |
    |                                         |                 value: UInt8(   
                                     |
    |                                         |                     1,          
                                     |
    |                                         |                 ),              
                                     |
    |                                         |             },                  
                                     |
    |                                         |         },                      
                                     |
    |                                         |     ],                          
                                     |
    |                                         |     input: SelectionExec {      
                                     |
    |                                         |         expr: BinaryExpr {      
                                     |
    |                                         |             left: BinaryExpr {  
                                     |
    |                                         |                 left: Column {  
                                     |
    |                                         |                     name: 
"path",                                    |
    |                                         |                 },              
                                     |
    |                                         |                 op: Eq,         
                                     |
    |                                         |                 right: Literal 
{                                     |
    |                                         |                     value: 
Utf8(                                     |
    |                                         |                         
"/api/v2/write",                             |
    |                                         |                     ),          
                                     |
    |                                         |                 },              
                                     |
    |                                         |             },                  
                                     |
    |                                         |             op: And,            
                                     |
    |                                         |             right: BinaryExpr { 
                                     |
    |                                         |                 left: Column {  
                                     |
    |                                         |                     name: 
"path",                                    |
    |                                         |                 },              
                                     |
    |                                         |                 op: Eq,         
                                     |
    |                                         |                 right: Literal 
{                                     |
    |                                         |                     value: 
Utf8(                                     |
    |                                         |                         "foo",  
                                     |
    |                                         |                     ),          
                                     |
    |                                         |                 },              
                                     |
    |                                         |             },                  
                                     |
    |                                         |         },                      
                                     |
    |                                         |         input: DataSourceExec { 
                                     |
    |                                         |             schema: Schema {    
                                     |
    |                                         |                 fields: [       
                                     |
    |                                         |                     Field {     
                                     |
    |                                         |                         name: 
"path",                                |
    |                                         |                         
data_type: Utf8,                             |
    |                                         |                         
nullable: true,                              |
    |                                         |                         
dict_id: 0,                                  |
    |                                         |                         
dict_is_ordered: false,                      |
    |                                         |                     },          
                                     |
    |                                         |                     Field {     
                                     |
    |                                         |                         name: 
"status",                              |
    |                                         |                         
data_type: Utf8,                             |
    |                                         |                         
nullable: true,                              |
    |                                         |                         
dict_id: 0,                                  |
    |                                         |                         
dict_is_ordered: false,                      |
    |                                         |                     },          
                                     |
    |                                         |                 ],              
                                     |
    |                                         |                 metadata: {},   
                                     |
    |                                         |             },                  
                                     |
    |                                         |             partitions.len: 1,  
                                     |
    |                                         |         },                      
                                     |
    |                                         |     },                          
                                     |
    |                                         |     schema: Schema {            
                                     |
    |                                         |         fields: [               
                                     |
    |                                         |             Field {             
                                     |
    |                                         |                 name: "status", 
                                     |
    |                                         |                 data_type: 
Utf8,                                     |
    |                                         |                 nullable: true, 
                                     |
    |                                         |                 dict_id: 0,     
                                     |
    |                                         |                 
dict_is_ordered: false,                              |
    |                                         |             },                  
                                     |
    |                                         |             Field {             
                                     |
    |                                         |                 name: 
"COUNT(UInt8(1))",                             |
    |                                         |                 data_type: 
UInt64,                                   |
    |                                         |                 nullable: true, 
                                     |
    |                                         |                 dict_id: 0,     
                                     |
    |                                         |                 
dict_is_ordered: false,                              |
    |                                         |             },                  
                                     |
    |                                         |         ],                      
                                     |
    |                                         |         metadata: {},           
                                     |
    |                                         |     },                          
                                     |
    |                                         | }                               
                                     |
    
+-----------------------------------------+----------------------------------------------------------------------+
    
    4 row in set. Query took 0 seconds.
    
    ```
    
    Closes #7959 from alamb/alamb/ARROW-9654-explain
    
    Authored-by: alamb <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/src/execution/context.rs           |  28 +++++-
 .../src/execution/physical_plan/explain.rs         |  99 ++++++++++++++++++++
 rust/datafusion/src/execution/physical_plan/mod.rs |   1 +
 rust/datafusion/src/logicalplan.rs                 | 103 ++++++++++++++++++++-
 rust/datafusion/src/optimizer/optimizer.rs         |   2 +
 .../src/optimizer/projection_push_down.rs          |  13 ++-
 rust/datafusion/src/optimizer/type_coercion.rs     |  11 +++
 rust/datafusion/src/optimizer/utils.rs             |  92 +++++++++++++++++-
 rust/datafusion/src/sql/parser.rs                  |  59 +++++++++---
 rust/datafusion/src/sql/planner.rs                 |  30 +++++-
 rust/datafusion/tests/sql.rs                       |  34 +++++++
 11 files changed, 454 insertions(+), 18 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs 
b/rust/datafusion/src/execution/context.rs
index 9d581bb..56aa810 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -35,6 +35,7 @@ use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::common;
 use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
 use crate::execution::physical_plan::datasource::DatasourceExec;
+use crate::execution::physical_plan::explain::ExplainExec;
 use crate::execution::physical_plan::expressions::{
     Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, 
PhysicalSortExpr, Sum,
 };
@@ -51,7 +52,8 @@ use crate::execution::physical_plan::udf::{ScalarFunction, 
ScalarFunctionExpr};
 use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, 
PhysicalExpr};
 use crate::execution::table_impl::TableImpl;
 use crate::logicalplan::{
-    Expr, FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder,
+    Expr, FunctionMeta, FunctionType, LogicalPlan, LogicalPlanBuilder, 
PlanType,
+    StringifiedPlan,
 };
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
@@ -471,6 +473,30 @@ impl ExecutionContext {
                     self.config.concurrency,
                 )))
             }
+            LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            } => {
+                let input = self.create_physical_plan(plan, batch_size)?;
+
+                let mut stringified_plans = stringified_plans
+                    .iter()
+                    .filter(|s| s.should_display(*verbose))
+                    .map(|s| s.clone())
+                    .collect::<Vec<_>>();
+
+                // add in the physical plan if requested
+                if *verbose {
+                    stringified_plans.push(StringifiedPlan::new(
+                        PlanType::PhysicalPlan,
+                        format!("{:#?}", input),
+                    ));
+                }
+                let schema_ref = Arc::new((**schema).clone());
+                Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
+            }
             _ => Err(ExecutionError::General(
                 "Unsupported logical plan variant".to_string(),
             )),
diff --git a/rust/datafusion/src/execution/physical_plan/explain.rs 
b/rust/datafusion/src/execution/physical_plan/explain.rs
new file mode 100644
index 0000000..c9213d5
--- /dev/null
+++ b/rust/datafusion/src/execution/physical_plan/explain.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the EXPLAIN operator
+
+use crate::error::Result;
+use crate::{
+    execution::physical_plan::{common::RecordBatchIterator, ExecutionPlan, 
Partition},
+    logicalplan::StringifiedPlan,
+};
+use arrow::{
+    array::StringArray,
+    datatypes::SchemaRef,
+    record_batch::{RecordBatch, RecordBatchReader},
+};
+
+use std::sync::{Arc, Mutex};
+
+/// Explain execution plan operator. This operator contains the string
+/// values of the various plans it has when it is created, and passes
+/// them to its output.
+#[derive(Debug)]
+pub struct ExplainExec {
+    /// The schema that this exec plan node outputs
+    schema: SchemaRef,
+
+    /// The strings to be printed
+    stringified_plans: Vec<StringifiedPlan>,
+}
+
+impl ExplainExec {
+    /// Create a new MergeExec
+    pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> 
Self {
+        ExplainExec {
+            schema,
+            stringified_plans,
+        }
+    }
+}
+
+impl ExecutionPlan for ExplainExec {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
+        Ok(vec![Arc::new(ExplainPartition {
+            schema: self.schema.clone(),
+            stringified_plans: self.stringified_plans.clone(),
+        })])
+    }
+}
+
+#[derive(Debug)]
+struct ExplainPartition {
+    /// Input schema
+    schema: SchemaRef,
+    /// The various plans that were created.
+    stringified_plans: Vec<StringifiedPlan>,
+}
+
+impl Partition for ExplainPartition {
+    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + 
Sync>>> {
+        let mut type_builder = 
StringArray::builder(self.stringified_plans.len());
+        let mut plan_builder = 
StringArray::builder(self.stringified_plans.len());
+
+        for p in &self.stringified_plans {
+            type_builder.append_value(&String::from(&p.plan_type))?;
+            plan_builder.append_value(&p.plan)?;
+        }
+
+        let record_batch = RecordBatch::try_new(
+            self.schema.clone(),
+            vec![
+                Arc::new(type_builder.finish()),
+                Arc::new(plan_builder.finish()),
+            ],
+        )?;
+
+        Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
+            self.schema.clone(),
+            vec![Arc::new(record_batch)],
+        ))))
+    }
+}
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs 
b/rust/datafusion/src/execution/physical_plan/mod.rs
index 7f5a9fb..96fd851 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -80,6 +80,7 @@ pub trait Accumulator: Debug {
 pub mod common;
 pub mod csv;
 pub mod datasource;
+pub mod explain;
 pub mod expressions;
 pub mod hash_aggregate;
 pub mod limit;
diff --git a/rust/datafusion/src/logicalplan.rs 
b/rust/datafusion/src/logicalplan.rs
index d13e51b..63397c0 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -21,7 +21,7 @@
 //! Logical query plans can then be optimized and executed directly, or 
translated into
 //! physical query plans and executed.
 
-use std::fmt;
+use std::{fmt, sync::Arc};
 
 use arrow::datatypes::{DataType, Field, Schema};
 
@@ -784,6 +784,18 @@ pub enum LogicalPlan {
         /// Whether the CSV file contains a header
         has_header: bool,
     },
+    /// Produces a relation with string representations of
+    /// various parts of the plan
+    Explain {
+        /// Should extra (detailed, intermediate plans) be included?
+        verbose: bool,
+        /// The logical plan that is being EXPLAIN'd
+        plan: Box<LogicalPlan>,
+        /// Represent the various stages plans have gone through
+        stringified_plans: Vec<StringifiedPlan>,
+        /// The output schema of the explain (2 columns of text)
+        schema: Box<Schema>,
+    },
 }
 
 impl LogicalPlan {
@@ -809,8 +821,17 @@ impl LogicalPlan {
             LogicalPlan::Sort { schema, .. } => &schema,
             LogicalPlan::Limit { schema, .. } => &schema,
             LogicalPlan::CreateExternalTable { schema, .. } => &schema,
+            LogicalPlan::Explain { schema, .. } => &schema,
         }
     }
+
+    /// Returns the (fixed) output schema for explain plans
+    pub fn explain_schema() -> Box<Schema> {
+        Box::new(Schema::new(vec![
+            Field::new("plan_type", DataType::Utf8, false),
+            Field::new("plan", DataType::Utf8, false),
+        ]))
+    }
 }
 
 impl LogicalPlan {
@@ -899,6 +920,10 @@ impl LogicalPlan {
             LogicalPlan::CreateExternalTable { ref name, .. } => {
                 write!(f, "CreateExternalTable: {:?}", name)
             }
+            LogicalPlan::Explain { ref plan, .. } => {
+                write!(f, "Explain")?;
+                plan.fmt_with_indent(f, indent + 1)
+            }
         }
     }
 }
@@ -1128,6 +1153,58 @@ impl LogicalPlanBuilder {
     }
 }
 
+/// Represents which type of plan
+#[derive(Debug, Clone, PartialEq)]
+pub enum PlanType {
+    /// The initial LogicalPlan provided to DataFusion
+    LogicalPlan,
+    /// The LogicalPlan which results from applying an optimizer pass
+    OptimizedLogicalPlan {
+        /// The name of the optimizer which produced this plan
+        optimizer_name: String,
+    },
+    /// The physical plan, prepared for execution
+    PhysicalPlan,
+}
+
+impl From<&PlanType> for String {
+    fn from(t: &PlanType) -> Self {
+        match t {
+            PlanType::LogicalPlan => "logical_plan".into(),
+            PlanType::OptimizedLogicalPlan { optimizer_name } => {
+                format!("logical_plan after {}", optimizer_name)
+            }
+            PlanType::PhysicalPlan => "physical_plan".into(),
+        }
+    }
+}
+
+/// Represents some sort of execution plan, in String form
+#[derive(Debug, Clone, PartialEq)]
+pub struct StringifiedPlan {
+    /// An identifier of what type of plan this string represents
+    pub plan_type: PlanType,
+    /// The string representation of the plan
+    pub plan: Arc<String>,
+}
+
+impl StringifiedPlan {
+    /// Create a new Stringified plan of `plan_type` with string
+    /// representation `plan`
+    pub fn new(plan_type: PlanType, plan: impl Into<String>) -> Self {
+        StringifiedPlan {
+            plan_type,
+            plan: Arc::new(plan.into()),
+        }
+    }
+
+    /// returns true if this plan should be displayed. Generally
+    /// `verbose_mode = true` will display all available plans
+    pub fn should_display(&self, verbose_mode: bool) -> bool {
+        self.plan_type == PlanType::LogicalPlan || verbose_mode
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1237,4 +1314,28 @@ mod tests {
             Field::new("salary", DataType::Int32, false),
         ])
     }
+
+    #[test]
+    fn stringified_plan() -> Result<()> {
+        let stringified_plan =
+            StringifiedPlan::new(PlanType::LogicalPlan, "...the plan...");
+        assert!(stringified_plan.should_display(true));
+        assert!(stringified_plan.should_display(false)); // display in non 
verbose mode too
+
+        let stringified_plan =
+            StringifiedPlan::new(PlanType::PhysicalPlan, "...the plan...");
+        assert!(stringified_plan.should_display(true));
+        assert!(!stringified_plan.should_display(false));
+
+        let stringified_plan = StringifiedPlan::new(
+            PlanType::OptimizedLogicalPlan {
+                optimizer_name: "random opt pass".into(),
+            },
+            "...the plan...",
+        );
+        assert!(stringified_plan.should_display(true));
+        assert!(!stringified_plan.should_display(false));
+
+        Ok(())
+    }
 }
diff --git a/rust/datafusion/src/optimizer/optimizer.rs 
b/rust/datafusion/src/optimizer/optimizer.rs
index e041ce3..fd655f0 100644
--- a/rust/datafusion/src/optimizer/optimizer.rs
+++ b/rust/datafusion/src/optimizer/optimizer.rs
@@ -25,4 +25,6 @@ use crate::logicalplan::LogicalPlan;
 pub trait OptimizerRule {
     /// Perform optimizations on the plan
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan>;
+    /// Produce a human readable name for this optimizer rule
+    fn name(&self) -> &str;
 }
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs 
b/rust/datafusion/src/optimizer/projection_push_down.rs
index e99a996..48f1960 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -25,6 +25,7 @@ use crate::optimizer::utils;
 use arrow::datatypes::{Field, Schema};
 use arrow::error::Result as ArrowResult;
 use std::collections::HashSet;
+use utils::optimize_explain;
 
 /// Projection Push Down optimizer rule ensures that only referenced columns 
are
 /// loaded into memory
@@ -36,6 +37,10 @@ impl OptimizerRule for ProjectionPushDown {
         let mut accum: HashSet<String> = HashSet::new();
         self.optimize_plan(plan, &mut accum, false)
     }
+
+    fn name(&self) -> &str {
+        return "projection_push_down";
+    }
 }
 
 impl ProjectionPushDown {
@@ -45,7 +50,7 @@ impl ProjectionPushDown {
     }
 
     fn optimize_plan(
-        &self,
+        &mut self,
         plan: &LogicalPlan,
         accum: &mut HashSet<String>,
         has_projection: bool,
@@ -186,6 +191,12 @@ impl ProjectionPushDown {
                 })
             }
             LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
+            LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            } => optimize_explain(self, *verbose, &*plan, stringified_plans, 
&*schema),
         }
     }
 }
diff --git a/rust/datafusion/src/optimizer/type_coercion.rs 
b/rust/datafusion/src/optimizer/type_coercion.rs
index 676e712..a3e1ab9 100644
--- a/rust/datafusion/src/optimizer/type_coercion.rs
+++ b/rust/datafusion/src/optimizer/type_coercion.rs
@@ -30,6 +30,7 @@ use crate::logicalplan::LogicalPlan;
 use crate::logicalplan::{Expr, LogicalPlanBuilder};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
+use utils::optimize_explain;
 
 /// Implementation of type coercion optimizer rule
 pub struct TypeCoercionRule<'a> {
@@ -183,8 +184,18 @@ impl<'a> OptimizerRule for TypeCoercionRule<'a> {
             LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
             LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
             LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
+            LogicalPlan::Explain {
+                verbose,
+                plan,
+                stringified_plans,
+                schema,
+            } => optimize_explain(self, *verbose, &*plan, stringified_plans, 
&*schema),
         }
     }
+
+    fn name(&self) -> &str {
+        return "type_coercion";
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/datafusion/src/optimizer/utils.rs 
b/rust/datafusion/src/optimizer/utils.rs
index 6c9d409..c6227a1 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -19,10 +19,11 @@
 
 use std::collections::HashSet;
 
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Schema};
 
+use super::optimizer::OptimizerRule;
 use crate::error::{ExecutionError, Result};
-use crate::logicalplan::Expr;
+use crate::logicalplan::{Expr, LogicalPlan, PlanType, StringifiedPlan};
 
 /// Recursively walk a list of expression trees, collecting the unique set of 
column
 /// names referenced in the expression
@@ -183,10 +184,39 @@ fn _get_supertype(l: &DataType, r: &DataType) -> 
Option<DataType> {
     }
 }
 
+/// Create a `LogicalPlan::Explain` node by running `optimizer` on the
+/// input plan and capturing the resulting plan string
+pub fn optimize_explain(
+    optimizer: &mut impl OptimizerRule,
+    verbose: bool,
+    plan: &LogicalPlan,
+    stringified_plans: &Vec<StringifiedPlan>,
+    schema: &Schema,
+) -> Result<LogicalPlan> {
+    // These are the fields of LogicalPlan::Explain It might be nice
+    // to transform that enum Variant into its own struct and avoid
+    // passing the fields individually
+    let plan = Box::new(optimizer.optimize(plan)?);
+    let mut stringified_plans = stringified_plans.clone();
+    let optimizer_name = optimizer.name().into();
+    stringified_plans.push(StringifiedPlan::new(
+        PlanType::OptimizedLogicalPlan { optimizer_name },
+        format!("{:#?}", plan),
+    ));
+    let schema = Box::new(schema.clone());
+
+    Ok(LogicalPlan::Explain {
+        verbose,
+        plan,
+        stringified_plans,
+        schema,
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::logicalplan::col;
+    use crate::logicalplan::{col, LogicalPlanBuilder};
     use arrow::datatypes::DataType;
     use std::collections::HashSet;
 
@@ -211,4 +241,60 @@ mod tests {
         assert!(accum.contains("a"));
         Ok(())
     }
+
+    struct TestOptimizer {}
+
+    impl OptimizerRule for TestOptimizer {
+        fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+            Ok(plan.clone())
+        }
+
+        fn name(&self) -> &str {
+            return "test_optimizer";
+        }
+    }
+
+    #[test]
+    fn test_optimize_explain() -> Result<()> {
+        let mut optimizer = TestOptimizer {};
+
+        let empty_plan = LogicalPlanBuilder::empty().build()?;
+        let schema = LogicalPlan::explain_schema();
+
+        let optimized_explain = optimize_explain(
+            &mut optimizer,
+            true,
+            &empty_plan,
+            &vec![StringifiedPlan::new(PlanType::LogicalPlan, "...")],
+            &*schema,
+        )?;
+
+        match &optimized_explain {
+            LogicalPlan::Explain {
+                verbose,
+                stringified_plans,
+                ..
+            } => {
+                assert_eq!(*verbose, true);
+
+                let expected_stringified_plans = vec![
+                    StringifiedPlan::new(PlanType::LogicalPlan, "..."),
+                    StringifiedPlan::new(
+                        PlanType::OptimizedLogicalPlan {
+                            optimizer_name: "test_optimizer".into(),
+                        },
+                        "EmptyRelation",
+                    ),
+                ];
+                assert_eq!(*stringified_plans, expected_stringified_plans);
+            }
+            _ => assert!(
+                false,
+                "Expected explain plan but got {:?}",
+                optimized_explain
+            ),
+        }
+
+        Ok(())
+    }
 }
diff --git a/rust/datafusion/src/sql/parser.rs 
b/rust/datafusion/src/sql/parser.rs
index e8089a8..c73ea81 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -59,6 +59,15 @@ pub struct CreateExternalTable {
     pub location: String,
 }
 
+/// DataFusion extension DDL for `EXPLAIN` and `EXPLAIN VERBOSE`
+#[derive(Debug, Clone, PartialEq)]
+pub struct ExplainPlan {
+    /// If true, dumps more intermediate plans and results of optimizaton 
passes
+    pub verbose: bool,
+    /// The statement for which to generate an planning explination
+    pub statement: Box<Statement>,
+}
+
 /// DataFusion Statement representations.
 ///
 /// Tokens parsed by `DFParser` are converted into these values.
@@ -68,6 +77,8 @@ pub enum Statement {
     Statement(SQLStatement),
     /// Extension: `CREATE EXTERNAL TABLE`
     CreateExternalTable(CreateExternalTable),
+    /// Extension: `EXPLAIN <SQL>`
+    Explain(ExplainPlan),
 }
 
 /// SQL Parser
@@ -121,18 +132,24 @@ impl DFParser {
     /// Parse a new expression
     pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
         match self.parser.peek_token() {
-            Token::Word(w) => match w.keyword {
-                Keyword::CREATE => {
-                    // move one token forward
-                    self.parser.next_token();
-                    // use custom parsing
-                    Ok(self.parse_create()?)
-                }
-                _ => {
-                    // use the native parser
-                    Ok(Statement::Statement(self.parser.parse_statement()?))
+            Token::Word(w) => {
+                match w.keyword {
+                    Keyword::CREATE => {
+                        // move one token forward
+                        self.parser.next_token();
+                        // use custom parsing
+                        self.parse_create()
+                    }
+                    Keyword::NoKeyword if w.value.to_uppercase() == "EXPLAIN" 
=> {
+                        self.parser.next_token();
+                        self.parse_explain()
+                    }
+                    _ => {
+                        // use the native parser
+                        
Ok(Statement::Statement(self.parser.parse_statement()?))
+                    }
                 }
-            },
+            }
             _ => {
                 // use the native parser
                 Ok(Statement::Statement(self.parser.parse_statement()?))
@@ -149,6 +166,26 @@ impl DFParser {
         }
     }
 
+    /// Parse an SQL EXPLAIN statement.
+    pub fn parse_explain(&mut self) -> Result<Statement, ParserError> {
+        // Parser is at the token immediately after EXPLAIN
+        // Check for EXPLAIN VERBOSE
+        let verbose = match self.parser.peek_token() {
+            Token::Word(w) => match w.keyword {
+                Keyword::NoKeyword if w.value.to_uppercase() == "VERBOSE" => {
+                    self.parser.next_token();
+                    true
+                }
+                _ => false,
+            },
+            _ => false,
+        };
+
+        let statement = Box::new(self.parse_statement()?);
+        let explain_plan = ExplainPlan { statement, verbose };
+        Ok(Statement::Explain(explain_plan))
+    }
+
     // This is a copy of the equivalent implementation in sqlparser.
     fn parse_columns(
         &mut self,
diff --git a/rust/datafusion/src/sql/planner.rs 
b/rust/datafusion/src/sql/planner.rs
index 748262c..16da452 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -22,12 +22,14 @@ use std::sync::Arc;
 use crate::error::{ExecutionError, Result};
 use crate::logicalplan::Expr::Alias;
 use crate::logicalplan::{
-    lit, Expr, FunctionMeta, LogicalPlan, LogicalPlanBuilder, Operator, 
ScalarValue,
+    lit, Expr, FunctionMeta, LogicalPlan, LogicalPlanBuilder, Operator, 
PlanType,
+    ScalarValue, StringifiedPlan,
 };
 use crate::sql::parser::{CreateExternalTable, FileType, Statement as 
DFStatement};
 
 use arrow::datatypes::*;
 
+use super::parser::ExplainPlan;
 use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, Expr as SQLExpr, Query, Select, 
SelectItem,
     SetExpr, TableFactor, TableWithJoins, UnaryOperator, Value,
@@ -60,6 +62,7 @@ impl<S: SchemaProvider> SqlToRel<S> {
         match statement {
             DFStatement::CreateExternalTable(s) => 
self.external_table_to_plan(&s),
             DFStatement::Statement(s) => self.sql_statement_to_plan(&s),
+            DFStatement::Explain(s) => self.explain_statement_to_plan(&(*s)),
         }
     }
 
@@ -131,6 +134,31 @@ impl<S: SchemaProvider> SqlToRel<S> {
         })
     }
 
+    /// Generate a plan for EXPLAIN ... that will print out a plan
+    ///
+    pub fn explain_statement_to_plan(
+        &self,
+        explain_plan: &ExplainPlan,
+    ) -> Result<LogicalPlan> {
+        let verbose = explain_plan.verbose;
+        let plan = self.statement_to_plan(&explain_plan.statement)?;
+
+        let stringified_plans = vec![StringifiedPlan::new(
+            PlanType::LogicalPlan,
+            format!("{:#?}", plan),
+        )];
+
+        let schema = LogicalPlan::explain_schema();
+        let plan = Box::new(plan);
+
+        Ok(LogicalPlan::Explain {
+            verbose,
+            plan,
+            stringified_plans,
+            schema,
+        })
+    }
+
     fn build_schema(&self, columns: &Vec<SQLColumnDef>) -> Result<Schema> {
         let mut fields = Vec::new();
 
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 57e3ee6..5b3dc1d 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -432,6 +432,40 @@ fn csv_query_count_one() {
     assert_eq!(expected, actual);
 }
 
+#[test]
+fn csv_explain() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx);
+    let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
+    let actual = execute(&mut ctx, sql).join("\n");
+    let expected = "\"logical_plan\"\t\"Projection: #c1\\n  Selection: #c2 Gt 
Int64(10)\\n    TableScan: aggregate_test_100 projection=None\"".to_string();
+    assert_eq!(expected, actual);
+
+    // Also, expect same result with lowercase explain
+    let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
+    let actual = execute(&mut ctx, sql).join("\n");
+    assert_eq!(expected, actual);
+}
+
+#[test]
+fn csv_explain_verbose() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv_by_sql(&mut ctx);
+    let sql = "EXPLAIN VERBOSE SELECT c1 FROM aggregate_test_100 where c2 > 
10";
+    let actual = execute(&mut ctx, sql).join("\n");
+    // Don't actually test the contents of the debuging output (as
+    // that may change and keeping this test updated will be a
+    // pain). Instead just check for a few key pieces.
+    assert!(actual.contains("logical_plan"), "Actual: '{}'", actual);
+    assert!(actual.contains("physical_plan"), "Actual: '{}'", actual);
+    assert!(actual.contains("type_coercion"), "Actual: '{}'", actual);
+    assert!(
+        actual.contains("CAST(#c2 AS Int64) Gt Int64(10)"),
+        "Actual: '{}'",
+        actual
+    );
+}
+
 fn aggr_test_schema() -> SchemaRef {
     Arc::new(Schema::new(vec![
         Field::new("c1", DataType::Utf8, false),

Reply via email to