This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new db45ff3eea Minor: split EXPLAIN and ANALYZE planning into different functions (#15188)
db45ff3eea is described below
commit db45ff3eea33c0e3ad607ce1abff266a9956ab22
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 12 14:57:03 2025 -0400
Minor: split EXPLAIN and ANALYZE planning into different functions (#15188)
---
datafusion/core/src/physical_planner.rs | 320 +++++++++++++++++---------------
1 file changed, 167 insertions(+), 153 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index f200fb0e0f..6aff9280ff 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -78,8 +78,9 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
- DescribeTable, DmlStatement, Extension, FetchType, Filter, JoinType, RecursiveQuery,
- SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
+ Analyze, DescribeTable, DmlStatement, Explain, Extension, FetchType, Filter,
+ JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame,
+ WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
@@ -177,16 +178,17 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
- match self.handle_explain(logical_plan, session_state).await? {
- Some(plan) => Ok(plan),
- None => {
- let plan = self
- .create_initial_plan(logical_plan, session_state)
- .await?;
-
- self.optimize_physical_plan(plan, session_state, |_, _| {})
- }
+ if let Some(plan) = self
+ .handle_explain_or_analyze(logical_plan, session_state)
+ .await?
+ {
+ return Ok(plan);
}
+ let plan = self
+ .create_initial_plan(logical_plan, session_state)
+ .await?;
+
+ self.optimize_physical_plan(plan, session_state, |_, _| {})
}
/// Create a physical expression from a logical expression
@@ -1715,167 +1717,179 @@ impl DefaultPhysicalPlanner {
/// Returns
/// Some(plan) if optimized, and None if logical_plan was not an
/// explain (and thus needs to be optimized as normal)
- async fn handle_explain(
+ async fn handle_explain_or_analyze(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- if let LogicalPlan::Explain(e) = logical_plan {
- use PlanType::*;
- let mut stringified_plans = vec![];
+ let execution_plan = match logical_plan {
+ LogicalPlan::Explain(e) => self.handle_explain(e, session_state).await?,
+ LogicalPlan::Analyze(a) => self.handle_analyze(a, session_state).await?,
+ _ => return Ok(None),
+ };
+ Ok(Some(execution_plan))
+ }
+
+ /// Planner for `LogicalPlan::Explain`
+ async fn handle_explain(
+ &self,
+ e: &Explain,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ use PlanType::*;
+ let mut stringified_plans = vec![];
- let config = &session_state.config_options().explain;
- let explain_format = DisplayFormatType::from_str(&config.format)?;
+ let config = &session_state.config_options().explain;
+ let explain_format = DisplayFormatType::from_str(&config.format)?;
- let skip_logical_plan = config.physical_plan_only
- || explain_format == DisplayFormatType::TreeRender;
+ let skip_logical_plan =
+ config.physical_plan_only || explain_format == DisplayFormatType::TreeRender;
- if !skip_logical_plan {
- stringified_plans.clone_from(&e.stringified_plans);
- if e.logical_optimization_succeeded {
-
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
- }
+ if !skip_logical_plan {
+ stringified_plans.clone_from(&e.stringified_plans);
+ if e.logical_optimization_succeeded {
+
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
}
+ }
- if !config.logical_plan_only && e.logical_optimization_succeeded {
- match self
- .create_initial_plan(e.plan.as_ref(), session_state)
- .await
- {
- Ok(input) => {
- // Include statistics / schema if enabled
- stringified_plans.push(
- displayable(input.as_ref())
- .set_show_statistics(config.show_statistics)
- .set_show_schema(config.show_schema)
- .to_stringified(
- e.verbose,
- InitialPhysicalPlan,
- explain_format,
- ),
- );
+ if !config.logical_plan_only && e.logical_optimization_succeeded {
+ match self
+ .create_initial_plan(e.plan.as_ref(), session_state)
+ .await
+ {
+ Ok(input) => {
+ // Include statistics / schema if enabled
+ stringified_plans.push(
+ displayable(input.as_ref())
+ .set_show_statistics(config.show_statistics)
+ .set_show_schema(config.show_schema)
+ .to_stringified(
+ e.verbose,
+ InitialPhysicalPlan,
+ explain_format,
+ ),
+ );
- // Show statistics + schema in verbose output even if
not
- // explicitly requested
- if e.verbose {
- if !config.show_statistics {
- stringified_plans.push(
- displayable(input.as_ref())
- .set_show_statistics(true)
- .to_stringified(
- e.verbose,
- InitialPhysicalPlanWithStats,
- explain_format,
- ),
- );
- }
- if !config.show_schema {
- stringified_plans.push(
- displayable(input.as_ref())
- .set_show_schema(true)
- .to_stringified(
- e.verbose,
- InitialPhysicalPlanWithSchema,
- explain_format,
- ),
- );
- }
+ // Show statistics + schema in verbose output even if not
+ // explicitly requested
+ if e.verbose {
+ if !config.show_statistics {
+ stringified_plans.push(
+ displayable(input.as_ref())
+ .set_show_statistics(true)
+ .to_stringified(
+ e.verbose,
+ InitialPhysicalPlanWithStats,
+ explain_format,
+ ),
+ );
}
+ if !config.show_schema {
+ stringified_plans.push(
+ displayable(input.as_ref())
+ .set_show_schema(true)
+ .to_stringified(
+ e.verbose,
+ InitialPhysicalPlanWithSchema,
+ explain_format,
+ ),
+ );
+ }
+ }
- let optimized_plan = self.optimize_physical_plan(
- input,
- session_state,
- |plan, optimizer| {
- let optimizer_name =
optimizer.name().to_string();
- let plan_type = OptimizedPhysicalPlan {
optimizer_name };
- stringified_plans.push(
- displayable(plan)
-
.set_show_statistics(config.show_statistics)
- .set_show_schema(config.show_schema)
- .to_stringified(
- e.verbose,
- plan_type,
- explain_format,
- ),
- );
- },
- );
- match optimized_plan {
- Ok(input) => {
- // This plan will includes statistics if
show_statistics is on
- stringified_plans.push(
- displayable(input.as_ref())
-
.set_show_statistics(config.show_statistics)
- .set_show_schema(config.show_schema)
- .to_stringified(
- e.verbose,
- FinalPhysicalPlan,
- explain_format,
- ),
- );
-
- // Show statistics + schema in verbose output
even if not
- // explicitly requested
- if e.verbose {
- if !config.show_statistics {
- stringified_plans.push(
- displayable(input.as_ref())
- .set_show_statistics(true)
- .to_stringified(
- e.verbose,
- FinalPhysicalPlanWithStats,
- explain_format,
- ),
- );
- }
- if !config.show_schema {
- stringified_plans.push(
- displayable(input.as_ref())
- .set_show_schema(true)
- .to_stringified(
- e.verbose,
-
FinalPhysicalPlanWithSchema,
- explain_format,
- ),
- );
- }
+ let optimized_plan = self.optimize_physical_plan(
+ input,
+ session_state,
+ |plan, optimizer| {
+ let optimizer_name = optimizer.name().to_string();
+ let plan_type = OptimizedPhysicalPlan {
optimizer_name };
+ stringified_plans.push(
+ displayable(plan)
+
.set_show_statistics(config.show_statistics)
+ .set_show_schema(config.show_schema)
+ .to_stringified(e.verbose, plan_type,
explain_format),
+ );
+ },
+ );
+ match optimized_plan {
+ Ok(input) => {
+ // This plan will includes statistics if
show_statistics is on
+ stringified_plans.push(
+ displayable(input.as_ref())
+
.set_show_statistics(config.show_statistics)
+ .set_show_schema(config.show_schema)
+ .to_stringified(
+ e.verbose,
+ FinalPhysicalPlan,
+ explain_format,
+ ),
+ );
+
+ // Show statistics + schema in verbose output even
if not
+ // explicitly requested
+ if e.verbose {
+ if !config.show_statistics {
+ stringified_plans.push(
+ displayable(input.as_ref())
+ .set_show_statistics(true)
+ .to_stringified(
+ e.verbose,
+ FinalPhysicalPlanWithStats,
+ explain_format,
+ ),
+ );
+ }
+ if !config.show_schema {
+ stringified_plans.push(
+ displayable(input.as_ref())
+ .set_show_schema(true)
+ .to_stringified(
+ e.verbose,
+ FinalPhysicalPlanWithSchema,
+ explain_format,
+ ),
+ );
}
}
- Err(DataFusionError::Context(optimizer_name, e))
=> {
- let plan_type = OptimizedPhysicalPlan {
optimizer_name };
- stringified_plans
- .push(StringifiedPlan::new(plan_type,
e.to_string()))
- }
- Err(e) => return Err(e),
}
- }
- Err(err) => {
- stringified_plans.push(StringifiedPlan::new(
- PhysicalPlanError,
- err.strip_backtrace(),
- ));
+ Err(DataFusionError::Context(optimizer_name, e)) => {
+ let plan_type = OptimizedPhysicalPlan {
optimizer_name };
+ stringified_plans
+ .push(StringifiedPlan::new(plan_type,
e.to_string()))
+ }
+ Err(e) => return Err(e),
}
}
+ Err(err) => {
+ stringified_plans.push(StringifiedPlan::new(
+ PhysicalPlanError,
+ err.strip_backtrace(),
+ ));
+ }
}
-
- Ok(Some(Arc::new(ExplainExec::new(
- SchemaRef::new(e.schema.as_ref().to_owned().into()),
- stringified_plans,
- e.verbose,
- ))))
- } else if let LogicalPlan::Analyze(a) = logical_plan {
- let input = self.create_physical_plan(&a.input,
session_state).await?;
- let schema = SchemaRef::new((*a.schema).clone().into());
- let show_statistics =
session_state.config_options().explain.show_statistics;
- Ok(Some(Arc::new(AnalyzeExec::new(
- a.verbose,
- show_statistics,
- input,
- schema,
- ))))
- } else {
- Ok(None)
}
+
+ Ok(Arc::new(ExplainExec::new(
+ Arc::clone(e.schema.inner()),
+ stringified_plans,
+ e.verbose,
+ )))
+ }
+
+ async fn handle_analyze(
+ &self,
+ a: &Analyze,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let input = self.create_physical_plan(&a.input, session_state).await?;
+ let schema = SchemaRef::new((*a.schema).clone().into());
+ let show_statistics = session_state.config_options().explain.show_statistics;
+ Ok(Arc::new(AnalyzeExec::new(
+ a.verbose,
+ show_statistics,
+ input,
+ schema,
+ )))
}
/// Optimize a physical plan by applying each physical optimizer,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]