This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b54adb36b8 Refactor physical create_initial_plan to iteratively &
concurrently construct plan from the bottom up (#10023)
b54adb36b8 is described below
commit b54adb36b855f198b5098fdb3e4cdf1934818efd
Author: Jeffrey Vo <[email protected]>
AuthorDate: Wed Apr 17 01:59:42 2024 +1000
Refactor physical create_initial_plan to iteratively & concurrently
construct plan from the bottom up (#10023)
* Refactor physical create_initial_plan to construct bottom up
* Refactor node mapping into separate function
* Experiment with concurrent bottom up physical planning
* Refactoring and comments
* Remove unnecessary collect
* Preallocate vector capacity
* Remove children.clone()
* Introduce ChildrenContainer enum
* Formatting
* Fix case where extension may have 0 or 1 children
* Documentation and cleanup unwraps
* Minor changes
- Use tokio::Mutex in async environment
- Remove Option from enum, since it is only used for taking.
---------
Co-authored-by: metesynnada <[email protected]>
---
datafusion/core/src/physical_planner.rs | 1827 ++++++++++++++++++-------------
datafusion/core/tests/tpcds_planning.rs | 1 -
2 files changed, 1056 insertions(+), 772 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index f5e937bb56..301f68c0f2 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,6 +17,7 @@
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
+use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
@@ -35,12 +36,11 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_expr::{
- Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias,
TableScan, Unnest,
- Window,
+ Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest,
Window,
};
use crate::logical_expr::{
- CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning,
PlanType,
- Repartition, Union, UserDefinedLogicalNode,
+ Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType,
Repartition,
+ UserDefinedLogicalNode,
};
use crate::logical_expr::{Limit, Values};
use crate::physical_expr::{create_physical_expr, create_physical_exprs};
@@ -74,9 +74,11 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
+use datafusion_common::config::FormatOptions;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::{
- exec_err, internal_err, not_impl_err, plan_err, DFSchema, FileType,
ScalarValue,
+ exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err,
DFSchema,
+ FileType, ScalarValue,
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
@@ -87,21 +89,20 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
- DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
- StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
+ DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
+ ScalarFunctionDefinition, StringifiedPlan, WindowFrame, WindowFrameBound,
WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;
use async_trait::async_trait;
-use datafusion_common::config::FormatOptions;
-use datafusion_physical_expr::LexOrdering;
-use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use sqlparser::ast::NullTreatment;
+use tokio::sync::Mutex;
fn create_function_physical_name(
fun: &str,
@@ -445,6 +446,22 @@ pub trait ExtensionPlanner {
/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
+///
+/// This planner will first flatten the `LogicalPlan` tree via a
+/// depth first approach, which allows it to identify the leaves
+/// of the tree.
+///
+/// Tasks are spawned from these leaves and traverse back up the
+/// tree towards the root, converting each `LogicalPlan` node it
+/// reaches into its equivalent `ExecutionPlan` node. When these
+/// tasks reach a common node, they terminate, except for the last
+/// task to reach the node, which continues building up the
+/// tree.
+///
+/// Up to [`planning_concurrency`] tasks are buffered at once to
+/// execute concurrently.
+///
+/// [`planning_concurrency`]:
crate::config::ExecutionOptions::planning_concurrency
#[derive(Default)]
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
@@ -485,6 +502,63 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
}
}
+#[derive(Debug)]
+struct ExecutionPlanChild {
+ /// Index needed to order children of parent to ensure consistency with
original
+ /// `LogicalPlan`
+ index: usize,
+ plan: Arc<dyn ExecutionPlan>,
+}
+
+#[derive(Debug)]
+enum NodeState {
+ ZeroOrOneChild,
+ /// Nodes with multiple children will have multiple tasks accessing it,
+ /// and each task will append their contribution until the last task takes
+ /// all the children to build the parent node.
+ TwoOrMoreChildren(Mutex<Vec<ExecutionPlanChild>>),
+}
+
+/// To avoid needing to pass a single child wrapped in a Vec for nodes
+/// with only one child.
+enum ChildrenContainer {
+ None,
+ One(Arc<dyn ExecutionPlan>),
+ Multiple(Vec<Arc<dyn ExecutionPlan>>),
+}
+
+impl ChildrenContainer {
+ fn one(self) -> Result<Arc<dyn ExecutionPlan>> {
+ match self {
+ Self::One(p) => Ok(p),
+ _ => internal_err!("More than one child in ChildrenContainer"),
+ }
+ }
+
+ fn two(self) -> Result<[Arc<dyn ExecutionPlan>; 2]> {
+ match self {
+ Self::Multiple(v) if v.len() == 2 => Ok(v.try_into().unwrap()),
+ _ => internal_err!("ChildrenContainer doesn't contain exactly 2
children"),
+ }
+ }
+
+ fn vec(self) -> Vec<Arc<dyn ExecutionPlan>> {
+ match self {
+ Self::None => vec![],
+ Self::One(p) => vec![p],
+ Self::Multiple(v) => v,
+ }
+ }
+}
+
+#[derive(Debug)]
+struct LogicalNode<'a> {
+ node: &'a LogicalPlan,
+ // None if root
+ parent_index: Option<usize>,
+ state: NodeState,
+}
+
impl DefaultPhysicalPlanner {
/// Create a physical planner that uses `extension_planners` to
/// plan user-defined logical nodes [`LogicalPlan::Extension`].
@@ -496,777 +570,983 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}
- /// Create a physical plans for multiple logical plans.
- ///
- /// This is the same as [`create_initial_plan`](Self::create_initial_plan)
but runs the planning concurrently.
- ///
- /// The result order is the same as the input order.
- fn create_initial_plan_multi<'a>(
- &'a self,
- logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
- session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
- async move {
- // First build futures with as little references as possible, then
performing some stream magic.
- // Otherwise rustc bails out w/:
- //
- // error: higher-ranked lifetime error
- // ...
- // note: could not prove `[async block@...]: std::marker::Send`
- let futures = logical_plans
- .into_iter()
- .enumerate()
- .map(|(idx, lp)| async move {
- let plan = self.create_initial_plan(lp,
session_state).await?;
- Ok((idx, plan)) as Result<_>
- })
- .collect::<Vec<_>>();
-
- let mut physical_plans = futures::stream::iter(futures)
- .buffer_unordered(
- session_state
- .config_options()
- .execution
- .planning_concurrency,
- )
- .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
- .await?;
- physical_plans.sort_by_key(|(idx, _plan)| *idx);
- let physical_plans = physical_plans
- .into_iter()
- .map(|(_idx, plan)| plan)
- .collect::<Vec<_>>();
- Ok(physical_plans)
- }
- .boxed()
+ /// Create a physical plan from a logical plan
+ async fn create_initial_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // DFS the tree to flatten it into a Vec.
+ // This will allow us to build the Physical Plan from the leaves up
+ // to avoid recursion, and also to make it easier to build a valid
+ // Physical Plan from the start and not rely on some intermediate
+ // representation (since parents need to know their children at
+ // construction time).
+ let mut flat_tree = vec![];
+ let mut dfs_visit_stack = vec![(None, logical_plan)];
+ // Use this to be able to find the leaves to start construction bottom
+ // up concurrently.
+ let mut flat_tree_leaf_indices = vec![];
+ while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+ let current_index = flat_tree.len();
+ // Because of how we extend the visit stack here, we visit the
children
+ // in reverse order of how they appear, so later we need to reverse
+ // the order of children when building the nodes.
+ dfs_visit_stack
+ .extend(node.inputs().iter().map(|&n| (Some(current_index),
n)));
+ let state = match node.inputs().len() {
+ 0 => {
+ flat_tree_leaf_indices.push(current_index);
+ NodeState::ZeroOrOneChild
+ }
+ 1 => NodeState::ZeroOrOneChild,
+ _ => {
+ let ready_children =
Vec::with_capacity(node.inputs().len());
+ let ready_children = Mutex::new(ready_children);
+ NodeState::TwoOrMoreChildren(ready_children)
+ }
+ };
+ let node = LogicalNode {
+ node,
+ parent_index,
+ state,
+ };
+ flat_tree.push(node);
+ }
+ let flat_tree = Arc::new(flat_tree);
+
+ let planning_concurrency = session_state
+ .config_options()
+ .execution
+ .planning_concurrency;
+ // Can never spawn more tasks than leaves in the tree, as these tasks
must
+ // all converge down to the root node, which can only be processed by a
+ // single task.
+ let max_concurrency =
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+ // Spawning tasks which will traverse leaf up to the root.
+ let tasks = flat_tree_leaf_indices
+ .into_iter()
+ .map(|index| self.task_helper(index, flat_tree.clone(),
session_state));
+ let mut outputs = futures::stream::iter(tasks)
+ .buffer_unordered(max_concurrency)
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ // Ideally this never happens if we have a valid LogicalPlan tree
+ if outputs.len() != 1 {
+ return internal_err!(
+ "Failed to convert LogicalPlan to ExecutionPlan: More than one
root detected"
+ );
+ }
+ let plan = outputs.pop().unwrap();
+ Ok(plan)
}
- /// Create a physical plan from a logical plan
- fn create_initial_plan<'a>(
+ /// These tasks start at a leaf and traverse up the tree towards the root,
building
+ /// an ExecutionPlan as they go. When they reach a node with two or more
children,
+ /// they append their current result (a child of the parent node) to the
children
+ /// vector, and if this is sufficient to create the parent then continues
traversing
+ /// the tree to create nodes. Otherwise, the task terminates.
+ async fn task_helper<'a>(
&'a self,
- logical_plan: &'a LogicalPlan,
+ leaf_starter_index: usize,
+ flat_tree: Arc<Vec<LogicalNode<'a>>>,
session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
- async move {
- let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan
{
- LogicalPlan::TableScan(TableScan {
- source,
- projection,
- filters,
- fetch,
- ..
- }) => {
- let source = source_as_provider(source)?;
- // Remove all qualifiers from the scan as the provider
- // doesn't know (nor should care) how the relation was
- // referred to in the query
- let filters = unnormalize_cols(filters.iter().cloned());
- source.scan(session_state, projection.as_ref(), &filters,
*fetch).await
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ // We always start with a leaf, so can ignore status and pass empty
children
+ let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| {
+ internal_datafusion_err!(
+ "Invalid index whilst creating initial physical plan"
+ )
+ })?;
+ let mut plan = self
+ .map_logical_node_to_physical(
+ node.node,
+ session_state,
+ ChildrenContainer::None,
+ )
+ .await?;
+ let mut current_index = leaf_starter_index;
+ // parent_index is None only for root
+ while let Some(parent_index) = node.parent_index {
+ node = flat_tree.get(parent_index).ok_or_else(|| {
+ internal_datafusion_err!(
+ "Invalid index whilst creating initial physical plan"
+ )
+ })?;
+ match &node.state {
+ NodeState::ZeroOrOneChild => {
+ plan = self
+ .map_logical_node_to_physical(
+ node.node,
+ session_state,
+ ChildrenContainer::One(plan),
+ )
+ .await?;
}
- LogicalPlan::Copy(CopyTo{
- input,
- output_url,
- format_options,
- partition_by,
- options: source_option_tuples
- }) => {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let parsed_url = ListingTableUrl::parse(output_url)?;
- let object_store_url = parsed_url.object_store();
-
- let schema: Schema = (**input.schema()).clone().into();
-
- // Note: the DataType passed here is ignored for the
purposes of writing and inferred instead
- // from the schema of the RecordBatch being written. This
allows COPY statements to specify only
- // the column name rather than column name + explicit data
type.
- let table_partition_cols = partition_by.iter()
- .map(|s| (s.to_string(), arrow_schema::DataType::Null))
- .collect::<Vec<_>>();
-
- // Set file sink related options
- let config = FileSinkConfig {
- object_store_url,
- table_paths: vec![parsed_url],
- file_groups: vec![],
- output_schema: Arc::new(schema),
- table_partition_cols,
- overwrite: false,
- };
- let mut table_options =
session_state.default_table_options();
- let sink_format: Arc<dyn FileFormat> = match
format_options {
- FormatOptions::CSV(options) => {
- table_options.csv = options.clone();
- table_options.set_file_format(FileType::CSV);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(CsvFormat::default().with_options(table_options.csv))
- },
- FormatOptions::JSON(options) => {
- table_options.json = options.clone();
- table_options.set_file_format(FileType::JSON);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(JsonFormat::default().with_options(table_options.json))
- },
- #[cfg(feature = "parquet")]
- FormatOptions::PARQUET(options) => {
- table_options.parquet = options.clone();
- table_options.set_file_format(FileType::PARQUET);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
- },
- FormatOptions::AVRO => Arc::new(AvroFormat {} ),
- FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+ // See if we have all children to build the node.
+ NodeState::TwoOrMoreChildren(children) => {
+ let mut children: Vec<ExecutionPlanChild> = {
+ let mut guard = children.lock().await;
+ // Add our contribution to this parent node.
+ // Vec is pre-allocated so no allocation should occur
here.
+ guard.push(ExecutionPlanChild {
+ index: current_index,
+ plan,
+ });
+ if guard.len() < node.node.inputs().len() {
+ // This node is not ready yet, still pending more
children.
+ // This task is finished forever.
+ return Ok(None);
+ }
+
+ // With this task's contribution we have enough
children.
+ // This task is the only one building this node now,
and thus
+ // no other task will need the Mutex for this node, so
take
+ // all children.
+ std::mem::take(guard.as_mut())
};
- sink_format.create_writer_physical_plan(input_exec,
session_state, config, None).await
+ // Indices refer to position in flat tree Vec, which means
they are
+ // guaranteed to be unique, hence unstable sort used.
+ //
+ // We reverse sort because of how we visited the node in
the initial
+ // DFS traversal (see above).
+ children.sort_unstable_by_key(|epc|
std::cmp::Reverse(epc.index));
+ let children = children.into_iter().map(|epc|
epc.plan).collect();
+ let children = ChildrenContainer::Multiple(children);
+ plan = self
+ .map_logical_node_to_physical(node.node,
session_state, children)
+ .await?;
}
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertInto,
- input,
- ..
- }) => {
- let name = table_name.table();
- let schema =
session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- provider.insert_into(session_state, input_exec,
false).await
- } else {
- return exec_err!(
- "Table '{table_name}' does not exist"
- );
+ }
+ current_index = parent_index;
+ }
+ // Only one task should ever reach this point for a valid LogicalPlan
tree.
+ Ok(Some(plan))
+ }
+
+ /// Given a single LogicalPlan node, map it to its physical ExecutionPlan
counterpart.
+ async fn map_logical_node_to_physical(
+ &self,
+ node: &LogicalPlan,
+ session_state: &SessionState,
+ children: ChildrenContainer,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let exec_node: Arc<dyn ExecutionPlan> = match node {
+ // Leaves (no children)
+ LogicalPlan::TableScan(TableScan {
+ source,
+ projection,
+ filters,
+ fetch,
+ ..
+ }) => {
+ let source = source_as_provider(source)?;
+ // Remove all qualifiers from the scan as the provider
+ // doesn't know (nor should care) how the relation was
+ // referred to in the query
+ let filters = unnormalize_cols(filters.iter().cloned());
+ source
+ .scan(session_state, projection.as_ref(), &filters, *fetch)
+ .await?
+ }
+ LogicalPlan::Values(Values { values, schema }) => {
+ let exec_schema = schema.as_ref().to_owned().into();
+ let exprs = values
+ .iter()
+ .map(|row| {
+ row.iter()
+ .map(|expr| {
+ self.create_physical_expr(expr, schema,
session_state)
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let value_exec =
ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+ Arc::new(value_exec)
+ }
+ LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema,
+ }) => Arc::new(EmptyExec::new(SchemaRef::new(
+ schema.as_ref().to_owned().into(),
+ ))),
+ LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: true,
+ schema,
+ }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
+ schema.as_ref().to_owned().into(),
+ ))),
+ LogicalPlan::DescribeTable(DescribeTable {
+ schema,
+ output_schema,
+ }) => {
+ let output_schema: Schema = output_schema.as_ref().into();
+ self.plan_describe(schema.clone(), Arc::new(output_schema))?
+ }
+
+ // 1 Child
+ LogicalPlan::Copy(CopyTo {
+ input,
+ output_url,
+ format_options,
+ partition_by,
+ options: source_option_tuples,
+ }) => {
+ let input_exec = children.one()?;
+ let parsed_url = ListingTableUrl::parse(output_url)?;
+ let object_store_url = parsed_url.object_store();
+
+ let schema: Schema = (**input.schema()).clone().into();
+
+ // Note: the DataType passed here is ignored for the purposes
of writing and inferred instead
+ // from the schema of the RecordBatch being written. This
allows COPY statements to specify only
+ // the column name rather than column name + explicit data
type.
+ let table_partition_cols = partition_by
+ .iter()
+ .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+ .collect::<Vec<_>>();
+
+ // Set file sink related options
+ let config = FileSinkConfig {
+ object_store_url,
+ table_paths: vec![parsed_url],
+ file_groups: vec![],
+ output_schema: Arc::new(schema),
+ table_partition_cols,
+ overwrite: false,
+ };
+ let mut table_options = session_state.default_table_options();
+ let sink_format: Arc<dyn FileFormat> = match format_options {
+ FormatOptions::CSV(options) => {
+ table_options.csv = options.clone();
+ table_options.set_file_format(FileType::CSV);
+
table_options.alter_with_string_hash_map(source_option_tuples)?;
+
Arc::new(CsvFormat::default().with_options(table_options.csv))
}
- }
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertOverwrite,
- input,
- ..
- }) => {
- let name = table_name.table();
- let schema =
session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- provider.insert_into(session_state, input_exec,
true).await
- } else {
- return exec_err!(
- "Table '{table_name}' does not exist"
- );
+ FormatOptions::JSON(options) => {
+ table_options.json = options.clone();
+ table_options.set_file_format(FileType::JSON);
+
table_options.alter_with_string_hash_map(source_option_tuples)?;
+
Arc::new(JsonFormat::default().with_options(table_options.json))
+ }
+ #[cfg(feature = "parquet")]
+ FormatOptions::PARQUET(options) => {
+ table_options.parquet = options.clone();
+ table_options.set_file_format(FileType::PARQUET);
+
table_options.alter_with_string_hash_map(source_option_tuples)?;
+ Arc::new(
+
ParquetFormat::default().with_options(table_options.parquet),
+ )
}
+ FormatOptions::AVRO => Arc::new(AvroFormat {}),
+ FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+ };
+
+ sink_format
+ .create_writer_physical_plan(input_exec, session_state,
config, None)
+ .await?
+ }
+ LogicalPlan::Dml(DmlStatement {
+ table_name,
+ op: WriteOp::InsertInto,
+ ..
+ }) => {
+ let name = table_name.table();
+ let schema = session_state.schema_for_ref(table_name.clone())?;
+ if let Some(provider) = schema.table(name).await? {
+ let input_exec = children.one()?;
+ provider
+ .insert_into(session_state, input_exec, false)
+ .await?
+ } else {
+ return exec_err!("Table '{table_name}' does not exist");
}
- LogicalPlan::Values(Values {
- values,
- schema,
- }) => {
- let exec_schema = schema.as_ref().to_owned().into();
- let exprs = values.iter()
- .map(|row| {
- row.iter().map(|expr| {
- self.create_physical_expr(
- expr,
- schema,
- session_state,
- )
- })
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
- })
- .collect::<Result<Vec<_>>>()?;
- let value_exec = ValuesExec::try_new(
- SchemaRef::new(exec_schema),
- exprs,
- )?;
- Ok(Arc::new(value_exec))
+ }
+ LogicalPlan::Dml(DmlStatement {
+ table_name,
+ op: WriteOp::InsertOverwrite,
+ ..
+ }) => {
+ let name = table_name.table();
+ let schema = session_state.schema_for_ref(table_name.clone())?;
+ if let Some(provider) = schema.table(name).await? {
+ let input_exec = children.one()?;
+ provider
+ .insert_into(session_state, input_exec, true)
+ .await?
+ } else {
+ return exec_err!("Table '{table_name}' does not exist");
}
- LogicalPlan::Window(Window {
- input, window_expr, ..
- }) => {
- if window_expr.is_empty() {
- return internal_err!(
- "Impossibly got empty window expression"
- );
+ }
+ LogicalPlan::Window(Window {
+ input, window_expr, ..
+ }) => {
+ if window_expr.is_empty() {
+ return internal_err!("Impossibly got empty window
expression");
+ }
+
+ let input_exec = children.one()?;
+
+ // at this moment we are guaranteed by the logical planner
+ // to have all the window_expr to have equal sort key
+ let partition_keys =
window_expr_common_partition_keys(window_expr)?;
+
+ let can_repartition = !partition_keys.is_empty()
+ && session_state.config().target_partitions() > 1
+ && session_state.config().repartition_window_functions();
+
+ let physical_partition_keys = if can_repartition {
+ partition_keys
+ .iter()
+ .map(|e| {
+ self.create_physical_expr(e, input.schema(),
session_state)
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
+ } else {
+ vec![]
+ };
+
+ let get_sort_keys = |expr: &Expr| match expr {
+ Expr::WindowFunction(WindowFunction {
+ ref partition_by,
+ ref order_by,
+ ..
+ }) => generate_sort_key(partition_by, order_by),
+ Expr::Alias(Alias { expr, .. }) => {
+ // Convert &Box<T> to &T
+ match &**expr {
+ Expr::WindowFunction(WindowFunction {
+ ref partition_by,
+ ref order_by,
+ ..
+ }) => generate_sort_key(partition_by, order_by),
+ _ => unreachable!(),
+ }
}
+ _ => unreachable!(),
+ };
+ let sort_keys = get_sort_keys(&window_expr[0])?;
+ if window_expr.len() > 1 {
+ debug_assert!(
+ window_expr[1..]
+ .iter()
+ .all(|expr| get_sort_keys(expr).unwrap() ==
sort_keys),
+ "all window expressions shall have the same sort
keys, as guaranteed by logical planning"
+ );
+ }
- let input_exec = self.create_initial_plan(input,
session_state).await?;
+ let logical_schema = node.schema();
+ let window_expr = window_expr
+ .iter()
+ .map(|e| {
+ create_window_expr(
+ e,
+ logical_schema,
+ session_state.execution_props(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
- // at this moment we are guaranteed by the logical planner
- // to have all the window_expr to have equal sort key
- let partition_keys =
window_expr_common_partition_keys(window_expr)?;
+ let uses_bounded_memory =
+ window_expr.iter().all(|e| e.uses_bounded_memory());
+ // If all window expressions can run with bounded memory,
+ // choose the bounded window variant:
+ if uses_bounded_memory {
+ Arc::new(BoundedWindowAggExec::try_new(
+ window_expr,
+ input_exec,
+ physical_partition_keys,
+ InputOrderMode::Sorted,
+ )?)
+ } else {
+ Arc::new(WindowAggExec::try_new(
+ window_expr,
+ input_exec,
+ physical_partition_keys,
+ )?)
+ }
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ ..
+ }) => {
+ // Initially need to perform the aggregate and then merge the
partitions
+ let input_exec = children.one()?;
+ let physical_input_schema = input_exec.schema();
+ let logical_input_schema = input.as_ref().schema();
+
+ let groups = self.create_grouping_physical_expr(
+ group_expr,
+ logical_input_schema,
+ &physical_input_schema,
+ session_state,
+ )?;
- let can_repartition = !partition_keys.is_empty()
- && session_state.config().target_partitions() > 1
- &&
session_state.config().repartition_window_functions();
+ let agg_filter = aggr_expr
+ .iter()
+ .map(|e| {
+ create_aggregate_expr_and_maybe_filter(
+ e,
+ logical_input_schema,
+ &physical_input_schema,
+ session_state.execution_props(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
- let physical_partition_keys = if can_repartition
- {
- partition_keys
+ let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>,
Vec<_>) =
+ multiunzip(agg_filter);
+
+ let initial_aggr = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ filters.clone(),
+ input_exec,
+ physical_input_schema.clone(),
+ )?);
+
+ // update group column indices based on partial aggregate plan
evaluation
+ let final_group: Vec<Arc<dyn PhysicalExpr>> =
+ initial_aggr.output_group_expr();
+
+ let can_repartition = !groups.is_empty()
+ && session_state.config().target_partitions() > 1
+ && session_state.config().repartition_aggregations();
+
+ // Some aggregators may be modified during initialization for
+ // optimization purposes. For example, a FIRST_VALUE may turn
+ // into a LAST_VALUE with the reverse ordering requirement.
+ // To reflect such changes to subsequent stages, use the
updated
+ // `AggregateExpr`/`PhysicalSortExpr` objects.
+ let updated_aggregates = initial_aggr.aggr_expr().to_vec();
+
+ let next_partition_mode = if can_repartition {
+ // construct a second aggregation with
'AggregateMode::FinalPartitioned'
+ AggregateMode::FinalPartitioned
+ } else {
+ // construct a second aggregation, keeping the final
column name equal to the
+ // first aggregation and the expressions corresponding to
the respective aggregate
+ AggregateMode::Final
+ };
+
+ let final_grouping_set = PhysicalGroupBy::new_single(
+ final_group
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| (expr.clone(),
groups.expr()[i].1.clone()))
+ .collect(),
+ );
+
+ Arc::new(AggregateExec::try_new(
+ next_partition_mode,
+ final_grouping_set,
+ updated_aggregates,
+ filters,
+ initial_aggr,
+ physical_input_schema.clone(),
+ )?)
+ }
+ LogicalPlan::Projection(Projection { input, expr, .. }) => self
+ .create_project_physical_exec(
+ session_state,
+ children.one()?,
+ input,
+ expr,
+ )?,
+ LogicalPlan::Filter(Filter {
+ predicate, input, ..
+ }) => {
+ let physical_input = children.one()?;
+ let input_dfschema = input.schema();
+
+ let runtime_expr =
+ self.create_physical_expr(predicate, input_dfschema,
session_state)?;
+ let selectivity = session_state
+ .config()
+ .options()
+ .optimizer
+ .default_filter_selectivity;
+ let filter = FilterExec::try_new(runtime_expr,
physical_input)?;
+ Arc::new(filter.with_default_selectivity(selectivity)?)
+ }
+ LogicalPlan::Repartition(Repartition {
+ input,
+ partitioning_scheme,
+ }) => {
+ let physical_input = children.one()?;
+ let input_dfschema = input.as_ref().schema();
+ let physical_partitioning = match partitioning_scheme {
+ LogicalPartitioning::RoundRobinBatch(n) => {
+ Partitioning::RoundRobinBatch(*n)
+ }
+ LogicalPartitioning::Hash(expr, n) => {
+ let runtime_expr = expr
.iter()
.map(|e| {
self.create_physical_expr(
e,
- input.schema(),
+ input_dfschema,
session_state,
)
})
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
- } else {
- vec![]
- };
-
- let get_sort_keys = |expr: &Expr| match expr {
- Expr::WindowFunction(WindowFunction{
- ref partition_by,
- ref order_by,
- ..
- }) => generate_sort_key(partition_by, order_by),
- Expr::Alias(Alias{expr,..}) => {
- // Convert &Box<T> to &T
- match &**expr {
- Expr::WindowFunction(WindowFunction{
- ref partition_by,
- ref order_by,
- ..}) => generate_sort_key(partition_by,
order_by),
- _ => unreachable!(),
- }
- }
- _ => unreachable!(),
- };
- let sort_keys = get_sort_keys(&window_expr[0])?;
- if window_expr.len() > 1 {
- debug_assert!(
- window_expr[1..]
- .iter()
- .all(|expr| get_sort_keys(expr).unwrap() ==
sort_keys),
- "all window expressions shall have the same sort
keys, as guaranteed by logical planning"
+ .collect::<Result<Vec<_>>>()?;
+ Partitioning::Hash(runtime_expr, *n)
+ }
+ LogicalPartitioning::DistributeBy(_) => {
+ return not_impl_err!(
+ "Physical plan does not support DistributeBy
partitioning"
);
}
-
- let logical_schema = logical_plan.schema();
- let window_expr = window_expr
- .iter()
- .map(|e| {
- create_window_expr(
- e,
- logical_schema,
- session_state.execution_props(),
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- let uses_bounded_memory = window_expr
- .iter()
- .all(|e| e.uses_bounded_memory());
- // If all window expressions can run with bounded memory,
- // choose the bounded window variant:
- Ok(if uses_bounded_memory {
- Arc::new(BoundedWindowAggExec::try_new(
- window_expr,
- input_exec,
- physical_partition_keys,
- InputOrderMode::Sorted,
- )?)
+ };
+ Arc::new(RepartitionExec::try_new(
+ physical_input,
+ physical_partitioning,
+ )?)
+ }
+ LogicalPlan::Sort(Sort {
+ expr, input, fetch, ..
+ }) => {
+ let physical_input = children.one()?;
+ let input_dfschema = input.as_ref().schema();
+ let sort_expr = create_physical_sort_exprs(
+ expr,
+ input_dfschema,
+ session_state.execution_props(),
+ )?;
+ let new_sort =
+ SortExec::new(sort_expr,
physical_input).with_fetch(*fetch);
+ Arc::new(new_sort)
+ }
+ LogicalPlan::Subquery(_) => todo!(),
+ LogicalPlan::SubqueryAlias(_) => children.one()?,
+ LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+ let input = children.one()?;
+
+ // GlobalLimitExec requires a single partition for input
+ let input = if input.output_partitioning().partition_count()
== 1 {
+ input
+ } else {
+ // Apply a LocalLimitExec to each partition. The optimizer
will also insert
+ // a CoalescePartitionsExec between the GlobalLimitExec
and LocalLimitExec
+ if let Some(fetch) = fetch {
+ Arc::new(LocalLimitExec::new(input, *fetch + skip))
} else {
- Arc::new(WindowAggExec::try_new(
- window_expr,
- input_exec,
- physical_partition_keys,
- )?)
+ input
+ }
+ };
+
+ Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
+ }
+ LogicalPlan::Unnest(Unnest {
+ columns,
+ schema,
+ options,
+ ..
+ }) => {
+ let input = children.one()?;
+ let column_execs = columns
+ .iter()
+ .map(|column| {
+ schema
+ .index_of_column(column)
+ .map(|idx| Column::new(&column.name, idx))
})
- }
- LogicalPlan::Aggregate(Aggregate {
+ .collect::<Result<_>>()?;
+ let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+ Arc::new(UnnestExec::new(
input,
- group_expr,
- aggr_expr,
- ..
- }) => {
- // Initially need to perform the aggregate and then merge
the partitions
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let physical_input_schema = input_exec.schema();
- let logical_input_schema = input.as_ref().schema();
-
- let groups = self.create_grouping_physical_expr(
- group_expr,
- logical_input_schema,
- &physical_input_schema,
- session_state)?;
-
- let agg_filter = aggr_expr
- .iter()
- .map(|e| {
- create_aggregate_expr_and_maybe_filter(
- e,
- logical_input_schema,
- &physical_input_schema,
- session_state.execution_props(),
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>,
Vec<_>) = multiunzip(agg_filter);
+ column_execs,
+ schema,
+ options.clone(),
+ ))
+ }
- let initial_aggr = Arc::new(AggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- filters.clone(),
- input_exec,
- physical_input_schema.clone(),
+ // 2 Children
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ on: keys,
+ filter,
+ join_type,
+ null_equals_null,
+ schema: join_schema,
+ ..
+ }) => {
+ let null_equals_null = *null_equals_null;
+
+ let [physical_left, physical_right] = children.two()?;
+
+ // If join has expression equijoin keys, add physical
projection.
+ let has_expr_join_key = keys.iter().any(|(l, r)| {
+ !(matches!(l, Expr::Column(_)) && matches!(r,
Expr::Column(_)))
+ });
+ let (new_logical, physical_left, physical_right) = if
has_expr_join_key {
+ // TODO: Can we extract this transformation to somewhere
before physical plan
+ // creation?
+ let (left_keys, right_keys): (Vec<_>, Vec<_>) =
+ keys.iter().cloned().unzip();
+
+ let (left, left_col_keys, left_projected) =
+ wrap_projection_for_join_if_necessary(
+ &left_keys,
+ left.as_ref().clone(),
+ )?;
+ let (right, right_col_keys, right_projected) =
+ wrap_projection_for_join_if_necessary(
+ &right_keys,
+ right.as_ref().clone(),
+ )?;
+ let column_on = (left_col_keys, right_col_keys);
+
+ let left = Arc::new(left);
+ let right = Arc::new(right);
+ let new_join =
LogicalPlan::Join(Join::try_new_with_project_input(
+ node,
+ left.clone(),
+ right.clone(),
+ column_on,
)?);
- // update group column indices based on partial aggregate
plan evaluation
- let final_group: Vec<Arc<dyn PhysicalExpr>> =
initial_aggr.output_group_expr();
-
- let can_repartition = !groups.is_empty()
- && session_state.config().target_partitions() > 1
- && session_state.config().repartition_aggregations();
-
- // Some aggregators may be modified during initialization
for
- // optimization purposes. For example, a FIRST_VALUE may
turn
- // into a LAST_VALUE with the reverse ordering requirement.
- // To reflect such changes to subsequent stages, use the
updated
- // `AggregateExpr`/`PhysicalSortExpr` objects.
- let updated_aggregates = initial_aggr.aggr_expr().to_vec();
-
- let next_partition_mode = if can_repartition {
- // construct a second aggregation with
'AggregateMode::FinalPartitioned'
- AggregateMode::FinalPartitioned
- } else {
- // construct a second aggregation, keeping the final
column name equal to the
- // first aggregation and the expressions corresponding
to the respective aggregate
- AggregateMode::Final
+ // If inputs were projected then create ExecutionPlan for
these new
+ // LogicalPlan nodes.
+ let physical_left = match (left_projected, left.as_ref()) {
+ // If left_projected is true we are guaranteed that
left is a Projection
+ (
+ true,
+ LogicalPlan::Projection(Projection { input, expr,
.. }),
+ ) => self.create_project_physical_exec(
+ session_state,
+ physical_left,
+ input,
+ expr,
+ )?,
+ _ => physical_left,
+ };
+ let physical_right = match (right_projected,
right.as_ref()) {
+ // If right_projected is true we are guaranteed that
right is a Projection
+ (
+ true,
+ LogicalPlan::Projection(Projection { input, expr,
.. }),
+ ) => self.create_project_physical_exec(
+ session_state,
+ physical_right,
+ input,
+ expr,
+ )?,
+ _ => physical_right,
};
- let final_grouping_set = PhysicalGroupBy::new_single(
- final_group
+ // Remove temporary projected columns
+ if left_projected || right_projected {
+ let final_join_result = join_schema
.iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(),
groups.expr()[i].1.clone()))
- .collect()
- );
-
- Ok(Arc::new(AggregateExec::try_new(
- next_partition_mode,
- final_grouping_set,
- updated_aggregates,
- filters,
- initial_aggr,
- physical_input_schema.clone(),
- )?))
- }
- LogicalPlan::Projection(Projection { input, expr, .. }) => {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let input_schema = input.as_ref().schema();
-
- let physical_exprs = expr
- .iter()
- .map(|e| {
- // For projections, SQL planner and logical plan
builder may convert user
- // provided expressions into logical Column
expressions if their results
- // are already provided from the input plans.
Because we work with
- // qualified columns in logical plane, derived
columns involve operators or
- // functions will contain qualifiers as well. This
will result in logical
- // columns with names like `SUM(t1.c1)`, `t1.c1 +
t1.c2`, etc.
- //
- // If we run these logical columns through
physical_name function, we will
- // get physical names with column qualifiers,
which violates DataFusion's
- // field name semantics. To account for this, we
need to derive the
- // physical name from physical input instead.
- //
- // This depends on the invariant that logical
schema field index MUST match
- // with physical schema field index.
- let physical_name = if let Expr::Column(col) = e {
- match input_schema.index_of_column(col) {
- Ok(idx) => {
- // index physical field using logical
field index
-
Ok(input_exec.schema().field(idx).name().to_string())
- }
- // logical column is not a derived column,
safe to pass along to
- // physical_name
- Err(_) => physical_name(e),
- }
- } else {
- physical_name(e)
- };
-
- tuple_err((
- self.create_physical_expr(
- e,
- input_schema,
- session_state,
- ),
- physical_name,
- ))
- })
- .collect::<Result<Vec<_>>>()?;
+ .map(|(qualifier, field)| {
+ Expr::Column(datafusion_common::Column::from((
+ qualifier,
+ field.as_ref(),
+ )))
+ })
+ .collect::<Vec<_>>();
+ let projection =
LogicalPlan::Projection(Projection::try_new(
+ final_join_result,
+ Arc::new(new_join),
+ )?);
+ // LogicalPlan mutated
+ (Cow::Owned(projection), physical_left, physical_right)
+ } else {
+ // LogicalPlan mutated
+ (Cow::Owned(new_join), physical_left, physical_right)
+ }
+ } else {
+ // LogicalPlan unchanged
+ (Cow::Borrowed(node), physical_left, physical_right)
+ };
+
+ // Retrieving new left/right and join keys (in case plan was
mutated above)
+ let (left, right, keys, new_project) = match
new_logical.as_ref() {
+ LogicalPlan::Projection(Projection { input, expr, .. }) =>
{
+ if let LogicalPlan::Join(Join {
+ left, right, on, ..
+ }) = input.as_ref()
+ {
+ (left, right, on, Some((input, expr)))
+ } else {
+ unreachable!()
+ }
+ }
+ LogicalPlan::Join(Join {
+ left, right, on, ..
+ }) => (left, right, on, None),
+ // Should either be the original Join, or Join with a
Projection on top
+ _ => unreachable!(),
+ };
+
+ // All equi-join keys are columns now, create physical join
plan
+ let left_df_schema = left.schema();
+ let right_df_schema = right.schema();
+ let execution_props = session_state.execution_props();
+ let join_on = keys
+ .iter()
+ .map(|(l, r)| {
+ let l = create_physical_expr(l, left_df_schema,
execution_props)?;
+ let r =
+ create_physical_expr(r, right_df_schema,
execution_props)?;
+ Ok((l, r))
+ })
+ .collect::<Result<join_utils::JoinOn>>()?;
- Ok(Arc::new(ProjectionExec::try_new(
- physical_exprs,
- input_exec,
- )?))
- }
- LogicalPlan::Filter(filter) => {
- let physical_input =
self.create_initial_plan(&filter.input, session_state).await?;
- let input_dfschema = filter.input.schema();
+ let join_filter = match filter {
+ Some(expr) => {
+ // Extract columns from filter expression and saved in
a HashSet
+ let cols = expr.to_columns()?;
- let runtime_expr = self.create_physical_expr(
- &filter.predicate,
- input_dfschema,
- session_state,
- )?;
- let selectivity =
session_state.config().options().optimizer.default_filter_selectivity;
- let filter = FilterExec::try_new(runtime_expr,
physical_input)?;
- Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
- }
- LogicalPlan::Union(Union { inputs, schema: _ }) => {
- let physical_plans =
self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()),
session_state).await?;
+ // Collect left & right field indices, the field
indices are sorted in ascending order
+ let left_field_indices = cols
+ .iter()
+ .filter_map(|c| match
left_df_schema.index_of_column(c) {
+ Ok(idx) => Some(idx),
+ _ => None,
+ })
+ .sorted()
+ .collect::<Vec<_>>();
+ let right_field_indices = cols
+ .iter()
+ .filter_map(|c| match
right_df_schema.index_of_column(c) {
+ Ok(idx) => Some(idx),
+ _ => None,
+ })
+ .sorted()
+ .collect::<Vec<_>>();
- Ok(Arc::new(UnionExec::new(physical_plans)))
- }
- LogicalPlan::Repartition(Repartition {
- input,
- partitioning_scheme,
- }) => {
- let physical_input = self.create_initial_plan(input,
session_state).await?;
- let input_dfschema = input.as_ref().schema();
- let physical_partitioning = match partitioning_scheme {
- LogicalPartitioning::RoundRobinBatch(n) => {
- Partitioning::RoundRobinBatch(*n)
- }
- LogicalPartitioning::Hash(expr, n) => {
- let runtime_expr = expr
- .iter()
- .map(|e| {
- self.create_physical_expr(
- e,
- input_dfschema,
- session_state,
+ // Collect DFFields and Fields required for
intermediate schemas
+ let (filter_df_fields, filter_fields): (Vec<_>,
Vec<_>) =
+ left_field_indices
+ .clone()
+ .into_iter()
+ .map(|i| {
+ (
+ left_df_schema.qualified_field(i),
+
physical_left.schema().field(i).clone(),
)
})
- .collect::<Result<Vec<_>>>()?;
- Partitioning::Hash(runtime_expr, *n)
- }
- LogicalPartitioning::DistributeBy(_) => {
- return not_impl_err!("Physical plan does not
support DistributeBy partitioning");
- }
- };
- Ok(Arc::new(RepartitionExec::try_new(
- physical_input,
- physical_partitioning,
- )?))
- }
- LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
- let physical_input = self.create_initial_plan(input,
session_state).await?;
- let input_dfschema = input.as_ref().schema();
- let sort_expr = create_physical_sort_exprs(expr,
input_dfschema, session_state.execution_props())?;
- let new_sort = SortExec::new(sort_expr, physical_input)
- .with_fetch(*fetch);
- Ok(Arc::new(new_sort))
- }
- LogicalPlan::Join(Join {
- left,
- right,
- on: keys,
- filter,
- join_type,
- null_equals_null,
- schema: join_schema,
- ..
- }) => {
- let null_equals_null = *null_equals_null;
-
- // If join has expression equijoin keys, add physical
projection.
- let has_expr_join_key = keys.iter().any(|(l, r)| {
- !(matches!(l, Expr::Column(_))
- && matches!(r, Expr::Column(_)))
- });
- if has_expr_join_key {
- // Logic extracted into a function here as subsequent
recursive create_initial_plan()
- // call can cause a stack overflow for a large number
of joins.
- //
- // See #9962 and #1047 for detailed explanation.
- let join_plan =
project_expr_join_keys(keys,left,right,logical_plan,join_schema)?;
- return self
- .create_initial_plan(&join_plan, session_state)
- .await;
- }
-
- // All equi-join keys are columns now, create physical
join plan
- let left_right =
self.create_initial_plan_multi([left.as_ref(), right.as_ref()],
session_state).await?;
- let [physical_left, physical_right]: [Arc<dyn
ExecutionPlan>; 2] = left_right.try_into().map_err(|_|
DataFusionError::Internal("`create_initial_plan_multi` is
broken".to_string()))?;
- let left_df_schema = left.schema();
- let right_df_schema = right.schema();
- let execution_props = session_state.execution_props();
- let join_on = keys
- .iter()
- .map(|(l, r)| {
- let l = create_physical_expr(
- l,
- left_df_schema,
- execution_props
- )?;
- let r = create_physical_expr(
- r,
- right_df_schema,
- execution_props
- )?;
- Ok((l, r))
- })
- .collect::<Result<join_utils::JoinOn>>()?;
-
- let join_filter = match filter {
- Some(expr) => {
- // Extract columns from filter expression and
saved in a HashSet
- let cols = expr.to_columns()?;
-
- // Collect left & right field indices, the field
indices are sorted in ascending order
- let left_field_indices = cols.iter()
- .filter_map(|c| match
left_df_schema.index_of_column(c) {
- Ok(idx) => Some(idx),
- _ => None,
- }).sorted()
- .collect::<Vec<_>>();
- let right_field_indices = cols.iter()
- .filter_map(|c| match
right_df_schema.index_of_column(c) {
- Ok(idx) => Some(idx),
- _ => None,
- }).sorted()
- .collect::<Vec<_>>();
-
- // Collect DFFields and Fields required for
intermediate schemas
- let (filter_df_fields, filter_fields): (Vec<_>,
Vec<_>) = left_field_indices.clone()
- .into_iter()
- .map(|i| (
- left_df_schema.qualified_field(i),
- physical_left.schema().field(i).clone(),
- ))
- .chain(
- right_field_indices.clone()
- .into_iter()
- .map(|i| (
- right_df_schema.qualified_field(i),
-
physical_right.schema().field(i).clone(),
- ))
- )
+
.chain(right_field_indices.clone().into_iter().map(|i| {
+ (
+ right_df_schema.qualified_field(i),
+
physical_right.schema().field(i).clone(),
+ )
+ }))
.unzip();
- let filter_df_fields =
filter_df_fields.into_iter().map(|(qualifier, field)| (qualifier.cloned(),
Arc::new(field.clone()))).collect();
-
- // Construct intermediate schemas used for
filtering data and
- // convert logical expression to physical
according to filter schema
- let filter_df_schema =
DFSchema::new_with_metadata(filter_df_fields, HashMap::new())?;
- let filter_schema =
Schema::new_with_metadata(filter_fields, HashMap::new());
- let filter_expr = create_physical_expr(
- expr,
- &filter_df_schema,
- session_state.execution_props(),
- )?;
- let column_indices =
join_utils::JoinFilter::build_column_indices(left_field_indices,
right_field_indices);
-
- Some(join_utils::JoinFilter::new(
- filter_expr,
- column_indices,
- filter_schema,
- ))
- }
- _ => None
- };
-
- let prefer_hash_join =
session_state.config_options().optimizer.prefer_hash_join;
+ let filter_df_fields = filter_df_fields
+ .into_iter()
+ .map(|(qualifier, field)| {
+ (qualifier.cloned(), Arc::new(field.clone()))
+ })
+ .collect();
+
+ // Construct intermediate schemas used for filtering
data and
+ // convert logical expression to physical according to
filter schema
+ let filter_df_schema = DFSchema::new_with_metadata(
+ filter_df_fields,
+ HashMap::new(),
+ )?;
+ let filter_schema =
+ Schema::new_with_metadata(filter_fields,
HashMap::new());
+ let filter_expr = create_physical_expr(
+ expr,
+ &filter_df_schema,
+ session_state.execution_props(),
+ )?;
+ let column_indices =
join_utils::JoinFilter::build_column_indices(
+ left_field_indices,
+ right_field_indices,
+ );
- if join_on.is_empty() {
- // there is no equal join condition, use the nested
loop join
- // TODO optimize the plan, and use the config of
`target_partitions` and `repartition_joins`
- Ok(Arc::new(NestedLoopJoinExec::try_new(
- physical_left,
- physical_right,
- join_filter,
- join_type,
- )?))
- } else if session_state.config().target_partitions() > 1
- && session_state.config().repartition_joins()
- && !prefer_hash_join
- {
- // Use SortMergeJoin if hash join is not preferred
- // Sort-Merge join support currently is experimental
-
- let join_on_len = join_on.len();
- Ok(Arc::new(SortMergeJoinExec::try_new(
- physical_left,
- physical_right,
- join_on,
- join_filter,
- *join_type,
- vec![SortOptions::default(); join_on_len],
- null_equals_null,
- )?))
- } else if session_state.config().target_partitions() > 1
- && session_state.config().repartition_joins()
- && prefer_hash_join {
- let partition_mode = {
- if session_state.config().collect_statistics() {
- PartitionMode::Auto
- } else {
- PartitionMode::Partitioned
- }
- };
- Ok(Arc::new(HashJoinExec::try_new(
- physical_left,
- physical_right,
- join_on,
- join_filter,
- join_type,
- None,
- partition_mode,
- null_equals_null,
- )?))
- } else {
- Ok(Arc::new(HashJoinExec::try_new(
- physical_left,
- physical_right,
- join_on,
- join_filter,
- join_type,
- None,
- PartitionMode::CollectLeft,
- null_equals_null,
- )?))
+ Some(join_utils::JoinFilter::new(
+ filter_expr,
+ column_indices,
+ filter_schema,
+ ))
}
- }
- LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
- let left_right =
self.create_initial_plan_multi([left.as_ref(), right.as_ref()],
session_state).await?;
- let [left, right]: [Arc<dyn ExecutionPlan>; 2] =
left_right.try_into().map_err(|_|
DataFusionError::Internal("`create_initial_plan_multi` is
broken".to_string()))?;
- Ok(Arc::new(CrossJoinExec::new(left, right)))
- }
- LogicalPlan::Subquery(_) => todo!(),
- LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: false,
- schema,
- }) => Ok(Arc::new(EmptyExec::new(
- SchemaRef::new(schema.as_ref().to_owned().into()),
- ))),
- LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: true,
- schema,
- }) => Ok(Arc::new(PlaceholderRowExec::new(
- SchemaRef::new(schema.as_ref().to_owned().into()),
- ))),
- LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
- self.create_initial_plan(input, session_state).await
- }
- LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
- let input = self.create_initial_plan(input,
session_state).await?;
-
- // GlobalLimitExec requires a single partition for input
- let input = if
input.output_partitioning().partition_count() == 1 {
- input
- } else {
- // Apply a LocalLimitExec to each partition. The
optimizer will also insert
- // a CoalescePartitionsExec between the
GlobalLimitExec and LocalLimitExec
- if let Some(fetch) = fetch {
- Arc::new(LocalLimitExec::new(input, *fetch + skip))
+ _ => None,
+ };
+
+ let prefer_hash_join =
+ session_state.config_options().optimizer.prefer_hash_join;
+
+ let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
+ // there is no equal join condition, use the nested loop
join
+ // TODO optimize the plan, and use the config of
`target_partitions` and `repartition_joins`
+ Arc::new(NestedLoopJoinExec::try_new(
+ physical_left,
+ physical_right,
+ join_filter,
+ join_type,
+ )?)
+ } else if session_state.config().target_partitions() > 1
+ && session_state.config().repartition_joins()
+ && !prefer_hash_join
+ {
+ // Use SortMergeJoin if hash join is not preferred
+ // Sort-Merge join support currently is experimental
+
+ let join_on_len = join_on.len();
+ Arc::new(SortMergeJoinExec::try_new(
+ physical_left,
+ physical_right,
+ join_on,
+ join_filter,
+ *join_type,
+ vec![SortOptions::default(); join_on_len],
+ null_equals_null,
+ )?)
+ } else if session_state.config().target_partitions() > 1
+ && session_state.config().repartition_joins()
+ && prefer_hash_join
+ {
+ let partition_mode = {
+ if session_state.config().collect_statistics() {
+ PartitionMode::Auto
} else {
- input
+ PartitionMode::Partitioned
}
};
-
- Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
- }
- LogicalPlan::Unnest(Unnest { input, columns, schema, options
}) => {
- let input = self.create_initial_plan(input,
session_state).await?;
- let column_execs = columns.iter().map(|column| {
- schema.index_of_column(column).map(|idx|
Column::new(&column.name, idx))
- }).collect::<Result<_>>()?;
- let schema =
SchemaRef::new(schema.as_ref().to_owned().into());
- Ok(Arc::new(UnnestExec::new(input, column_execs, schema,
options.clone())))
- }
- LogicalPlan::Ddl(ddl) => {
- // There is no default plan for DDl statements --
- // it must be handled at a higher level (so that
- // the appropriate table can be registered with
- // the context)
- let name = ddl.name();
- not_impl_err!(
- "Unsupported logical plan: {name}"
- )
- }
- LogicalPlan::Prepare(_) => {
- // There is no default plan for "PREPARE" -- it must be
- // handled at a higher level (so that the appropriate
- // statement can be prepared)
- not_impl_err!(
- "Unsupported logical plan: Prepare"
- )
+ Arc::new(HashJoinExec::try_new(
+ physical_left,
+ physical_right,
+ join_on,
+ join_filter,
+ join_type,
+ None,
+ partition_mode,
+ null_equals_null,
+ )?)
+ } else {
+ Arc::new(HashJoinExec::try_new(
+ physical_left,
+ physical_right,
+ join_on,
+ join_filter,
+ join_type,
+ None,
+ PartitionMode::CollectLeft,
+ null_equals_null,
+ )?)
+ };
+
+ // If plan was mutated previously then need to create the
ExecutionPlan
+ // for the new Projection that was applied on top.
+ if let Some((input, expr)) = new_project {
+ self.create_project_physical_exec(session_state, join,
input, expr)?
+ } else {
+ join
}
- LogicalPlan::Dml(dml) => {
- // DataFusion is a read-only query engine, but also a
library, so consumers may implement this
- not_impl_err!(
- "Unsupported logical plan: Dml({0})", dml.op
- )
- }
- LogicalPlan::Statement(statement) => {
- // DataFusion is a read-only query engine, but also a
library, so consumers may implement this
- let name = statement.name();
- not_impl_err!(
- "Unsupported logical plan: Statement({name})"
- )
- }
- LogicalPlan::DescribeTable(DescribeTable { schema,
output_schema}) => {
- let output_schema: Schema = output_schema.as_ref().into();
- self.plan_describe(schema.clone(), Arc::new(output_schema))
- }
- LogicalPlan::Explain(_) => internal_err!(
- "Unsupported logical plan: Explain must be root of the
plan"
- ),
- LogicalPlan::Distinct(_) => {
- internal_err!(
- "Unsupported logical plan: Distinct should be replaced
to Aggregate"
- )
- }
- LogicalPlan::Analyze(_) => internal_err!(
- "Unsupported logical plan: Analyze must be root of the
plan"
- ),
- LogicalPlan::Extension(e) => {
- let physical_inputs =
self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
+ }
+ LogicalPlan::CrossJoin(_) => {
+ let [left, right] = children.two()?;
+ Arc::new(CrossJoinExec::new(left, right))
+ }
+ LogicalPlan::RecursiveQuery(RecursiveQuery {
+ name, is_distinct, ..
+ }) => {
+ let [static_term, recursive_term] = children.two()?;
+ Arc::new(RecursiveQueryExec::try_new(
+ name.clone(),
+ static_term,
+ recursive_term,
+ *is_distinct,
+ )?)
+ }
- let mut maybe_plan = None;
- for planner in &self.extension_planners {
- if maybe_plan.is_some() {
- break;
- }
+ // N Children
+ LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())),
+ LogicalPlan::Extension(Extension { node }) => {
+ let mut maybe_plan = None;
+ let children = children.vec();
+ for planner in &self.extension_planners {
+ if maybe_plan.is_some() {
+ break;
+ }
- let logical_input = e.node.inputs();
- maybe_plan = planner.plan_extension(
+ let logical_input = node.inputs();
+ maybe_plan = planner
+ .plan_extension(
self,
- e.node.as_ref(),
+ node.as_ref(),
&logical_input,
- &physical_inputs,
+ &children,
session_state,
- ).await?;
- }
+ )
+ .await?;
+ }
- let plan = match maybe_plan {
+ let plan = match maybe_plan {
Some(v) => Ok(v),
- _ => plan_err!("No installed planner was able to
convert the custom node to an execution plan: {:?}", e.node)
+ _ => plan_err!("No installed planner was able to
convert the custom node to an execution plan: {:?}", node)
}?;
- // Ensure the ExecutionPlan's schema matches the
- // declared logical schema to catch and warn about
- // logic errors when creating user defined plans.
- if !e.node.schema().matches_arrow_schema(&plan.schema()) {
- plan_err!(
+ // Ensure the ExecutionPlan's schema matches the
+ // declared logical schema to catch and warn about
+ // logic errors when creating user defined plans.
+ if !node.schema().matches_arrow_schema(&plan.schema()) {
+ return plan_err!(
"Extension planner for {:?} created an
ExecutionPlan with mismatched schema. \
LogicalPlan schema: {:?}, ExecutionPlan schema:
{:?}",
- e.node, e.node.schema(), plan.schema()
- )
- } else {
- Ok(plan)
- }
- }
- LogicalPlan::RecursiveQuery(RecursiveQuery { name,
static_term, recursive_term, is_distinct,.. }) => {
- let static_term = self.create_initial_plan(static_term,
session_state).await?;
- let recursive_term =
self.create_initial_plan(recursive_term, session_state).await?;
- Ok(Arc::new(RecursiveQueryExec::try_new(name.clone(),
static_term, recursive_term, *is_distinct)?))
+ node, node.schema(), plan.schema()
+ );
+ } else {
+ plan
}
- };
- exec_plan
- }.boxed()
+ }
+
+ // Other
+ LogicalPlan::Statement(statement) => {
+ // DataFusion is a read-only query engine, but also a library,
so consumers may implement this
+ let name = statement.name();
+ return not_impl_err!("Unsupported logical plan:
Statement({name})");
+ }
+ LogicalPlan::Prepare(_) => {
+ // There is no default plan for "PREPARE" -- it must be
+ // handled at a higher level (so that the appropriate
+ // statement can be prepared)
+ return not_impl_err!("Unsupported logical plan: Prepare");
+ }
+ LogicalPlan::Dml(dml) => {
+ // DataFusion is a read-only query engine, but also a library,
so consumers may implement this
+ return not_impl_err!("Unsupported logical plan: Dml({0})",
dml.op);
+ }
+ LogicalPlan::Ddl(ddl) => {
+ // There is no default plan for DDl statements --
+ // it must be handled at a higher level (so that
+ // the appropriate table can be registered with
+ // the context)
+ let name = ddl.name();
+ return not_impl_err!("Unsupported logical plan: {name}");
+ }
+ LogicalPlan::Explain(_) => {
+ return internal_err!(
+ "Unsupported logical plan: Explain must be root of the
plan"
+ )
+ }
+ LogicalPlan::Distinct(_) => {
+ return internal_err!(
+ "Unsupported logical plan: Distinct should be replaced to
Aggregate"
+ )
+ }
+ LogicalPlan::Analyze(_) => {
+ return internal_err!(
+ "Unsupported logical plan: Analyze must be root of the
plan"
+ )
+ }
+ };
+ Ok(exec_node)
}
fn create_grouping_physical_expr(
@@ -1942,6 +2222,59 @@ impl DefaultPhysicalPlanner {
let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
Ok(Arc::new(mem_exec))
}
+
+ fn create_project_physical_exec(
+ &self,
+ session_state: &SessionState,
+ input_exec: Arc<dyn ExecutionPlan>,
+ input: &Arc<LogicalPlan>,
+ expr: &[Expr],
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let input_schema = input.as_ref().schema();
+
+ let physical_exprs = expr
+ .iter()
+ .map(|e| {
+ // For projections, SQL planner and logical plan builder may
convert user
+ // provided expressions into logical Column expressions if
their results
+ // are already provided from the input plans. Because we work
with
+ // qualified columns in logical plane, derived columns involve
operators or
+ // functions will contain qualifiers as well. This will result
in logical
+ // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
+ //
+ // If we run these logical columns through physical_name
function, we will
+ // get physical names with column qualifiers, which violates
DataFusion's
+ // field name semantics. To account for this, we need to
derive the
+ // physical name from physical input instead.
+ //
+ // This depends on the invariant that logical schema field
index MUST match
+ // with physical schema field index.
+ let physical_name = if let Expr::Column(col) = e {
+ match input_schema.index_of_column(col) {
+ Ok(idx) => {
+ // index physical field using logical field index
+
Ok(input_exec.schema().field(idx).name().to_string())
+ }
+ // logical column is not a derived column, safe to
pass along to
+ // physical_name
+ Err(_) => physical_name(e),
+ }
+ } else {
+ physical_name(e)
+ };
+
+ tuple_err((
+ self.create_physical_expr(e, input_schema, session_state),
+ physical_name,
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ Ok(Arc::new(ProjectionExec::try_new(
+ physical_exprs,
+ input_exec,
+ )?))
+ }
}
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
@@ -1953,54 +2286,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) ->
Result<(T, R)> {
}
}
-/// Adding physical projection to join if has expression equijoin keys.
-fn project_expr_join_keys(
- keys: &[(Expr, Expr)],
- left: &Arc<LogicalPlan>,
- right: &Arc<LogicalPlan>,
- logical_plan: &LogicalPlan,
- join_schema: &Arc<DFSchema>,
-) -> Result<LogicalPlan> {
- let left_keys = keys.iter().map(|(l, _r)| l).cloned().collect::<Vec<_>>();
- let right_keys = keys.iter().map(|(_l, r)| r).cloned().collect::<Vec<_>>();
- let (left, right, column_on, added_project) = {
- let (left, left_col_keys, left_projected) =
- wrap_projection_for_join_if_necessary(
- left_keys.as_slice(),
- left.as_ref().clone(),
- )?;
- let (right, right_col_keys, right_projected) =
- wrap_projection_for_join_if_necessary(&right_keys,
right.as_ref().clone())?;
- (
- left,
- right,
- (left_col_keys, right_col_keys),
- left_projected || right_projected,
- )
- };
-
- let join_plan = LogicalPlan::Join(Join::try_new_with_project_input(
- logical_plan,
- Arc::new(left),
- Arc::new(right),
- column_on,
- )?);
-
- // Remove temporary projected columns
- if added_project {
- let final_join_result = join_schema
- .iter()
- .map(|(qualifier, field)| {
- Expr::Column(datafusion_common::Column::from((qualifier,
field.as_ref())))
- })
- .collect::<Vec<_>>();
- let projection = Projection::try_new(final_join_result,
Arc::new(join_plan))?;
- Ok(LogicalPlan::Projection(projection))
- } else {
- Ok(join_plan)
- }
-}
-
#[cfg(test)]
mod tests {
use std::any::Any;
diff --git a/datafusion/core/tests/tpcds_planning.rs
b/datafusion/core/tests/tpcds_planning.rs
index a4a85c6bd1..237771248f 100644
--- a/datafusion/core/tests/tpcds_planning.rs
+++ b/datafusion/core/tests/tpcds_planning.rs
@@ -846,7 +846,6 @@ async fn tpcds_physical_q63() -> Result<()> {
create_physical_plan(63).await
}
-#[ignore] // thread 'q64' has overflowed its stack
#[tokio::test]
async fn tpcds_physical_q64() -> Result<()> {
create_physical_plan(64).await