This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bd3666b Add support for EXPLAIN ANALYZE (#858)
bd3666b is described below
commit bd3666b30f4c2ed4323c59707053aa50f2ff6121
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 12 12:42:58 2021 -0400
Add support for EXPLAIN ANALYZE (#858)
* Add support for EXPLAIN ANALYZE
* fix fmt
---
ballista/rust/core/proto/ballista.proto | 6 +
.../rust/core/src/serde/logical_plan/from_proto.rs | 9 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 41 ++++-
.../rust/core/src/serde/logical_plan/to_proto.rs | 11 ++
datafusion/src/dataframe.rs | 6 +-
datafusion/src/execution/context.rs | 2 +-
datafusion/src/execution/dataframe_impl.rs | 6 +-
datafusion/src/logical_plan/builder.rs | 36 ++--
datafusion/src/logical_plan/plan.rs | 16 ++
datafusion/src/optimizer/constant_folding.rs | 1 +
datafusion/src/optimizer/filter_push_down.rs | 1 +
datafusion/src/optimizer/hash_build_probe_order.rs | 6 +
datafusion/src/optimizer/projection_push_down.rs | 25 +++
datafusion/src/optimizer/utils.rs | 3 +-
datafusion/src/physical_plan/analyze.rs | 201 +++++++++++++++++++++
datafusion/src/physical_plan/mod.rs | 2 +
datafusion/src/physical_plan/parquet.rs | 34 +---
datafusion/src/physical_plan/planner.rs | 12 +-
datafusion/src/physical_plan/stream.rs | 64 +++++++
datafusion/src/sql/planner.rs | 33 ++--
datafusion/src/test/exec.rs | 47 +----
datafusion/tests/sql.rs | 48 +++++
22 files changed, 505 insertions(+), 105 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 2538a10..a1608c6 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -249,6 +249,7 @@ message LogicalPlanNode {
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
+ AnalyzeNode analyze = 14;
ExplainNode explain = 12;
WindowNode window = 13;
}
@@ -323,6 +324,11 @@ enum FileType{
CSV = 2;
}
+message AnalyzeNode {
+ LogicalPlanNode input = 1;
+ bool verbose = 2;
+}
+
message ExplainNode{
LogicalPlanNode input = 1;
bool verbose = 2;
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 31b8b6d..f9761a2 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -231,10 +231,17 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
has_header: create_extern_table.has_header,
})
}
+ LogicalPlanType::Analyze(analyze) => {
+ let input: LogicalPlan = convert_box_required!(analyze.input)?;
+ LogicalPlanBuilder::from(input)
+ .explain(analyze.verbose, true)?
+ .build()
+ .map_err(|e| e.into())
+ }
LogicalPlanType::Explain(explain) => {
let input: LogicalPlan = convert_box_required!(explain.input)?;
LogicalPlanBuilder::from(input)
- .explain(explain.verbose)?
+ .explain(explain.verbose, false)?
.build()
.map_err(|e| e.into())
}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index e4e4383..dbaac1d 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -662,6 +662,43 @@ mod roundtrip_tests {
}
#[test]
+ fn roundtrip_analyze() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("first_name", DataType::Utf8, false),
+ Field::new("last_name", DataType::Utf8, false),
+ Field::new("state", DataType::Utf8, false),
+ Field::new("salary", DataType::Int32, false),
+ ]);
+
+ let verbose_plan = LogicalPlanBuilder::scan_csv(
+ "employee.csv",
+ CsvReadOptions::new().schema(&schema).has_header(true),
+ Some(vec![3, 4]),
+ )
+ .and_then(|plan| plan.sort(vec![col("salary")]))
+ .and_then(|plan| plan.explain(true, true))
+ .and_then(|plan| plan.build())
+ .map_err(BallistaError::DataFusionError)?;
+
+ let plan = LogicalPlanBuilder::scan_csv(
+ "employee.csv",
+ CsvReadOptions::new().schema(&schema).has_header(true),
+ Some(vec![3, 4]),
+ )
+ .and_then(|plan| plan.sort(vec![col("salary")]))
+ .and_then(|plan| plan.explain(false, true))
+ .and_then(|plan| plan.build())
+ .map_err(BallistaError::DataFusionError)?;
+
+ roundtrip_test!(plan);
+
+ roundtrip_test!(verbose_plan);
+
+ Ok(())
+ }
+
+ #[test]
fn roundtrip_explain() -> Result<()> {
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
@@ -677,7 +714,7 @@ mod roundtrip_tests {
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.explain(true))
+ .and_then(|plan| plan.explain(true, false))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;
@@ -687,7 +724,7 @@ mod roundtrip_tests {
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.explain(false))
+ .and_then(|plan| plan.explain(false, false))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 1a3834a..e1c7f53 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -931,6 +931,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)),
})
}
+ LogicalPlan::Analyze { verbose, input, .. } => {
+ let input: protobuf::LogicalPlanNode =
input.as_ref().try_into()?;
+ Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
+ protobuf::AnalyzeNode {
+ input: Some(Box::new(input)),
+ verbose: *verbose,
+ },
+ ))),
+ })
+ }
LogicalPlan::Explain { verbose, plan, .. } => {
let input: protobuf::LogicalPlanNode =
plan.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 1d4cffd..608f6db 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -289,6 +289,8 @@ pub trait DataFrame: Send + Sync {
/// Return a DataFrame with the explanation of its plan so far.
///
+ /// if `analyze` is specified, runs the plan and reports metrics
+ ///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
@@ -296,11 +298,11 @@ pub trait DataFrame: Send + Sync {
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
- /// let batches = df.limit(100)?.explain(false)?.collect().await?;
+ /// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
/// # Ok(())
/// # }
/// ```
- fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>>;
+ fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn
DataFrame>>;
/// Return a `FunctionRegistry` used to plan udf's calls
///
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index 0cf8b3b..7a54bea 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -986,7 +986,7 @@ mod tests {
let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema,
None)
.unwrap()
- .explain(true)
+ .explain(true, false)
.unwrap()
.build()
.unwrap();
diff --git a/datafusion/src/execution/dataframe_impl.rs
b/datafusion/src/execution/dataframe_impl.rs
index 1c0094b..ddaa04e 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -183,9 +183,9 @@ impl DataFrame for DataFrameImpl {
self.plan.schema()
}
- fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
+ fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn
DataFrame>> {
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
- .explain(verbose)?
+ .explain(verbose, analyze)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}
@@ -318,7 +318,7 @@ mod tests {
let df = df
.select_columns(&["c1", "c2", "c11"])?
.limit(10)?
- .explain(false)?;
+ .explain(false, false)?;
let plan = df.to_logical_plan();
// build query using SQL
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index 0dfc1e7..d9afe2e 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -455,18 +455,32 @@ impl LogicalPlanBuilder {
}
/// Create an expression to represent the explanation of the plan
- pub fn explain(&self, verbose: bool) -> Result<Self> {
- let stringified_plans =
- vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
-
+ ///
+ /// if `analyze` is true, runs the actual plan and produces
+ /// information about metrics during run.
+ ///
+ /// if `verbose` is true, prints out additional details.
+ pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Self> {
let schema = LogicalPlan::explain_schema();
-
- Ok(Self::from(LogicalPlan::Explain {
- verbose,
- plan: Arc::new(self.plan.clone()),
- stringified_plans,
- schema: schema.to_dfschema_ref()?,
- }))
+ let schema = schema.to_dfschema_ref()?;
+
+ if analyze {
+ Ok(Self::from(LogicalPlan::Analyze {
+ verbose,
+ input: Arc::new(self.plan.clone()),
+ schema,
+ }))
+ } else {
+ let stringified_plans =
+ vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
+
+ Ok(Self::from(LogicalPlan::Explain {
+ verbose,
+ plan: Arc::new(self.plan.clone()),
+ stringified_plans,
+ schema,
+ }))
+ }
}
/// Build the plan
diff --git a/datafusion/src/logical_plan/plan.rs
b/datafusion/src/logical_plan/plan.rs
index 28405fb..cb81b8d 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -213,6 +213,16 @@ pub enum LogicalPlan {
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
+ /// Runs the actual plan, and then prints the physical plan
+ /// with execution metrics.
+ Analyze {
+ /// Should extra detail be included?
+ verbose: bool,
+ /// The logical plan that is being EXPLAIN ANALYZE'd
+ input: Arc<LogicalPlan>,
+ /// The output schema of the explain (2 columns of text)
+ schema: DFSchemaRef,
+ },
/// Extension operator defined outside of DataFusion
Extension {
/// The runtime extension operator
@@ -239,6 +249,7 @@ impl LogicalPlan {
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => schema,
LogicalPlan::Explain { schema, .. } => schema,
+ LogicalPlan::Analyze { schema, .. } => schema,
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
}
@@ -278,6 +289,7 @@ impl LogicalPlan {
}
LogicalPlan::Extension { node } => vec![node.schema()],
LogicalPlan::Explain { schema, .. }
+ | LogicalPlan::Analyze { schema, .. }
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
LogicalPlan::Limit { input, .. }
@@ -327,6 +339,7 @@ impl LogicalPlan {
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CrossJoin { .. }
+ | LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union { .. } => {
vec![]
@@ -350,6 +363,7 @@ impl LogicalPlan {
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
+ LogicalPlan::Analyze { input: plan, .. } => vec![plan],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
@@ -495,6 +509,7 @@ impl LogicalPlan {
true
}
LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?,
+ LogicalPlan::Analyze { input: plan, .. } => plan.accept(visitor)?,
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
@@ -790,6 +805,7 @@ impl LogicalPlan {
write!(f, "CreateExternalTable: {:?}", name)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
+ LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
LogicalPlan::Extension { ref node } =>
node.fmt_for_explain(f),
}
diff --git a/datafusion/src/optimizer/constant_folding.rs
b/datafusion/src/optimizer/constant_folding.rs
index b4c4a96..31b0b7f 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -80,6 +80,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
+ | LogicalPlan::Analyze { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. }
diff --git a/datafusion/src/optimizer/filter_push_down.rs
b/datafusion/src/optimizer/filter_push_down.rs
index 039e92d..d0990de 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -284,6 +284,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) ->
Result<LogicalPlan> {
// push the optimization to the plan of this explain
push_down(&state, plan)
}
+ LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter { input, predicate } => {
let mut predicates = vec![];
split_members(predicate, &mut predicates);
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs
b/datafusion/src/optimizer/hash_build_probe_order.rs
index ecb3b40..209faf4 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -80,6 +80,11 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize>
{
// we cannot predict how rows will be repartitioned
None
}
+ LogicalPlan::Analyze { .. } => {
+ // Analyze produces one row, verbose produces more
+ // but it should never be used as an input to a Join anyways
+ None
+ }
// the following operators are special cases and not querying data
LogicalPlan::CreateExternalTable { .. } => None,
LogicalPlan::Explain { .. } => None,
@@ -201,6 +206,7 @@ impl OptimizerRule for HashBuildProbeOrder {
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. }
+ | LogicalPlan::Analyze { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
diff --git a/datafusion/src/optimizer/projection_push_down.rs
b/datafusion/src/optimizer/projection_push_down.rs
index 96c5094..7dddbff 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -356,6 +356,31 @@ fn optimize_plan(
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the
plan".to_string(),
)),
+ LogicalPlan::Analyze {
+ input,
+ verbose,
+ schema,
+ } => {
+ // make sure we keep all the columns from the input plan
+ let required_columns = input
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| f.qualified_column())
+ .collect::<HashSet<Column>>();
+
+ Ok(LogicalPlan::Analyze {
+ input: Arc::new(optimize_plan(
+ optimizer,
+ input,
+ &required_columns,
+ false,
+ execution_props,
+ )?),
+ verbose: *verbose,
+ schema: schema.clone(),
+ })
+ }
LogicalPlan::Union {
inputs,
schema,
diff --git a/datafusion/src/optimizer/utils.rs
b/datafusion/src/optimizer/utils.rs
index 615f0cc..8ce6fe5 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -198,7 +198,8 @@ pub fn from_plan(
LogicalPlan::EmptyRelation { .. }
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable { .. }
- | LogicalPlan::Explain { .. } => Ok(plan.clone()),
+ | LogicalPlan::Explain { .. }
+ | LogicalPlan::Analyze { .. } => Ok(plan.clone()),
}
}
diff --git a/datafusion/src/physical_plan/analyze.rs
b/datafusion/src/physical_plan/analyze.rs
new file mode 100644
index 0000000..36726ad
--- /dev/null
+++ b/datafusion/src/physical_plan/analyze.rs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the ANALYZE operator
+
+use std::sync::Arc;
+use std::{any::Any, time::Instant};
+
+use crate::{
+ error::{DataFusionError, Result},
+ physical_plan::{display::DisplayableExecutionPlan, Partitioning},
+ physical_plan::{DisplayFormatType, ExecutionPlan},
+};
+use arrow::{array::StringBuilder, datatypes::SchemaRef,
record_batch::RecordBatch};
+use futures::StreamExt;
+
+use super::{stream::RecordBatchReceiverStream, Distribution,
SendableRecordBatchStream};
+use async_trait::async_trait;
+
+/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
+/// discards the results, and then prints out an annotated plan with metrics
+#[derive(Debug, Clone)]
+pub struct AnalyzeExec {
+ /// control how much extra to print
+ verbose: bool,
+ /// The input plan (the plan being analyzed)
+ input: Arc<dyn ExecutionPlan>,
+ /// The output schema for RecordBatches of this exec node
+ schema: SchemaRef,
+}
+
+impl AnalyzeExec {
+ /// Create a new AnalyzeExec
+ pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema:
SchemaRef) -> Self {
+ AnalyzeExec {
+ verbose,
+ input,
+ schema,
+ }
+ }
+}
+
+#[async_trait]
+impl ExecutionPlan for AnalyzeExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ /// Specifies we want the input as a single stream
+ fn required_child_distribution(&self) -> Distribution {
+ Distribution::SinglePartition
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ fn with_new_children(
+ &self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if children.len() == 1 {
+ Ok(Arc::new(Self::new(
+ self.verbose,
+ children.pop().unwrap(),
+ self.schema.clone(),
+ )))
+ } else {
+ Err(DataFusionError::Internal(format!(
+ "Invalid child count for AnalyzeExec. Expected 1 got {}",
+ children.len()
+ )))
+ }
+ }
+
+ async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
+ if 0 != partition {
+ return Err(DataFusionError::Internal(format!(
+ "AnalyzeExec invalid partition. Expected 0, got {}",
+ partition
+ )));
+ }
+
+ // should be ensured by `required_child_distribution` returning `SinglePartition`
+ let input_partitions =
self.input.output_partitioning().partition_count();
+ if input_partitions != 1 {
+ return Err(DataFusionError::Internal(format!(
+ "AnalyzeExec invalid number of input partitions. Expected 1,
got {}",
+ input_partitions
+ )));
+ }
+
+ let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
+
+ let captured_input = self.input.clone();
+ let mut input_stream = captured_input.execute(0).await?;
+ let captured_schema = self.schema.clone();
+ let verbose = self.verbose;
+
+ // Task reads batches from the input and, once the input is exhausted,
+ // produces a RecordBatch with a report that is written to `tx`
+ tokio::task::spawn(async move {
+ let start = Instant::now();
+ let mut total_rows = 0;
+
+ // Note the code below ignores errors sending on tx. An
+ // error sending means the plan is being torn down and
+ // nothing is left that will handle the error (aka no one
+ // will hear us scream)
+ while let Some(b) = input_stream.next().await {
+ match b {
+ Ok(batch) => {
+ total_rows += batch.num_rows();
+ }
+ b @ Err(_) => {
+ // try and pass on errors from input
+ if tx.send(b).await.is_err() {
+ // receiver hung up, stop executing (no
+ // one will look at any further results we
+ // send)
+ return;
+ }
+ }
+ }
+ }
+ let end = Instant::now();
+
+ let mut type_builder = StringBuilder::new(1);
+ let mut plan_builder = StringBuilder::new(1);
+
+ // TODO use some sort of enum rather than strings?
+ type_builder.append_value("Plan with Metrics").unwrap();
+
+ let annotated_plan =
+ DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
+ .indent()
+ .to_string();
+ plan_builder.append_value(annotated_plan).unwrap();
+
+ // Verbose output
+ // TODO make this more sophisticated
+ if verbose {
+ type_builder.append_value("Output Rows").unwrap();
+ plan_builder.append_value(total_rows.to_string()).unwrap();
+
+ type_builder.append_value("Duration").unwrap();
+ plan_builder
+ .append_value(format!("{:?}", end - start))
+ .unwrap();
+ }
+
+ let maybe_batch = RecordBatch::try_new(
+ captured_schema,
+ vec![
+ Arc::new(type_builder.finish()),
+ Arc::new(plan_builder.finish()),
+ ],
+ );
+ // again ignore the send error: a failed send means the receiver hung up
+ tx.send(maybe_batch).await.ok();
+ });
+
+ Ok(RecordBatchReceiverStream::create(&self.schema, rx))
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "AnalyzeExec verbose={}", self.verbose)
+ }
+ }
+ }
+}
diff --git a/datafusion/src/physical_plan/mod.rs
b/datafusion/src/physical_plan/mod.rs
index 0df6e60..8f7db72 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -639,6 +639,7 @@ pub trait Accumulator: Send + Sync + Debug {
}
pub mod aggregates;
+pub mod analyze;
pub mod array_expressions;
pub mod coalesce_batches;
pub mod coalesce_partitions;
@@ -671,6 +672,7 @@ pub mod repartition;
pub mod sort;
pub mod sort_preserving_merge;
pub mod source;
+pub mod stream;
pub mod string_expressions;
pub mod type_coercion;
pub mod udaf;
diff --git a/datafusion/src/physical_plan/parquet.rs
b/datafusion/src/physical_plan/parquet.rs
index ec5611f..ff8bb5b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -20,7 +20,6 @@
use std::fmt;
use std::fs::File;
use std::sync::Arc;
-use std::task::{Context, Poll};
use std::{any::Any, convert::TryInto};
use crate::{
@@ -28,8 +27,7 @@ use crate::{
logical_plan::{Column, Expr},
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_plan::{
- common, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
- SendableRecordBatchStream,
+ common, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
},
scalar::ScalarValue,
};
@@ -55,12 +53,11 @@ use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task,
};
-use tokio_stream::wrappers::ReceiverStream;
use crate::datasource::datasource::{ColumnStatistics, Statistics};
use async_trait::async_trait;
-use futures::stream::{Stream, StreamExt};
+use super::stream::RecordBatchReceiverStream;
use super::SQLMetric;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::Accumulator;
@@ -688,10 +685,7 @@ impl ExecutionPlan for ParquetExec {
}
});
- Ok(Box::pin(ParquetStream {
- schema: self.schema.clone(),
- inner: ReceiverStream::new(response_rx),
- }))
+ Ok(RecordBatchReceiverStream::create(&self.schema, response_rx))
}
fn fmt_as(
@@ -938,28 +932,6 @@ fn split_files(filenames: &[String], n: usize) ->
Vec<&[String]> {
filenames.chunks(chunk_size).collect()
}
-struct ParquetStream {
- schema: SchemaRef,
- inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl Stream for ParquetStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.inner.poll_next_unpin(cx)
- }
-}
-
-impl RecordBatchStream for ParquetStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index e662821..256a43b 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -17,6 +17,7 @@
//! Physical query planner
+use super::analyze::AnalyzeExec;
use super::{
aggregates, cross_join::CrossJoinExec, empty::EmptyExec,
expressions::binary,
functions, hash_join::PartitionMode, udaf, union::UnionExec, windows,
@@ -741,6 +742,15 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the
plan".to_string(),
)),
+ LogicalPlan::Analyze {
+ verbose,
+ input,
+ schema,
+ } => {
+ let input = self.create_initial_plan(input, ctx_state)?;
+ let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+ Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
+ }
LogicalPlan::Extension { node } => {
let physical_inputs = node
.inputs()
@@ -1651,7 +1661,7 @@ mod tests {
let logical_plan =
LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
.unwrap()
- .explain(true)
+ .explain(true, false)
.unwrap()
.build()
.unwrap();
diff --git a/datafusion/src/physical_plan/stream.rs
b/datafusion/src/physical_plan/stream.rs
new file mode 100644
index 0000000..0c29f87
--- /dev/null
+++ b/datafusion/src/physical_plan/stream.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream wrappers for physical operators
+
+use arrow::{
+ datatypes::SchemaRef, error::Result as ArrowResult,
record_batch::RecordBatch,
+};
+use futures::{Stream, StreamExt};
+use tokio_stream::wrappers::ReceiverStream;
+
+use super::{RecordBatchStream, SendableRecordBatchStream};
+
+/// Adapter for a tokio [`ReceiverStream`] that implements the
+/// [`RecordBatchStream`] trait (boxed by `create` as a
+/// [`SendableRecordBatchStream`])
+pub struct RecordBatchReceiverStream {
+ schema: SchemaRef,
+ inner: ReceiverStream<ArrowResult<RecordBatch>>,
+}
+
+impl RecordBatchReceiverStream {
+ /// Construct a new [`RecordBatchReceiverStream`] which will yield
+ /// batches of the specified schema received on `rx`
+ pub fn create(
+ schema: &SchemaRef,
+ rx: tokio::sync::mpsc::Receiver<ArrowResult<RecordBatch>>,
+ ) -> SendableRecordBatchStream {
+ let schema = schema.clone();
+ let inner = ReceiverStream::new(rx);
+ Box::pin(Self { schema, inner })
+ }
+}
+
+impl Stream for RecordBatchReceiverStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.inner.poll_next_unpin(cx)
+ }
+}
+
+impl RecordBatchStream for RecordBatchReceiverStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index ef2b634..29204f4 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -101,8 +101,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Statement::Explain {
verbose,
statement,
- analyze: _,
- } => self.explain_statement_to_plan(*verbose, statement),
+ analyze,
+ } => self.explain_statement_to_plan(*verbose, *analyze, statement),
Statement::Query(query) => self.query_to_plan(query),
Statement::ShowVariable { variable } =>
self.show_variable_to_plan(variable),
Statement::ShowColumns {
@@ -230,21 +230,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub fn explain_statement_to_plan(
&self,
verbose: bool,
+ analyze: bool,
statement: &Statement,
) -> Result<LogicalPlan> {
let plan = self.sql_statement_to_plan(statement)?;
-
- let stringified_plans =
vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
-
- let schema = LogicalPlan::explain_schema();
let plan = Arc::new(plan);
+ let schema = LogicalPlan::explain_schema();
+ let schema = schema.to_dfschema_ref()?;
- Ok(LogicalPlan::Explain {
- verbose,
- plan,
- stringified_plans,
- schema: schema.to_dfschema_ref()?,
- })
+ if analyze {
+ Ok(LogicalPlan::Analyze {
+ verbose,
+ input: plan,
+ schema,
+ })
+ } else {
+ let stringified_plans =
+ vec![plan.to_stringified(PlanType::InitialLogicalPlan)];
+ Ok(LogicalPlan::Explain {
+ verbose,
+ plan,
+ stringified_plans,
+ schema,
+ })
+ }
}
fn build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema> {
diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs
index 3971db3..247dab1 100644
--- a/datafusion/src/test/exec.rs
+++ b/datafusion/src/test/exec.rs
@@ -30,13 +30,15 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use futures::{Stream, StreamExt};
-use tokio_stream::wrappers::ReceiverStream;
+use futures::Stream;
-use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
+use crate::{
+ error::{DataFusionError, Result},
+ physical_plan::stream::RecordBatchReceiverStream,
+};
/// Index into the data that has been returned so far
#[derive(Debug, Default, Clone)]
@@ -161,8 +163,6 @@ impl ExecutionPlan for MockExec {
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
assert_eq!(partition, 0);
- let schema = self.schema();
-
// Result doesn't implement clone, so do it ourself
let data: Vec<_> = self
.data
@@ -188,11 +188,7 @@ impl ExecutionPlan for MockExec {
});
// returned stream simply reads off the rx stream
- let stream = DelayedStream {
- schema,
- inner: ReceiverStream::new(rx),
- };
- Ok(Box::pin(stream))
+ Ok(RecordBatchReceiverStream::create(&self.schema, rx))
}
}
@@ -204,29 +200,6 @@ fn clone_error(e: &ArrowError) -> ArrowError {
}
}
-#[derive(Debug)]
-pub struct DelayedStream {
- schema: SchemaRef,
- inner: ReceiverStream<ArrowResult<RecordBatch>>,
-}
-
-impl Stream for DelayedStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- self.inner.poll_next_unpin(cx)
- }
-}
-
-impl RecordBatchStream for DelayedStream {
- fn schema(&self) -> SchemaRef {
- Arc::clone(&self.schema)
- }
-}
-
/// A Mock ExecutionPlan that does not start producing input until a
/// barrier is called
///
@@ -289,8 +262,6 @@ impl ExecutionPlan for BarrierExec {
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
assert!(partition < self.data.len());
- let schema = self.schema();
-
let (tx, rx) = tokio::sync::mpsc::channel(2);
// task simply sends data in order after barrier is reached
@@ -308,11 +279,7 @@ impl ExecutionPlan for BarrierExec {
});
// returned stream simply reads off the rx stream
- let stream = DelayedStream {
- schema,
- inner: ReceiverStream::new(rx),
- };
- Ok(Box::pin(stream))
+ Ok(RecordBatchReceiverStream::create(&self.schema, rx))
}
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 0c33bd4..2a062f6 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2141,6 +2141,54 @@ async fn csv_explain() {
}
#[tokio::test]
+async fn csv_explain_analyze() {
+ // This test uses the execute function to run an actual plan under EXPLAIN
ANALYZE
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_csv_by_sql(&mut ctx).await;
+ let sql = "EXPLAIN ANALYZE SELECT count(*), c1 FROM aggregate_test_100
group by c1";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let formatted =
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let formatted = normalize_for_explain(&formatted);
+
+ // Only test basic plumbing and try to avoid having to change too
+ // many things
+ let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES),
metrics=[";
+ assert!(
+ formatted.contains(needle),
+ "did not find '{}' in\n{}",
+ needle,
+ formatted
+ );
+ let verbose_needle = "Output Rows | 5";
+ assert!(
+ !formatted.contains(verbose_needle),
+ "found unexpected '{}' in\n{}",
+ verbose_needle,
+ formatted
+ );
+}
+
+#[tokio::test]
+async fn csv_explain_analyze_verbose() {
+ // This test uses the execute function to run an actual plan under EXPLAIN
ANALYZE VERBOSE
+ let mut ctx = ExecutionContext::new();
+ register_aggregate_csv_by_sql(&mut ctx).await;
+ let sql =
+ "EXPLAIN ANALYZE VERBOSE SELECT count(*), c1 FROM aggregate_test_100
group by c1";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let formatted =
arrow::util::pretty::pretty_format_batches(&actual).unwrap();
+ let formatted = normalize_for_explain(&formatted);
+ // the "Output Rows" row is only emitted by AnalyzeExec in verbose mode
+ let verbose_needle = "Output Rows | 5";
+ assert!(
+ formatted.contains(verbose_needle),
+ "did not find '{}' in\n{}",
+ verbose_needle,
+ formatted
+ );
+}
+
+#[tokio::test]
async fn csv_explain_plans() {
// This test verify the look of each plan in its full cycle plan creation