alamb commented on code in PR #10023:
URL: 
https://github.com/apache/arrow-datafusion/pull/10023#discussion_r1560815473


##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +550,966 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => {
+                    let ready_children = 
Vec::with_capacity(node.inputs().len());
+                    let ready_children = Mutex::new(Some(ready_children));
+                    NodeState::TwoOrMoreChildren(ready_children)
+                }
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state));
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building

Review Comment:
   this is very clever



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +550,966 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => {
+                    let ready_children = 
Vec::with_capacity(node.inputs().len());
+                    let ready_children = Mutex::new(Some(ready_children));
+                    NodeState::TwoOrMoreChildren(ready_children)
+                }
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state));
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building
+    /// an ExecutionPlan as they go. When they reach a node with two or more 
children,
+    /// they append their current result (a child of the parent node) to the 
children
+    /// vector, and if this is sufficient to create the parent then continues 
traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
+    async fn task_helper<'a>(
         &'a self,
-        logical_plan: &'a LogicalPlan,
+        leaf_starter_index: usize,
+        flat_tree: Arc<Vec<LogicalNode<'a>>>,
         session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projection,
-                    filters,
-                    fetch,
-                    ..
-                }) => {
-                    let source = source_as_provider(source)?;
-                    // Remove all qualifiers from the scan as the provider
-                    // doesn't know (nor should care) how the relation was
-                    // referred to in the query
-                    let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, 
*fetch).await
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // We always start with a leaf, so can ignore status and pass empty 
children
+        let mut node = &flat_tree[leaf_starter_index];
+        let mut plan = self
+            .map_logical_node_to_physical(
+                node.node,
+                session_state,
+                ChildrenContainer::None,
+            )
+            .await?;
+        let mut current_index = leaf_starter_index;
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = &flat_tree[parent_index];
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
+                    plan = self
+                        .map_logical_node_to_physical(
+                            node.node,
+                            session_state,
+                            ChildrenContainer::One(plan),
+                        )
+                        .await?;
                 }
-                LogicalPlan::Copy(CopyTo{
-                    input,
-                    output_url,
-                    format_options,
-                    partition_by,
-                    options: source_option_tuples
-                }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let parsed_url = ListingTableUrl::parse(output_url)?;
-                    let object_store_url = parsed_url.object_store();
-
-                    let schema: Schema = (**input.schema()).clone().into();
-
-                    // Note: the DataType passed here is ignored for the 
purposes of writing and inferred instead
-                    // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
-                    // the column name rather than column name + explicit data 
type.
-                    let table_partition_cols = partition_by.iter()
-                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
-                        .collect::<Vec<_>>();
-
-                    // Set file sink related options
-                    let config = FileSinkConfig {
-                        object_store_url,
-                        table_paths: vec![parsed_url],
-                        file_groups: vec![],
-                        output_schema: Arc::new(schema),
-                        table_partition_cols,
-                        overwrite: false,
-                    };
-                    let mut table_options = 
session_state.default_table_options();
-                    let sink_format: Arc<dyn FileFormat> = match 
format_options {
-                        FormatOptions::CSV(options) => {
-                            table_options.csv = options.clone();
-                            table_options.set_file_format(FileType::CSV);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(CsvFormat::default().with_options(table_options.csv))
-                        },
-                        FormatOptions::JSON(options) => {
-                            table_options.json = options.clone();
-                            table_options.set_file_format(FileType::JSON);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(JsonFormat::default().with_options(table_options.json))
-                        },
-                        #[cfg(feature = "parquet")]
-                        FormatOptions::PARQUET(options) => {
-                            table_options.parquet = options.clone();
-                            table_options.set_file_format(FileType::PARQUET);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-                        },
-                        FormatOptions::AVRO => Arc::new(AvroFormat {} ),
-                        FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
+                    let mut children = {
+                        let mut guard = children.lock().unwrap();
+                        // Safe unwrap on option as only the last task 
reaching this
+                        // node will take the contents (which happens after 
this line).
+                        let children = guard.as_mut().unwrap();

Review Comment:
   Maybe we could return an internal error instead of panic if for some reason 
the option was `None`



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +515,957 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])),
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());

Review Comment:
   I don't think the original behavior is intended. Your change makes sense to 
me



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +550,966 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => {
+                    let ready_children = 
Vec::with_capacity(node.inputs().len());
+                    let ready_children = Mutex::new(Some(ready_children));
+                    NodeState::TwoOrMoreChildren(ready_children)
+                }
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state));
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building
+    /// an ExecutionPlan as they go. When they reach a node with two or more 
children,
+    /// they append their current result (a child of the parent node) to the 
children
+    /// vector, and if this is sufficient to create the parent then continues 
traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
+    async fn task_helper<'a>(
         &'a self,
-        logical_plan: &'a LogicalPlan,
+        leaf_starter_index: usize,
+        flat_tree: Arc<Vec<LogicalNode<'a>>>,
         session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projection,
-                    filters,
-                    fetch,
-                    ..
-                }) => {
-                    let source = source_as_provider(source)?;
-                    // Remove all qualifiers from the scan as the provider
-                    // doesn't know (nor should care) how the relation was
-                    // referred to in the query
-                    let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, 
*fetch).await
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // We always start with a leaf, so can ignore status and pass empty 
children
+        let mut node = &flat_tree[leaf_starter_index];
+        let mut plan = self
+            .map_logical_node_to_physical(
+                node.node,
+                session_state,
+                ChildrenContainer::None,
+            )
+            .await?;
+        let mut current_index = leaf_starter_index;
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = &flat_tree[parent_index];
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
+                    plan = self
+                        .map_logical_node_to_physical(
+                            node.node,
+                            session_state,
+                            ChildrenContainer::One(plan),
+                        )
+                        .await?;
                 }
-                LogicalPlan::Copy(CopyTo{
-                    input,
-                    output_url,
-                    format_options,
-                    partition_by,
-                    options: source_option_tuples
-                }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let parsed_url = ListingTableUrl::parse(output_url)?;
-                    let object_store_url = parsed_url.object_store();
-
-                    let schema: Schema = (**input.schema()).clone().into();
-
-                    // Note: the DataType passed here is ignored for the 
purposes of writing and inferred instead
-                    // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
-                    // the column name rather than column name + explicit data 
type.
-                    let table_partition_cols = partition_by.iter()
-                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
-                        .collect::<Vec<_>>();
-
-                    // Set file sink related options
-                    let config = FileSinkConfig {
-                        object_store_url,
-                        table_paths: vec![parsed_url],
-                        file_groups: vec![],
-                        output_schema: Arc::new(schema),
-                        table_partition_cols,
-                        overwrite: false,
-                    };
-                    let mut table_options = 
session_state.default_table_options();
-                    let sink_format: Arc<dyn FileFormat> = match 
format_options {
-                        FormatOptions::CSV(options) => {
-                            table_options.csv = options.clone();
-                            table_options.set_file_format(FileType::CSV);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(CsvFormat::default().with_options(table_options.csv))
-                        },
-                        FormatOptions::JSON(options) => {
-                            table_options.json = options.clone();
-                            table_options.set_file_format(FileType::JSON);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(JsonFormat::default().with_options(table_options.json))
-                        },
-                        #[cfg(feature = "parquet")]
-                        FormatOptions::PARQUET(options) => {
-                            table_options.parquet = options.clone();
-                            table_options.set_file_format(FileType::PARQUET);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-                        },
-                        FormatOptions::AVRO => Arc::new(AvroFormat {} ),
-                        FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
+                    let mut children = {
+                        let mut guard = children.lock().unwrap();
+                        // Safe unwrap on option as only the last task 
reaching this
+                        // node will take the contents (which happens after 
this line).
+                        let children = guard.as_mut().unwrap();
+                        // Add our contribution to this parent node.
+                        children.push((current_index, plan));
+                        if children.len() < node.node.inputs().len() {
+                            // This node is not ready yet, still pending more 
children.
+                            // This task is finished forever.
+                            return Ok(None);
+                        }
+
+                        // With this task's contribution we have enough 
children.
+                        // This task is the only one building this node now, 
and thus
+                        // no other task will need the Mutex for this node, so 
take
+                        // all children.
+                        //
+                        // This take is the only place the Option becomes None.
+                        guard.take().unwrap()
                     };
 
-                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config, None).await
-                }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertInto,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
false).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
-                    }
-                }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertOverwrite,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
true).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
-                    }
+                    // Indices refer to position in flat tree Vec, which means 
they are
+                    // guaranteed to be unique, hence unstable sort used.
+                    //
+                    // We reverse sort because of how we visited the node in 
the initial
+                    // DFS traversal (see above).
+                    children.sort_unstable_by_key(|(index, _)| 
std::cmp::Reverse(*index));
+                    let children =
+                        children.iter().map(|(_, plan)| 
plan.clone()).collect();
+                    let children = ChildrenContainer::Multiple(children);
+                    plan = self
+                        .map_logical_node_to_physical(node.node, 
session_state, children)
+                        .await?;
                 }
-                LogicalPlan::Values(Values {
-                    values,
-                    schema,
-                }) => {
-                    let exec_schema = schema.as_ref().to_owned().into();
-                    let exprs = values.iter()
-                        .map(|row| {
-                            row.iter().map(|expr| {
-                                self.create_physical_expr(
-                                    expr,
-                                    schema,
-                                    session_state,
-                                )
+            }
+            current_index = parent_index;
+        }
+        // Only one task should ever reach this point for a valid LogicalPlan 
tree.
+        Ok(Some(plan))
+    }
+
+    /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan 
counterpart.
+    async fn map_logical_node_to_physical(

Review Comment:
   As I understand it the main reason this structure reduces stack space is 
that this function requires a non trivial stack frame, but now instead of 
recursively calling itself (which results in many such frames on the stack) it 
calls itself iteratively (basically pushing the results on to a Vec)
   
   Or put another way, `map_logical_node_to_physical` never calls 
`map_logical_node_to_physical`



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +550,966 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => {
+                    let ready_children = 
Vec::with_capacity(node.inputs().len());
+                    let ready_children = Mutex::new(Some(ready_children));
+                    NodeState::TwoOrMoreChildren(ready_children)
+                }
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state));
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building
+    /// an ExecutionPlan as they go. When they reach a node with two or more 
children,
+    /// they append their current result (a child of the parent node) to the 
children
+    /// vector, and if this is sufficient to create the parent then continues 
traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
+    async fn task_helper<'a>(
         &'a self,
-        logical_plan: &'a LogicalPlan,
+        leaf_starter_index: usize,
+        flat_tree: Arc<Vec<LogicalNode<'a>>>,
         session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projection,
-                    filters,
-                    fetch,
-                    ..
-                }) => {
-                    let source = source_as_provider(source)?;
-                    // Remove all qualifiers from the scan as the provider
-                    // doesn't know (nor should care) how the relation was
-                    // referred to in the query
-                    let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, 
*fetch).await
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // We always start with a leaf, so can ignore status and pass empty 
children
+        let mut node = &flat_tree[leaf_starter_index];
+        let mut plan = self
+            .map_logical_node_to_physical(
+                node.node,
+                session_state,
+                ChildrenContainer::None,
+            )
+            .await?;
+        let mut current_index = leaf_starter_index;
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = &flat_tree[parent_index];
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
+                    plan = self
+                        .map_logical_node_to_physical(
+                            node.node,
+                            session_state,
+                            ChildrenContainer::One(plan),
+                        )
+                        .await?;
                 }
-                LogicalPlan::Copy(CopyTo{
-                    input,
-                    output_url,
-                    format_options,
-                    partition_by,
-                    options: source_option_tuples
-                }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let parsed_url = ListingTableUrl::parse(output_url)?;
-                    let object_store_url = parsed_url.object_store();
-
-                    let schema: Schema = (**input.schema()).clone().into();
-
-                    // Note: the DataType passed here is ignored for the 
purposes of writing and inferred instead
-                    // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
-                    // the column name rather than column name + explicit data 
type.
-                    let table_partition_cols = partition_by.iter()
-                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
-                        .collect::<Vec<_>>();
-
-                    // Set file sink related options
-                    let config = FileSinkConfig {
-                        object_store_url,
-                        table_paths: vec![parsed_url],
-                        file_groups: vec![],
-                        output_schema: Arc::new(schema),
-                        table_partition_cols,
-                        overwrite: false,
-                    };
-                    let mut table_options = 
session_state.default_table_options();
-                    let sink_format: Arc<dyn FileFormat> = match 
format_options {
-                        FormatOptions::CSV(options) => {
-                            table_options.csv = options.clone();
-                            table_options.set_file_format(FileType::CSV);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(CsvFormat::default().with_options(table_options.csv))
-                        },
-                        FormatOptions::JSON(options) => {
-                            table_options.json = options.clone();
-                            table_options.set_file_format(FileType::JSON);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(JsonFormat::default().with_options(table_options.json))
-                        },
-                        #[cfg(feature = "parquet")]
-                        FormatOptions::PARQUET(options) => {
-                            table_options.parquet = options.clone();
-                            table_options.set_file_format(FileType::PARQUET);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-                        },
-                        FormatOptions::AVRO => Arc::new(AvroFormat {} ),
-                        FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
+                    let mut children = {
+                        let mut guard = children.lock().unwrap();
+                        // Safe unwrap on option as only the last task 
reaching this
+                        // node will take the contents (which happens after 
this line).
+                        let children = guard.as_mut().unwrap();
+                        // Add our contribution to this parent node.
+                        children.push((current_index, plan));
+                        if children.len() < node.node.inputs().len() {
+                            // This node is not ready yet, still pending more 
children.
+                            // This task is finished forever.
+                            return Ok(None);
+                        }
+
+                        // With this task's contribution we have enough 
children.
+                        // This task is the only one building this node now, 
and thus
+                        // no other task will need the Mutex for this node, so 
take
+                        // all children.
+                        //
+                        // This take is the only place the Option becomes None.
+                        guard.take().unwrap()
                     };
 
-                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config, None).await
-                }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertInto,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
false).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
-                    }
-                }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertOverwrite,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
true).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
-                    }
+                    // Indices refer to position in flat tree Vec, which means 
they are
+                    // guaranteed to be unique, hence unstable sort used.
+                    //
+                    // We reverse sort because of how we visited the node in 
the initial
+                    // DFS traversal (see above).
+                    children.sort_unstable_by_key(|(index, _)| 
std::cmp::Reverse(*index));
+                    let children =
+                        children.iter().map(|(_, plan)| 
plan.clone()).collect();
+                    let children = ChildrenContainer::Multiple(children);
+                    plan = self
+                        .map_logical_node_to_physical(node.node, 
session_state, children)
+                        .await?;
                 }
-                LogicalPlan::Values(Values {
-                    values,
-                    schema,
-                }) => {
-                    let exec_schema = schema.as_ref().to_owned().into();
-                    let exprs = values.iter()
-                        .map(|row| {
-                            row.iter().map(|expr| {
-                                self.create_physical_expr(
-                                    expr,
-                                    schema,
-                                    session_state,
-                                )
+            }
+            current_index = parent_index;
+        }
+        // Only one task should ever reach this point for a valid LogicalPlan 
tree.
+        Ok(Some(plan))
+    }
+
+    /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan 
counterpart.
+    async fn map_logical_node_to_physical(
+        &self,
+        node: &LogicalPlan,
+        session_state: &SessionState,
+        children: ChildrenContainer,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let exec_node: Arc<dyn ExecutionPlan> = match node {
+            // Leaves (no children)
+            LogicalPlan::TableScan(TableScan {
+                source,
+                projection,
+                filters,
+                fetch,
+                ..
+            }) => {
+                let source = source_as_provider(source)?;
+                // Remove all qualifiers from the scan as the provider
+                // doesn't know (nor should care) how the relation was
+                // referred to in the query
+                let filters = unnormalize_cols(filters.iter().cloned());
+                source
+                    .scan(session_state, projection.as_ref(), &filters, *fetch)
+                    .await?
+            }
+            LogicalPlan::Values(Values { values, schema }) => {
+                let exec_schema = schema.as_ref().to_owned().into();
+                let exprs = values
+                    .iter()
+                    .map(|row| {
+                        row.iter()
+                            .map(|expr| {
+                                self.create_physical_expr(expr, schema, 
session_state)
                             })
                             .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    let value_exec = ValuesExec::try_new(
-                        SchemaRef::new(exec_schema),
-                        exprs,
-                    )?;
-                    Ok(Arc::new(value_exec))
-                }
-                LogicalPlan::Window(Window {
-                    input, window_expr, ..
-                }) => {
-                    if window_expr.is_empty() {
-                        return internal_err!(
-                            "Impossibly got empty window expression"
-                        );
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_exec = 
ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+                Arc::new(value_exec)
+            }
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema,
+            }) => Arc::new(EmptyExec::new(SchemaRef::new(
+                schema.as_ref().to_owned().into(),
+            ))),
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: true,
+                schema,
+            }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
+                schema.as_ref().to_owned().into(),
+            ))),
+            LogicalPlan::DescribeTable(DescribeTable {
+                schema,
+                output_schema,
+            }) => {
+                let output_schema: Schema = output_schema.as_ref().into();
+                self.plan_describe(schema.clone(), Arc::new(output_schema))?
+            }
+
+            // 1 Child
+            LogicalPlan::Copy(CopyTo {

Review Comment:
   As some follow on PR, we could refactor this logic out of one giant match 
statement into functions like
   
   ```rust
   match plan { 
   ...
       LogicalPlan::Copy(copy_to) => copy_to_physical(copy_to),
   ...
   }
   ```
   
   But after this PR that refactoring seems like it would mostly improve 
readability rather than any stack usage



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +550,966 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => {
+                    let ready_children = 
Vec::with_capacity(node.inputs().len());
+                    let ready_children = Mutex::new(Some(ready_children));
+                    NodeState::TwoOrMoreChildren(ready_children)
+                }
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state));
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building
+    /// an ExecutionPlan as they go. When they reach a node with two or more 
children,
+    /// they append their current result (a child of the parent node) to the 
children
+    /// vector, and if this is sufficient to create the parent then continues 
traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
+    async fn task_helper<'a>(
         &'a self,
-        logical_plan: &'a LogicalPlan,
+        leaf_starter_index: usize,
+        flat_tree: Arc<Vec<LogicalNode<'a>>>,
         session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projection,
-                    filters,
-                    fetch,
-                    ..
-                }) => {
-                    let source = source_as_provider(source)?;
-                    // Remove all qualifiers from the scan as the provider
-                    // doesn't know (nor should care) how the relation was
-                    // referred to in the query
-                    let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, 
*fetch).await
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // We always start with a leaf, so can ignore status and pass empty 
children
+        let mut node = &flat_tree[leaf_starter_index];
+        let mut plan = self
+            .map_logical_node_to_physical(
+                node.node,
+                session_state,
+                ChildrenContainer::None,
+            )
+            .await?;
+        let mut current_index = leaf_starter_index;
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = &flat_tree[parent_index];
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
+                    plan = self
+                        .map_logical_node_to_physical(
+                            node.node,
+                            session_state,
+                            ChildrenContainer::One(plan),
+                        )
+                        .await?;
                 }
-                LogicalPlan::Copy(CopyTo{
-                    input,
-                    output_url,
-                    format_options,
-                    partition_by,
-                    options: source_option_tuples
-                }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let parsed_url = ListingTableUrl::parse(output_url)?;
-                    let object_store_url = parsed_url.object_store();
-
-                    let schema: Schema = (**input.schema()).clone().into();
-
-                    // Note: the DataType passed here is ignored for the 
purposes of writing and inferred instead
-                    // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
-                    // the column name rather than column name + explicit data 
type.
-                    let table_partition_cols = partition_by.iter()
-                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
-                        .collect::<Vec<_>>();
-
-                    // Set file sink related options
-                    let config = FileSinkConfig {
-                        object_store_url,
-                        table_paths: vec![parsed_url],
-                        file_groups: vec![],
-                        output_schema: Arc::new(schema),
-                        table_partition_cols,
-                        overwrite: false,
-                    };
-                    let mut table_options = 
session_state.default_table_options();
-                    let sink_format: Arc<dyn FileFormat> = match 
format_options {
-                        FormatOptions::CSV(options) => {
-                            table_options.csv = options.clone();
-                            table_options.set_file_format(FileType::CSV);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(CsvFormat::default().with_options(table_options.csv))
-                        },
-                        FormatOptions::JSON(options) => {
-                            table_options.json = options.clone();
-                            table_options.set_file_format(FileType::JSON);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(JsonFormat::default().with_options(table_options.json))
-                        },
-                        #[cfg(feature = "parquet")]
-                        FormatOptions::PARQUET(options) => {
-                            table_options.parquet = options.clone();
-                            table_options.set_file_format(FileType::PARQUET);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-                        },
-                        FormatOptions::AVRO => Arc::new(AvroFormat {} ),
-                        FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
+                    let mut children = {
+                        let mut guard = children.lock().unwrap();
+                        // Safe unwrap on option as only the last task 
reaching this
+                        // node will take the contents (which happens after 
this line).
+                        let children = guard.as_mut().unwrap();
+                        // Add our contribution to this parent node.
+                        children.push((current_index, plan));
+                        if children.len() < node.node.inputs().len() {
+                            // This node is not ready yet, still pending more 
children.
+                            // This task is finished forever.
+                            return Ok(None);
+                        }
+
+                        // With this task's contribution we have enough 
children.
+                        // This task is the only one building this node now, 
and thus
+                        // no other task will need the Mutex for this node, so 
take
+                        // all children.
+                        //
+                        // This take is the only place the Option becomes None.
+                        guard.take().unwrap()

Review Comment:
   likewise it would be sweet if this was an internal error rather than panic. 
but I don't think it is necessary, just a suggestion



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +515,957 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appeart, so later we need to 
reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])),
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state))
+            .collect::<Vec<_>>();
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building
+    /// an ExecutionPlan as they go. When they reach a node with two or more 
children,
+    /// they append their current result (a child of the parent node) to the 
children
+    /// vector, and if this is sufficient to create the parent then continues 
traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
+    async fn task_helper<'a>(
         &'a self,
-        logical_plan: &'a LogicalPlan,
+        leaf_starter_index: usize,
+        flat_tree: Arc<Vec<LogicalNode<'a>>>,
         session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projection,
-                    filters,
-                    fetch,
-                    ..
-                }) => {
-                    let source = source_as_provider(source)?;
-                    // Remove all qualifiers from the scan as the provider
-                    // doesn't know (nor should care) how the relation was
-                    // referred to in the query
-                    let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, 
*fetch).await
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // We always start with a leaf, so can ignore status and pass empty 
children
+        let mut node = &flat_tree[leaf_starter_index];
+        let mut plan = self
+            .map_logical_node_to_physical(node.node, session_state, vec![])
+            .await?;
+        let mut current_index = leaf_starter_index;
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = &flat_tree[parent_index];
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
+                    plan = self
+                        .map_logical_node_to_physical(
+                            node.node,
+                            session_state,
+                            vec![plan],
+                        )
+                        .await?;
                 }
-                LogicalPlan::Copy(CopyTo{
-                    input,
-                    output_url,
-                    format_options,
-                    partition_by,
-                    options: source_option_tuples
-                }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let parsed_url = ListingTableUrl::parse(output_url)?;
-                    let object_store_url = parsed_url.object_store();
-
-                    let schema: Schema = (**input.schema()).clone().into();
-
-                    // Note: the DataType passed here is ignored for the 
purposes of writing and inferred instead
-                    // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
-                    // the column name rather than column name + explicit data 
type.
-                    let table_partition_cols = partition_by.iter()
-                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
-                        .collect::<Vec<_>>();
-
-                    // Set file sink related options
-                    let config = FileSinkConfig {
-                        object_store_url,
-                        table_paths: vec![parsed_url],
-                        file_groups: vec![],
-                        output_schema: Arc::new(schema),
-                        table_partition_cols,
-                        overwrite: false,
-                    };
-                    let mut table_options = 
session_state.default_table_options();
-                    let sink_format: Arc<dyn FileFormat> = match 
format_options {
-                        FormatOptions::CSV(options) => {
-                            table_options.csv = options.clone();
-                            table_options.set_file_format(FileType::CSV);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(CsvFormat::default().with_options(table_options.csv))
-                        },
-                        FormatOptions::JSON(options) => {
-                            table_options.json = options.clone();
-                            table_options.set_file_format(FileType::JSON);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(JsonFormat::default().with_options(table_options.json))
-                        },
-                        #[cfg(feature = "parquet")]
-                        FormatOptions::PARQUET(options) => {
-                            table_options.parquet = options.clone();
-                            table_options.set_file_format(FileType::PARQUET);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-                        },
-                        FormatOptions::AVRO => Arc::new(AvroFormat {} ),
-                        FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
+                    let mut children = {
+                        let mut children = children.lock().unwrap();
+                        // Add our contribution to this parent node.
+                        children.push((current_index, plan));
+                        if children.len() < node.node.inputs().len() {
+                            // This node is not ready yet, still pending more 
children.
+                            // This task is finished forever.
+                            return Ok(None);
+                        } else {
+                            // With this task's contribution we have enough 
children.
+                            // This task is the only one building this node 
now, and thus
+                            // no other task will need the Mutex for this node.
+
+                            // TODO: How to do this without a clone? Take from 
the inner Mutex?
+                            children.clone()
+                        }
                     };
 
-                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config, None).await
+                    // Indices refer to position in flat tree Vec, which means 
they are
+                    // guaranteed to be unique, hence unstable sort used.
+                    // We reverse sort because of how we visited the node in 
the initial
+                    // DFS traversal (see above).
+                    children.sort_unstable_by_key(|(index, _)| 
std::cmp::Reverse(*index));
+                    let children =
+                        children.iter().map(|(_, plan)| 
plan.clone()).collect();
+
+                    plan = self
+                        .map_logical_node_to_physical(node.node, 
session_state, children)
+                        .await?;
                 }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertInto,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
false).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
-                    }
-                }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertOverwrite,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
true).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
-                    }
-                }
-                LogicalPlan::Values(Values {
-                    values,
-                    schema,
-                }) => {
-                    let exec_schema = schema.as_ref().to_owned().into();
-                    let exprs = values.iter()
-                        .map(|row| {
-                            row.iter().map(|expr| {
-                                self.create_physical_expr(
-                                    expr,
-                                    schema,
-                                    session_state,
-                                )
+            }
+            current_index = parent_index;
+        }
+        // Only one task should ever reach this point for a valid LogicalPlan 
tree.
+        Ok(Some(plan))
+    }
+
+    /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan 
counterpart.
+    async fn map_logical_node_to_physical(
+        &self,
+        node: &LogicalPlan,
+        session_state: &SessionState,
+        // TODO: refactor to not use Vec? Wasted for leaves/1 child
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let exec_node: Arc<dyn ExecutionPlan> = match node {
+            // Leaves (no children)
+            LogicalPlan::TableScan(TableScan {
+                source,
+                projection,
+                filters,
+                fetch,
+                ..
+            }) => {
+                let source = source_as_provider(source)?;
+                // Remove all qualifiers from the scan as the provider
+                // doesn't know (nor should care) how the relation was
+                // referred to in the query
+                let filters = unnormalize_cols(filters.iter().cloned());
+                source
+                    .scan(session_state, projection.as_ref(), &filters, *fetch)
+                    .await?
+            }
+            LogicalPlan::Values(Values { values, schema }) => {
+                let exec_schema = schema.as_ref().to_owned().into();
+                let exprs = values
+                    .iter()
+                    .map(|row| {
+                        row.iter()
+                            .map(|expr| {
+                                self.create_physical_expr(expr, schema, 
session_state)
                             })
                             .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    let value_exec = ValuesExec::try_new(
-                        SchemaRef::new(exec_schema),
-                        exprs,
-                    )?;
-                    Ok(Arc::new(value_exec))
-                }
-                LogicalPlan::Window(Window {
-                    input, window_expr, ..
-                }) => {
-                    if window_expr.is_empty() {
-                        return internal_err!(
-                            "Impossibly got empty window expression"
-                        );
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_exec = 
ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+                Arc::new(value_exec)
+            }
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema,
+            }) => Arc::new(EmptyExec::new(SchemaRef::new(
+                schema.as_ref().to_owned().into(),
+            ))),
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: true,
+                schema,
+            }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
+                schema.as_ref().to_owned().into(),
+            ))),
+            LogicalPlan::DescribeTable(DescribeTable {
+                schema,
+                output_schema,
+            }) => {
+                let output_schema: Schema = output_schema.as_ref().into();
+                self.plan_describe(schema.clone(), Arc::new(output_schema))?
+            }
+
+            // 1 Child
+            LogicalPlan::Copy(CopyTo {
+                input,
+                output_url,
+                format_options,
+                partition_by,
+                options: source_option_tuples,
+            }) => {
+                let input_exec = children.pop().unwrap();
+                let parsed_url = ListingTableUrl::parse(output_url)?;
+                let object_store_url = parsed_url.object_store();
+
+                let schema: Schema = (**input.schema()).clone().into();
+
+                // Note: the DataType passed here is ignored for the purposes 
of writing and inferred instead
+                // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
+                // the column name rather than column name + explicit data 
type.
+                let table_partition_cols = partition_by
+                    .iter()
+                    .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+                    .collect::<Vec<_>>();
+
+                // Set file sink related options
+                let config = FileSinkConfig {
+                    object_store_url,
+                    table_paths: vec![parsed_url],
+                    file_groups: vec![],
+                    output_schema: Arc::new(schema),
+                    table_partition_cols,
+                    overwrite: false,
+                };
+                let mut table_options = session_state.default_table_options();
+                let sink_format: Arc<dyn FileFormat> = match format_options {
+                    FormatOptions::CSV(options) => {
+                        table_options.csv = options.clone();
+                        table_options.set_file_format(FileType::CSV);
+                        
table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        
Arc::new(CsvFormat::default().with_options(table_options.csv))
                     }
+                    FormatOptions::JSON(options) => {
+                        table_options.json = options.clone();
+                        table_options.set_file_format(FileType::JSON);
+                        
table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        
Arc::new(JsonFormat::default().with_options(table_options.json))
+                    }
+                    #[cfg(feature = "parquet")]
+                    FormatOptions::PARQUET(options) => {
+                        table_options.parquet = options.clone();
+                        table_options.set_file_format(FileType::PARQUET);
+                        
table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        Arc::new(
+                            
ParquetFormat::default().with_options(table_options.parquet),
+                        )
+                    }
+                    FormatOptions::AVRO => Arc::new(AvroFormat {}),
+                    FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                };
 
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
+                sink_format
+                    .create_writer_physical_plan(input_exec, session_state, 
config, None)
+                    .await?
+            }
+            LogicalPlan::Dml(DmlStatement {
+                table_name,
+                op: WriteOp::InsertInto,
+                ..
+            }) => {
+                let name = table_name.table();
+                let schema = session_state.schema_for_ref(table_name.clone())?;
+                if let Some(provider) = schema.table(name).await? {
+                    let input_exec = children.pop().unwrap();
+                    provider
+                        .insert_into(session_state, input_exec, false)
+                        .await?
+                } else {
+                    return exec_err!("Table '{table_name}' does not exist");
+                }
+            }
+            LogicalPlan::Dml(DmlStatement {
+                table_name,
+                op: WriteOp::InsertOverwrite,
+                ..
+            }) => {
+                let name = table_name.table();
+                let schema = session_state.schema_for_ref(table_name.clone())?;
+                if let Some(provider) = schema.table(name).await? {
+                    let input_exec = children.pop().unwrap();
+                    provider
+                        .insert_into(session_state, input_exec, true)
+                        .await?
+                } else {
+                    return exec_err!("Table '{table_name}' does not exist");
+                }
+            }
+            LogicalPlan::Window(Window {
+                input, window_expr, ..
+            }) => {
+                if window_expr.is_empty() {
+                    return internal_err!("Impossibly got empty window 
expression");
+                }
 
-                    // at this moment we are guaranteed by the logical planner
-                    // to have all the window_expr to have equal sort key
-                    let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
+                let input_exec = children.pop().unwrap();
 
-                    let can_repartition = !partition_keys.is_empty()
-                        && session_state.config().target_partitions() > 1
-                        && 
session_state.config().repartition_window_functions();
+                // at this moment we are guaranteed by the logical planner
+                // to have all the window_expr to have equal sort key
+                let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
 
-                    let physical_partition_keys = if can_repartition
-                    {
-                        partition_keys
-                            .iter()
-                            .map(|e| {
-                                self.create_physical_expr(
-                                    e,
-                                    input.schema(),
-                                    session_state,
-                                )
-                            })
-                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
-                    } else {
-                        vec![]
-                    };
+                let can_repartition = !partition_keys.is_empty()
+                    && session_state.config().target_partitions() > 1
+                    && session_state.config().repartition_window_functions();
 
-                    let get_sort_keys = |expr: &Expr| match expr {
-                        Expr::WindowFunction(WindowFunction{
-                            ref partition_by,
-                            ref order_by,
-                            ..
-                        }) => generate_sort_key(partition_by, order_by),
-                        Expr::Alias(Alias{expr,..}) => {
-                            // Convert &Box<T> to &T
-                            match &**expr {
-                                Expr::WindowFunction(WindowFunction{
-                                    ref partition_by,
-                                    ref order_by,
-                                    ..}) => generate_sort_key(partition_by, 
order_by),
-                                _ => unreachable!(),
-                            }
+                let physical_partition_keys = if can_repartition {
+                    partition_keys
+                        .iter()
+                        .map(|e| {
+                            self.create_physical_expr(e, input.schema(), 
session_state)
+                        })
+                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
+                } else {
+                    vec![]
+                };
+
+                let get_sort_keys = |expr: &Expr| match expr {
+                    Expr::WindowFunction(WindowFunction {
+                        ref partition_by,
+                        ref order_by,
+                        ..
+                    }) => generate_sort_key(partition_by, order_by),
+                    Expr::Alias(Alias { expr, .. }) => {
+                        // Convert &Box<T> to &T
+                        match &**expr {
+                            Expr::WindowFunction(WindowFunction {
+                                ref partition_by,
+                                ref order_by,
+                                ..
+                            }) => generate_sort_key(partition_by, order_by),
+                            _ => unreachable!(),
                         }
-                        _ => unreachable!(),
-                    };
-                    let sort_keys = get_sort_keys(&window_expr[0])?;
-                    if window_expr.len() > 1 {
-                        debug_assert!(
+                    }
+                    _ => unreachable!(),
+                };
+                let sort_keys = get_sort_keys(&window_expr[0])?;
+                if window_expr.len() > 1 {
+                    debug_assert!(
                             window_expr[1..]
                                 .iter()
                                 .all(|expr| get_sort_keys(expr).unwrap() == 
sort_keys),
                             "all window expressions shall have the same sort 
keys, as guaranteed by logical planning"
                         );
-                    }
-
-                    let logical_schema = logical_plan.schema();
-                    let window_expr = window_expr
-                        .iter()
-                        .map(|e| {
-                            create_window_expr(
-                                e,
-                                logical_schema,
-                                session_state.execution_props(),
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-
-                    let uses_bounded_memory = window_expr
-                        .iter()
-                        .all(|e| e.uses_bounded_memory());
-                    // If all window expressions can run with bounded memory,
-                    // choose the bounded window variant:
-                    Ok(if uses_bounded_memory {
-                        Arc::new(BoundedWindowAggExec::try_new(
-                            window_expr,
-                            input_exec,
-                            physical_partition_keys,
-                            InputOrderMode::Sorted,
-                        )?)
-                    } else {
-                        Arc::new(WindowAggExec::try_new(
-                            window_expr,
-                            input_exec,
-                            physical_partition_keys,
-                        )?)
-                    })
                 }
-                LogicalPlan::Aggregate(Aggregate {
-                    input,
-                    group_expr,
-                    aggr_expr,
-                    ..
-                }) => {
-                    // Initially need to perform the aggregate and then merge 
the partitions
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let physical_input_schema = input_exec.schema();
-                    let logical_input_schema = input.as_ref().schema();
-
-                    let groups = self.create_grouping_physical_expr(
-                        group_expr,
-                        logical_input_schema,
-                        &physical_input_schema,
-                        session_state)?;
-
-                    let agg_filter = aggr_expr
-                        .iter()
-                        .map(|e| {
-                            create_aggregate_expr_and_maybe_filter(
-                                e,
-                                logical_input_schema,
-                                &physical_input_schema,
-                                session_state.execution_props(),
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?;
 
-                    let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, 
Vec<_>) = multiunzip(agg_filter);
+                let logical_schema = node.schema();
+                let window_expr = window_expr
+                    .iter()
+                    .map(|e| {
+                        create_window_expr(
+                            e,
+                            logical_schema,
+                            session_state.execution_props(),
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
 
-                    let initial_aggr = Arc::new(AggregateExec::try_new(
-                        AggregateMode::Partial,
-                        groups.clone(),
-                        aggregates.clone(),
-                        filters.clone(),
+                let uses_bounded_memory =
+                    window_expr.iter().all(|e| e.uses_bounded_memory());
+                // If all window expressions can run with bounded memory,
+                // choose the bounded window variant:
+                if uses_bounded_memory {
+                    Arc::new(BoundedWindowAggExec::try_new(
+                        window_expr,
                         input_exec,
-                        physical_input_schema.clone(),
-                    )?);
-
-                    // update group column indices based on partial aggregate 
plan evaluation
-                    let final_group: Vec<Arc<dyn PhysicalExpr>> = 
initial_aggr.output_group_expr();
-
-                    let can_repartition = !groups.is_empty()
-                        && session_state.config().target_partitions() > 1
-                        && session_state.config().repartition_aggregations();
-
-                    // Some aggregators may be modified during initialization 
for
-                    // optimization purposes. For example, a FIRST_VALUE may 
turn
-                    // into a LAST_VALUE with the reverse ordering requirement.
-                    // To reflect such changes to subsequent stages, use the 
updated
-                    // `AggregateExpr`/`PhysicalSortExpr` objects.
-                    let updated_aggregates = initial_aggr.aggr_expr().to_vec();
-
-                    let next_partition_mode = if can_repartition {
-                        // construct a second aggregation with 
'AggregateMode::FinalPartitioned'
-                        AggregateMode::FinalPartitioned
-                    } else {
-                        // construct a second aggregation, keeping the final 
column name equal to the
-                        // first aggregation and the expressions corresponding 
to the respective aggregate
-                        AggregateMode::Final
-                    };
-
-                    let final_grouping_set = PhysicalGroupBy::new_single(
-                        final_group
-                            .iter()
-                            .enumerate()
-                            .map(|(i, expr)| (expr.clone(), 
groups.expr()[i].1.clone()))
-                            .collect()
-                    );
-
-                    Ok(Arc::new(AggregateExec::try_new(
-                        next_partition_mode,
-                        final_grouping_set,
-                        updated_aggregates,
-                        filters,
-                        initial_aggr,
-                        physical_input_schema.clone(),
-                    )?))
+                        physical_partition_keys,
+                        InputOrderMode::Sorted,
+                    )?)
+                } else {
+                    Arc::new(WindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_partition_keys,
+                    )?)
                 }
-                LogicalPlan::Projection(Projection { input, expr, .. }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let input_schema = input.as_ref().schema();
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                ..
+            }) => {
+                // Initially need to perform the aggregate and then merge the 
partitions
+                let input_exec = children.pop().unwrap();
+                let physical_input_schema = input_exec.schema();
+                let logical_input_schema = input.as_ref().schema();
+
+                let groups = self.create_grouping_physical_expr(
+                    group_expr,
+                    logical_input_schema,
+                    &physical_input_schema,
+                    session_state,
+                )?;
 
-                    let physical_exprs = expr
-                        .iter()
-                        .map(|e| {
-                            // For projections, SQL planner and logical plan 
builder may convert user
-                            // provided expressions into logical Column 
expressions if their results
-                            // are already provided from the input plans. 
Because we work with
-                            // qualified columns in logical plane, derived 
columns involve operators or
-                            // functions will contain qualifiers as well. This 
will result in logical
-                            // columns with names like `SUM(t1.c1)`, `t1.c1 + 
t1.c2`, etc.
-                            //
-                            // If we run these logical columns through 
physical_name function, we will
-                            // get physical names with column qualifiers, 
which violates DataFusion's
-                            // field name semantics. To account for this, we 
need to derive the
-                            // physical name from physical input instead.
-                            //
-                            // This depends on the invariant that logical 
schema field index MUST match
-                            // with physical schema field index.
-                            let physical_name = if let Expr::Column(col) = e {
-                                match input_schema.index_of_column(col) {
-                                    Ok(idx) => {
-                                        // index physical field using logical 
field index
-                                        
Ok(input_exec.schema().field(idx).name().to_string())
-                                    }
-                                    // logical column is not a derived column, 
safe to pass along to
-                                    // physical_name
-                                    Err(_) => physical_name(e),
-                                }
-                            } else {
-                                physical_name(e)
-                            };
+                let agg_filter = aggr_expr
+                    .iter()
+                    .map(|e| {
+                        create_aggregate_expr_and_maybe_filter(
+                            e,
+                            logical_input_schema,
+                            &physical_input_schema,
+                            session_state.execution_props(),
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
 
-                            tuple_err((
+                let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, 
Vec<_>) =
+                    multiunzip(agg_filter);
+
+                let initial_aggr = Arc::new(AggregateExec::try_new(
+                    AggregateMode::Partial,
+                    groups.clone(),
+                    aggregates.clone(),
+                    filters.clone(),
+                    input_exec,
+                    physical_input_schema.clone(),
+                )?);
+
+                // update group column indices based on partial aggregate plan 
evaluation
+                let final_group: Vec<Arc<dyn PhysicalExpr>> =
+                    initial_aggr.output_group_expr();
+
+                let can_repartition = !groups.is_empty()
+                    && session_state.config().target_partitions() > 1
+                    && session_state.config().repartition_aggregations();
+
+                // Some aggregators may be modified during initialization for
+                // optimization purposes. For example, a FIRST_VALUE may turn
+                // into a LAST_VALUE with the reverse ordering requirement.
+                // To reflect such changes to subsequent stages, use the 
updated
+                // `AggregateExpr`/`PhysicalSortExpr` objects.
+                let updated_aggregates = initial_aggr.aggr_expr().to_vec();
+
+                let next_partition_mode = if can_repartition {
+                    // construct a second aggregation with 
'AggregateMode::FinalPartitioned'
+                    AggregateMode::FinalPartitioned
+                } else {
+                    // construct a second aggregation, keeping the final 
column name equal to the
+                    // first aggregation and the expressions corresponding to 
the respective aggregate
+                    AggregateMode::Final
+                };
+
+                let final_grouping_set = PhysicalGroupBy::new_single(
+                    final_group
+                        .iter()
+                        .enumerate()
+                        .map(|(i, expr)| (expr.clone(), 
groups.expr()[i].1.clone()))
+                        .collect(),
+                );
+
+                Arc::new(AggregateExec::try_new(
+                    next_partition_mode,
+                    final_grouping_set,
+                    updated_aggregates,
+                    filters,
+                    initial_aggr,
+                    physical_input_schema.clone(),
+                )?)
+            }
+            LogicalPlan::Projection(Projection { input, expr, .. }) => self
+                .create_project_physical_exec(
+                    session_state,
+                    children.pop().unwrap(),
+                    input,
+                    expr,
+                )?,
+            LogicalPlan::Filter(Filter {
+                predicate, input, ..
+            }) => {
+                let physical_input = children.pop().unwrap();
+                let input_dfschema = input.schema();
+
+                let runtime_expr =
+                    self.create_physical_expr(predicate, input_dfschema, 
session_state)?;
+                let selectivity = session_state
+                    .config()
+                    .options()
+                    .optimizer
+                    .default_filter_selectivity;
+                let filter = FilterExec::try_new(runtime_expr, 
physical_input)?;
+                Arc::new(filter.with_default_selectivity(selectivity)?)
+            }
+            LogicalPlan::Repartition(Repartition {
+                input,
+                partitioning_scheme,
+            }) => {
+                let physical_input = children.pop().unwrap();
+                let input_dfschema = input.as_ref().schema();
+                let physical_partitioning = match partitioning_scheme {
+                    LogicalPartitioning::RoundRobinBatch(n) => {
+                        Partitioning::RoundRobinBatch(*n)
+                    }
+                    LogicalPartitioning::Hash(expr, n) => {
+                        let runtime_expr = expr
+                            .iter()
+                            .map(|e| {
                                 self.create_physical_expr(
                                     e,
-                                    input_schema,
+                                    input_dfschema,
                                     session_state,
-                                ),
-                                physical_name,
-                            ))
-                        })
-                        .collect::<Result<Vec<_>>>()?;
+                                )
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        Partitioning::Hash(runtime_expr, *n)
+                    }
+                    LogicalPartitioning::DistributeBy(_) => {
+                        return not_impl_err!(
+                            "Physical plan does not support DistributeBy 
partitioning"
+                        );
+                    }
+                };
+                Arc::new(RepartitionExec::try_new(
+                    physical_input,
+                    physical_partitioning,
+                )?)
+            }
+            LogicalPlan::Sort(Sort {
+                expr, input, fetch, ..
+            }) => {
+                let physical_input = children.pop().unwrap();
+                let input_dfschema = input.as_ref().schema();
+                let sort_expr = create_physical_sort_exprs(
+                    expr,
+                    input_dfschema,
+                    session_state.execution_props(),
+                )?;
+                let new_sort =
+                    SortExec::new(sort_expr, 
physical_input).with_fetch(*fetch);
+                Arc::new(new_sort)
+            }
+            LogicalPlan::Subquery(_) => todo!(),
+            LogicalPlan::SubqueryAlias(_) => children.pop().unwrap(),
+            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+                let input = children.pop().unwrap();
+
+                // GlobalLimitExec requires a single partition for input
+                let input = if input.output_partitioning().partition_count() 
== 1 {
+                    input
+                } else {
+                    // Apply a LocalLimitExec to each partition. The optimizer 
will also insert
+                    // a CoalescePartitionsExec between the GlobalLimitExec 
and LocalLimitExec
+                    if let Some(fetch) = fetch {
+                        Arc::new(LocalLimitExec::new(input, *fetch + skip))
+                    } else {
+                        input
+                    }
+                };
 
-                    Ok(Arc::new(ProjectionExec::try_new(
-                        physical_exprs,
-                        input_exec,
-                    )?))
-                }
-                LogicalPlan::Filter(filter) => {
-                    let physical_input = 
self.create_initial_plan(&filter.input, session_state).await?;
-                    let input_dfschema = filter.input.schema();
+                Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
+            }
+            LogicalPlan::Unnest(Unnest {
+                column,
+                schema,
+                options,
+                ..
+            }) => {
+                let input = children.pop().unwrap();
+                let column_exec = schema
+                    .index_of_column(column)
+                    .map(|idx| Column::new(&column.name, idx))?;
+                let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                Arc::new(UnnestExec::new(input, column_exec, schema, 
options.clone()))
+            }
 
-                    let runtime_expr = self.create_physical_expr(
-                        &filter.predicate,
-                        input_dfschema,
-                        session_state,
-                    )?;
-                    let selectivity = 
session_state.config().options().optimizer.default_filter_selectivity;
-                    let filter = FilterExec::try_new(runtime_expr, 
physical_input)?;
-                    Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
-                }
-                LogicalPlan::Union(Union { inputs, schema: _ }) => {
-                    let physical_plans = 
self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), 
session_state).await?;
+            // 2 Children
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                on: keys,
+                filter,
+                join_type,
+                null_equals_null,
+                schema: join_schema,
+                ..
+            }) => {
+                let null_equals_null = *null_equals_null;
+
+                let physical_right = children.pop().unwrap();
+                let physical_left = children.pop().unwrap();
+
+                // If join has expression equijoin keys, add physical 
projection.
+                let has_expr_join_key = keys.iter().any(|(l, r)| {
+                    !(matches!(l, Expr::Column(_)) && matches!(r, 
Expr::Column(_)))
+                });
+                let (new_logical, physical_left, physical_right) = if 
has_expr_join_key {
+                    // TODO: Can we extract this transformation to somewhere 
before physical plan
+                    //       creation?
+                    let (left_keys, right_keys): (Vec<_>, Vec<_>) =
+                        keys.iter().cloned().unzip();
+
+                    let (left, left_col_keys, left_projected) =
+                        wrap_projection_for_join_if_necessary(
+                            &left_keys,
+                            left.as_ref().clone(),
+                        )?;
+                    let (right, right_col_keys, right_projected) =
+                        wrap_projection_for_join_if_necessary(
+                            &right_keys,
+                            right.as_ref().clone(),
+                        )?;
+                    let column_on = (left_col_keys, right_col_keys);
+
+                    let left = Arc::new(left);
+                    let right = Arc::new(right);
+                    let new_join = 
LogicalPlan::Join(Join::try_new_with_project_input(
+                        node,
+                        left.clone(),
+                        right.clone(),
+                        column_on,
+                    )?);
 
-                    Ok(Arc::new(UnionExec::new(physical_plans)))
-                }
-                LogicalPlan::Repartition(Repartition {
-                    input,
-                    partitioning_scheme,
-                }) => {
-                    let physical_input = self.create_initial_plan(input, 
session_state).await?;
-                    let input_dfschema = input.as_ref().schema();
-                    let physical_partitioning = match partitioning_scheme {
-                        LogicalPartitioning::RoundRobinBatch(n) => {
-                            Partitioning::RoundRobinBatch(*n)
-                        }
-                        LogicalPartitioning::Hash(expr, n) => {
-                            let runtime_expr = expr
-                                .iter()
-                                .map(|e| {
-                                    self.create_physical_expr(
-                                        e,
-                                        input_dfschema,
-                                        session_state,
-                                    )
-                                })
-                                .collect::<Result<Vec<_>>>()?;
-                            Partitioning::Hash(runtime_expr, *n)
-                        }
-                        LogicalPartitioning::DistributeBy(_) => {
-                            return not_impl_err!("Physical plan does not 
support DistributeBy partitioning");
-                        }
+                    // If inputs were projected then create ExecutionPlan for 
these new
+                    // LogicalPlan nodes.
+                    let physical_left = match (left_projected, left.as_ref()) {
+                        // If left_projected is true we are guaranteed that 
left is a Projection
+                        (
+                            true,
+                            LogicalPlan::Projection(Projection { input, expr, 
.. }),
+                        ) => self.create_project_physical_exec(
+                            session_state,
+                            physical_left,
+                            input,
+                            expr,
+                        )?,
+                        _ => physical_left,

Review Comment:
   I agree it is nasty and splitting it off, or moving it to some other part of 
the code I think sounds like a good idea to me
   
   Perhaps as a follow on PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to