This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b54adb36b8 Refactor physical create_initial_plan to iteratively & 
concurrently construct plan from the bottom up (#10023)
b54adb36b8 is described below

commit b54adb36b855f198b5098fdb3e4cdf1934818efd
Author: Jeffrey Vo <[email protected]>
AuthorDate: Wed Apr 17 01:59:42 2024 +1000

    Refactor physical create_initial_plan to iteratively & concurrently 
construct plan from the bottom up (#10023)
    
    * Refactor physical create_initial_plan to construct bottom up
    
    * Refactor node mapping into separate function
    
    * Experiment with concurrent bottom up physical planning
    
    * Refactoring and comments
    
    * Remove unnecessary collect
    
    * Preallocate vector capacity
    
    * Remove children.clone()
    
    * Introduce ChildrenContainer enum
    
    * Formatting
    
    * Fix case where extension may have 0 or 1 children
    
    * Documentation and cleanup unwraps
    
    * Minor changes
    
    - Use tokio::Mutex in async environment
    - Remove Option from enum, since it is only used for taking.
    
    ---------
    
    Co-authored-by: metesynnada <[email protected]>
---
 datafusion/core/src/physical_planner.rs | 1827 ++++++++++++++++++-------------
 datafusion/core/tests/tpcds_planning.rs |    1 -
 2 files changed, 1056 insertions(+), 772 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index f5e937bb56..301f68c0f2 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,6 +17,7 @@
 
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fmt::Write;
 use std::sync::Arc;
@@ -35,12 +36,11 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
 use crate::logical_expr::{
-    Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias, 
TableScan, Unnest,
-    Window,
+    Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, 
Window,
 };
 use crate::logical_expr::{
-    CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, 
PlanType,
-    Repartition, Union, UserDefinedLogicalNode,
+    Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, 
Repartition,
+    UserDefinedLogicalNode,
 };
 use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::{create_physical_expr, create_physical_exprs};
@@ -74,9 +74,11 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow_array::builder::StringBuilder;
 use arrow_array::RecordBatch;
+use datafusion_common::config::FormatOptions;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
-    exec_err, internal_err, not_impl_err, plan_err, DFSchema, FileType, 
ScalarValue,
+    exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, 
DFSchema,
+    FileType, ScalarValue,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
@@ -87,21 +89,20 @@ use datafusion_expr::expr::{
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use 
datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
 use datafusion_expr::{
-    DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
-    StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
+    DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
+    ScalarFunctionDefinition, StringifiedPlan, WindowFrame, WindowFrameBound, 
WriteOp,
 };
 use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_sql::utils::window_expr_common_partition_keys;
 
 use async_trait::async_trait;
-use datafusion_common::config::FormatOptions;
-use datafusion_physical_expr::LexOrdering;
-use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
 use sqlparser::ast::NullTreatment;
+use tokio::sync::Mutex;
 
 fn create_function_physical_name(
     fun: &str,
@@ -445,6 +446,22 @@ pub trait ExtensionPlanner {
 
 /// Default single node physical query planner that converts a
 /// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
+///
+/// This planner will first flatten the `LogicalPlan` tree via a
+/// depth first approach, which allows it to identify the leaves
+/// of the tree.
+///
+/// Tasks are spawned from these leaves and traverse back up the
+/// tree towards the root, converting each `LogicalPlan` node they
+/// reach into its equivalent `ExecutionPlan` node. When these
+/// tasks reach a common node, all but the last task to arrive
+/// terminate; the last task then continues building up the
+/// tree.
+///
+/// Up to [`planning_concurrency`] tasks are buffered at once to
+/// execute concurrently.
+///
+/// [`planning_concurrency`]: 
crate::config::ExecutionOptions::planning_concurrency
 #[derive(Default)]
 pub struct DefaultPhysicalPlanner {
     extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
@@ -485,6 +502,63 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
     }
 }
 
+#[derive(Debug)]
+struct ExecutionPlanChild {
+    /// Index needed to order children of parent to ensure consistency with 
original
+    /// `LogicalPlan`
+    index: usize,
+    plan: Arc<dyn ExecutionPlan>,
+}
+
+#[derive(Debug)]
+enum NodeState {
+    ZeroOrOneChild,
+    /// Nodes with multiple children will have multiple tasks accessing it,
+    /// and each task will append their contribution until the last task takes
+    /// all the children to build the parent node.
+    TwoOrMoreChildren(Mutex<Vec<ExecutionPlanChild>>),
+}
+
+/// To avoid needing to pass single child wrapped in a Vec for nodes
+/// with only one child.
+enum ChildrenContainer {
+    None,
+    One(Arc<dyn ExecutionPlan>),
+    Multiple(Vec<Arc<dyn ExecutionPlan>>),
+}
+
+impl ChildrenContainer {
+    fn one(self) -> Result<Arc<dyn ExecutionPlan>> {
+        match self {
+            Self::One(p) => Ok(p),
+            _ => internal_err!("More than one child in ChildrenContainer"),
+        }
+    }
+
+    fn two(self) -> Result<[Arc<dyn ExecutionPlan>; 2]> {
+        match self {
+            Self::Multiple(v) if v.len() == 2 => Ok(v.try_into().unwrap()),
+            _ => internal_err!("ChildrenContainer doesn't contain exactly 2 
children"),
+        }
+    }
+
+    fn vec(self) -> Vec<Arc<dyn ExecutionPlan>> {
+        match self {
+            Self::None => vec![],
+            Self::One(p) => vec![p],
+            Self::Multiple(v) => v,
+        }
+    }
+}
+
+#[derive(Debug)]
+struct LogicalNode<'a> {
+    node: &'a LogicalPlan,
+    // None if root
+    parent_index: Option<usize>,
+    state: NodeState,
+}
+
 impl DefaultPhysicalPlanner {
     /// Create a physical planner that uses `extension_planners` to
     /// plan user-defined logical nodes [`LogicalPlan::Extension`].
@@ -496,777 +570,983 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create a physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) 
but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as little references as possible, then 
performing some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, 
session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
+    /// Create a physical plan from a logical plan
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![(None, logical_plan)];
+        // Use this to be able to find the leaves to start construction bottom
+        // up concurrently.
+        let mut flat_tree_leaf_indices = vec![];
+        while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+            let current_index = flat_tree.len();
+            // Because of how we extend the visit stack here, we visit the 
children
+            // in reverse order of how they appear, so later we need to reverse
+            // the order of children when building the nodes.
+            dfs_visit_stack
+                .extend(node.inputs().iter().map(|&n| (Some(current_index), 
n)));
+            let state = match node.inputs().len() {
+                0 => {
+                    flat_tree_leaf_indices.push(current_index);
+                    NodeState::ZeroOrOneChild
+                }
+                1 => NodeState::ZeroOrOneChild,
+                _ => {
+                    let ready_children = 
Vec::with_capacity(node.inputs().len());
+                    let ready_children = Mutex::new(ready_children);
+                    NodeState::TwoOrMoreChildren(ready_children)
+                }
+            };
+            let node = LogicalNode {
+                node,
+                parent_index,
+                state,
+            };
+            flat_tree.push(node);
+        }
+        let flat_tree = Arc::new(flat_tree);
+
+        let planning_concurrency = session_state
+            .config_options()
+            .execution
+            .planning_concurrency;
+        // Can never spawn more tasks than leaves in the tree, as these tasks 
must
+        // all converge down to the root node, which can only be processed by a
+        // single task.
+        let max_concurrency = 
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
+            .into_iter()
+            .map(|index| self.task_helper(index, flat_tree.clone(), 
session_state));
+        let mut outputs = futures::stream::iter(tasks)
+            .buffer_unordered(max_concurrency)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        if outputs.len() != 1 {
+            return internal_err!(
+                "Failed to convert LogicalPlan to ExecutionPlan: More than one 
root detected"
+            );
+        }
+        let plan = outputs.pop().unwrap();
+        Ok(plan)
     }
 
-    /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
+    /// These tasks start at a leaf and traverse up the tree towards the root, 
building
+    /// an ExecutionPlan as they go. When they reach a node with two or more 
children,
+    /// they append their current result (a child of the parent node) to the 
children
+    /// vector, and if this is sufficient to create the parent then continues 
traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
+    async fn task_helper<'a>(
         &'a self,
-        logical_plan: &'a LogicalPlan,
+        leaf_starter_index: usize,
+        flat_tree: Arc<Vec<LogicalNode<'a>>>,
         session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projection,
-                    filters,
-                    fetch,
-                    ..
-                }) => {
-                    let source = source_as_provider(source)?;
-                    // Remove all qualifiers from the scan as the provider
-                    // doesn't know (nor should care) how the relation was
-                    // referred to in the query
-                    let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, 
*fetch).await
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // We always start with a leaf, so can ignore status and pass empty 
children
+        let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| {
+            internal_datafusion_err!(
+                "Invalid index whilst creating initial physical plan"
+            )
+        })?;
+        let mut plan = self
+            .map_logical_node_to_physical(
+                node.node,
+                session_state,
+                ChildrenContainer::None,
+            )
+            .await?;
+        let mut current_index = leaf_starter_index;
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = flat_tree.get(parent_index).ok_or_else(|| {
+                internal_datafusion_err!(
+                    "Invalid index whilst creating initial physical plan"
+                )
+            })?;
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
+                    plan = self
+                        .map_logical_node_to_physical(
+                            node.node,
+                            session_state,
+                            ChildrenContainer::One(plan),
+                        )
+                        .await?;
                 }
-                LogicalPlan::Copy(CopyTo{
-                    input,
-                    output_url,
-                    format_options,
-                    partition_by,
-                    options: source_option_tuples
-                }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let parsed_url = ListingTableUrl::parse(output_url)?;
-                    let object_store_url = parsed_url.object_store();
-
-                    let schema: Schema = (**input.schema()).clone().into();
-
-                    // Note: the DataType passed here is ignored for the 
purposes of writing and inferred instead
-                    // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
-                    // the column name rather than column name + explicit data 
type.
-                    let table_partition_cols = partition_by.iter()
-                        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
-                        .collect::<Vec<_>>();
-
-                    // Set file sink related options
-                    let config = FileSinkConfig {
-                        object_store_url,
-                        table_paths: vec![parsed_url],
-                        file_groups: vec![],
-                        output_schema: Arc::new(schema),
-                        table_partition_cols,
-                        overwrite: false,
-                    };
-                    let mut table_options = 
session_state.default_table_options();
-                    let sink_format: Arc<dyn FileFormat> = match 
format_options {
-                        FormatOptions::CSV(options) => {
-                            table_options.csv = options.clone();
-                            table_options.set_file_format(FileType::CSV);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(CsvFormat::default().with_options(table_options.csv))
-                        },
-                        FormatOptions::JSON(options) => {
-                            table_options.json = options.clone();
-                            table_options.set_file_format(FileType::JSON);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(JsonFormat::default().with_options(table_options.json))
-                        },
-                        #[cfg(feature = "parquet")]
-                        FormatOptions::PARQUET(options) => {
-                            table_options.parquet = options.clone();
-                            table_options.set_file_format(FileType::PARQUET);
-                            
table_options.alter_with_string_hash_map(source_option_tuples)?;
-                            
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
-                        },
-                        FormatOptions::AVRO => Arc::new(AvroFormat {} ),
-                        FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
+                    let mut children: Vec<ExecutionPlanChild> = {
+                        let mut guard = children.lock().await;
+                        // Add our contribution to this parent node.
+                        // Vec is pre-allocated so no allocation should occur 
here.
+                        guard.push(ExecutionPlanChild {
+                            index: current_index,
+                            plan,
+                        });
+                        if guard.len() < node.node.inputs().len() {
+                            // This node is not ready yet, still pending more 
children.
+                            // This task is finished forever.
+                            return Ok(None);
+                        }
+
+                        // With this task's contribution we have enough 
children.
+                        // This task is the only one building this node now, 
and thus
+                        // no other task will need the Mutex for this node, so 
take
+                        // all children.
+                        std::mem::take(guard.as_mut())
                     };
 
-                    sink_format.create_writer_physical_plan(input_exec, 
session_state, config, None).await
+                    // Indices refer to position in flat tree Vec, which means 
they are
+                    // guaranteed to be unique, hence unstable sort used.
+                    //
+                    // We reverse sort because of how we visited the node in 
the initial
+                    // DFS traversal (see above).
+                    children.sort_unstable_by_key(|epc| 
std::cmp::Reverse(epc.index));
+                    let children = children.into_iter().map(|epc| 
epc.plan).collect();
+                    let children = ChildrenContainer::Multiple(children);
+                    plan = self
+                        .map_logical_node_to_physical(node.node, 
session_state, children)
+                        .await?;
                 }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertInto,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
false).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
+            }
+            current_index = parent_index;
+        }
+        // Only one task should ever reach this point for a valid LogicalPlan 
tree.
+        Ok(Some(plan))
+    }
+
+    /// Given a single LogicalPlan node, map it to its physical ExecutionPlan 
counterpart.
+    async fn map_logical_node_to_physical(
+        &self,
+        node: &LogicalPlan,
+        session_state: &SessionState,
+        children: ChildrenContainer,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let exec_node: Arc<dyn ExecutionPlan> = match node {
+            // Leaves (no children)
+            LogicalPlan::TableScan(TableScan {
+                source,
+                projection,
+                filters,
+                fetch,
+                ..
+            }) => {
+                let source = source_as_provider(source)?;
+                // Remove all qualifiers from the scan as the provider
+                // doesn't know (nor should care) how the relation was
+                // referred to in the query
+                let filters = unnormalize_cols(filters.iter().cloned());
+                source
+                    .scan(session_state, projection.as_ref(), &filters, *fetch)
+                    .await?
+            }
+            LogicalPlan::Values(Values { values, schema }) => {
+                let exec_schema = schema.as_ref().to_owned().into();
+                let exprs = values
+                    .iter()
+                    .map(|row| {
+                        row.iter()
+                            .map(|expr| {
+                                self.create_physical_expr(expr, schema, 
session_state)
+                            })
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let value_exec = 
ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+                Arc::new(value_exec)
+            }
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: false,
+                schema,
+            }) => Arc::new(EmptyExec::new(SchemaRef::new(
+                schema.as_ref().to_owned().into(),
+            ))),
+            LogicalPlan::EmptyRelation(EmptyRelation {
+                produce_one_row: true,
+                schema,
+            }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
+                schema.as_ref().to_owned().into(),
+            ))),
+            LogicalPlan::DescribeTable(DescribeTable {
+                schema,
+                output_schema,
+            }) => {
+                let output_schema: Schema = output_schema.as_ref().into();
+                self.plan_describe(schema.clone(), Arc::new(output_schema))?
+            }
+
+            // 1 Child
+            LogicalPlan::Copy(CopyTo {
+                input,
+                output_url,
+                format_options,
+                partition_by,
+                options: source_option_tuples,
+            }) => {
+                let input_exec = children.one()?;
+                let parsed_url = ListingTableUrl::parse(output_url)?;
+                let object_store_url = parsed_url.object_store();
+
+                let schema: Schema = (**input.schema()).clone().into();
+
+                // Note: the DataType passed here is ignored for the purposes 
of writing and inferred instead
+                // from the schema of the RecordBatch being written. This 
allows COPY statements to specify only
+                // the column name rather than column name + explicit data 
type.
+                let table_partition_cols = partition_by
+                    .iter()
+                    .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+                    .collect::<Vec<_>>();
+
+                // Set file sink related options
+                let config = FileSinkConfig {
+                    object_store_url,
+                    table_paths: vec![parsed_url],
+                    file_groups: vec![],
+                    output_schema: Arc::new(schema),
+                    table_partition_cols,
+                    overwrite: false,
+                };
+                let mut table_options = session_state.default_table_options();
+                let sink_format: Arc<dyn FileFormat> = match format_options {
+                    FormatOptions::CSV(options) => {
+                        table_options.csv = options.clone();
+                        table_options.set_file_format(FileType::CSV);
+                        
table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        
Arc::new(CsvFormat::default().with_options(table_options.csv))
                     }
-                }
-                LogicalPlan::Dml(DmlStatement {
-                    table_name,
-                    op: WriteOp::InsertOverwrite,
-                    input,
-                    ..
-                }) => {
-                    let name = table_name.table();
-                    let schema = 
session_state.schema_for_ref(table_name.clone())?;
-                    if let Some(provider) = schema.table(name).await? {
-                        let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                        provider.insert_into(session_state, input_exec, 
true).await
-                    } else {
-                        return exec_err!(
-                            "Table '{table_name}' does not exist"
-                        );
+                    FormatOptions::JSON(options) => {
+                        table_options.json = options.clone();
+                        table_options.set_file_format(FileType::JSON);
+                        
table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        
Arc::new(JsonFormat::default().with_options(table_options.json))
+                    }
+                    #[cfg(feature = "parquet")]
+                    FormatOptions::PARQUET(options) => {
+                        table_options.parquet = options.clone();
+                        table_options.set_file_format(FileType::PARQUET);
+                        
table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        Arc::new(
+                            
ParquetFormat::default().with_options(table_options.parquet),
+                        )
                     }
+                    FormatOptions::AVRO => Arc::new(AvroFormat {}),
+                    FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+                };
+
+                sink_format
+                    .create_writer_physical_plan(input_exec, session_state, 
config, None)
+                    .await?
+            }
+            LogicalPlan::Dml(DmlStatement {
+                table_name,
+                op: WriteOp::InsertInto,
+                ..
+            }) => {
+                let name = table_name.table();
+                let schema = session_state.schema_for_ref(table_name.clone())?;
+                if let Some(provider) = schema.table(name).await? {
+                    let input_exec = children.one()?;
+                    provider
+                        .insert_into(session_state, input_exec, false)
+                        .await?
+                } else {
+                    return exec_err!("Table '{table_name}' does not exist");
                 }
-                LogicalPlan::Values(Values {
-                    values,
-                    schema,
-                }) => {
-                    let exec_schema = schema.as_ref().to_owned().into();
-                    let exprs = values.iter()
-                        .map(|row| {
-                            row.iter().map(|expr| {
-                                self.create_physical_expr(
-                                    expr,
-                                    schema,
-                                    session_state,
-                                )
-                            })
-                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    let value_exec = ValuesExec::try_new(
-                        SchemaRef::new(exec_schema),
-                        exprs,
-                    )?;
-                    Ok(Arc::new(value_exec))
+            }
+            LogicalPlan::Dml(DmlStatement {
+                table_name,
+                op: WriteOp::InsertOverwrite,
+                ..
+            }) => {
+                let name = table_name.table();
+                let schema = session_state.schema_for_ref(table_name.clone())?;
+                if let Some(provider) = schema.table(name).await? {
+                    let input_exec = children.one()?;
+                    provider
+                        .insert_into(session_state, input_exec, true)
+                        .await?
+                } else {
+                    return exec_err!("Table '{table_name}' does not exist");
                 }
-                LogicalPlan::Window(Window {
-                    input, window_expr, ..
-                }) => {
-                    if window_expr.is_empty() {
-                        return internal_err!(
-                            "Impossibly got empty window expression"
-                        );
+            }
+            LogicalPlan::Window(Window {
+                input, window_expr, ..
+            }) => {
+                if window_expr.is_empty() {
+                    return internal_err!("Impossibly got empty window 
expression");
+                }
+
+                let input_exec = children.one()?;
+
+                // at this moment we are guaranteed by the logical planner
+                // to have all the window_expr to have equal sort key
+                let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
+
+                let can_repartition = !partition_keys.is_empty()
+                    && session_state.config().target_partitions() > 1
+                    && session_state.config().repartition_window_functions();
+
+                let physical_partition_keys = if can_repartition {
+                    partition_keys
+                        .iter()
+                        .map(|e| {
+                            self.create_physical_expr(e, input.schema(), 
session_state)
+                        })
+                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
+                } else {
+                    vec![]
+                };
+
+                let get_sort_keys = |expr: &Expr| match expr {
+                    Expr::WindowFunction(WindowFunction {
+                        ref partition_by,
+                        ref order_by,
+                        ..
+                    }) => generate_sort_key(partition_by, order_by),
+                    Expr::Alias(Alias { expr, .. }) => {
+                        // Convert &Box<T> to &T
+                        match &**expr {
+                            Expr::WindowFunction(WindowFunction {
+                                ref partition_by,
+                                ref order_by,
+                                ..
+                            }) => generate_sort_key(partition_by, order_by),
+                            _ => unreachable!(),
+                        }
                     }
+                    _ => unreachable!(),
+                };
+                let sort_keys = get_sort_keys(&window_expr[0])?;
+                if window_expr.len() > 1 {
+                    debug_assert!(
+                            window_expr[1..]
+                                .iter()
+                                .all(|expr| get_sort_keys(expr).unwrap() == 
sort_keys),
+                            "all window expressions shall have the same sort 
keys, as guaranteed by logical planning"
+                        );
+                }
 
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
+                let logical_schema = node.schema();
+                let window_expr = window_expr
+                    .iter()
+                    .map(|e| {
+                        create_window_expr(
+                            e,
+                            logical_schema,
+                            session_state.execution_props(),
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
 
-                    // at this moment we are guaranteed by the logical planner
-                    // to have all the window_expr to have equal sort key
-                    let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
+                let uses_bounded_memory =
+                    window_expr.iter().all(|e| e.uses_bounded_memory());
+                // If all window expressions can run with bounded memory,
+                // choose the bounded window variant:
+                if uses_bounded_memory {
+                    Arc::new(BoundedWindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_partition_keys,
+                        InputOrderMode::Sorted,
+                    )?)
+                } else {
+                    Arc::new(WindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_partition_keys,
+                    )?)
+                }
+            }
+            LogicalPlan::Aggregate(Aggregate {
+                input,
+                group_expr,
+                aggr_expr,
+                ..
+            }) => {
+                // Initially need to perform the aggregate and then merge the 
partitions
+                let input_exec = children.one()?;
+                let physical_input_schema = input_exec.schema();
+                let logical_input_schema = input.as_ref().schema();
+
+                let groups = self.create_grouping_physical_expr(
+                    group_expr,
+                    logical_input_schema,
+                    &physical_input_schema,
+                    session_state,
+                )?;
 
-                    let can_repartition = !partition_keys.is_empty()
-                        && session_state.config().target_partitions() > 1
-                        && 
session_state.config().repartition_window_functions();
+                let agg_filter = aggr_expr
+                    .iter()
+                    .map(|e| {
+                        create_aggregate_expr_and_maybe_filter(
+                            e,
+                            logical_input_schema,
+                            &physical_input_schema,
+                            session_state.execution_props(),
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
 
-                    let physical_partition_keys = if can_repartition
-                    {
-                        partition_keys
+                let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, 
Vec<_>) =
+                    multiunzip(agg_filter);
+
+                let initial_aggr = Arc::new(AggregateExec::try_new(
+                    AggregateMode::Partial,
+                    groups.clone(),
+                    aggregates.clone(),
+                    filters.clone(),
+                    input_exec,
+                    physical_input_schema.clone(),
+                )?);
+
+                // update group column indices based on partial aggregate plan 
evaluation
+                let final_group: Vec<Arc<dyn PhysicalExpr>> =
+                    initial_aggr.output_group_expr();
+
+                let can_repartition = !groups.is_empty()
+                    && session_state.config().target_partitions() > 1
+                    && session_state.config().repartition_aggregations();
+
+                // Some aggregators may be modified during initialization for
+                // optimization purposes. For example, a FIRST_VALUE may turn
+                // into a LAST_VALUE with the reverse ordering requirement.
+                // To reflect such changes to subsequent stages, use the 
updated
+                // `AggregateExpr`/`PhysicalSortExpr` objects.
+                let updated_aggregates = initial_aggr.aggr_expr().to_vec();
+
+                let next_partition_mode = if can_repartition {
+                    // construct a second aggregation with 
'AggregateMode::FinalPartitioned'
+                    AggregateMode::FinalPartitioned
+                } else {
+                    // construct a second aggregation, keeping the final 
column name equal to the
+                    // first aggregation and the expressions corresponding to 
the respective aggregate
+                    AggregateMode::Final
+                };
+
+                let final_grouping_set = PhysicalGroupBy::new_single(
+                    final_group
+                        .iter()
+                        .enumerate()
+                        .map(|(i, expr)| (expr.clone(), 
groups.expr()[i].1.clone()))
+                        .collect(),
+                );
+
+                Arc::new(AggregateExec::try_new(
+                    next_partition_mode,
+                    final_grouping_set,
+                    updated_aggregates,
+                    filters,
+                    initial_aggr,
+                    physical_input_schema.clone(),
+                )?)
+            }
+            LogicalPlan::Projection(Projection { input, expr, .. }) => self
+                .create_project_physical_exec(
+                    session_state,
+                    children.one()?,
+                    input,
+                    expr,
+                )?,
+            LogicalPlan::Filter(Filter {
+                predicate, input, ..
+            }) => {
+                let physical_input = children.one()?;
+                let input_dfschema = input.schema();
+
+                let runtime_expr =
+                    self.create_physical_expr(predicate, input_dfschema, 
session_state)?;
+                let selectivity = session_state
+                    .config()
+                    .options()
+                    .optimizer
+                    .default_filter_selectivity;
+                let filter = FilterExec::try_new(runtime_expr, 
physical_input)?;
+                Arc::new(filter.with_default_selectivity(selectivity)?)
+            }
+            LogicalPlan::Repartition(Repartition {
+                input,
+                partitioning_scheme,
+            }) => {
+                let physical_input = children.one()?;
+                let input_dfschema = input.as_ref().schema();
+                let physical_partitioning = match partitioning_scheme {
+                    LogicalPartitioning::RoundRobinBatch(n) => {
+                        Partitioning::RoundRobinBatch(*n)
+                    }
+                    LogicalPartitioning::Hash(expr, n) => {
+                        let runtime_expr = expr
                             .iter()
                             .map(|e| {
                                 self.create_physical_expr(
                                     e,
-                                    input.schema(),
+                                    input_dfschema,
                                     session_state,
                                 )
                             })
-                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
-                    } else {
-                        vec![]
-                    };
-
-                    let get_sort_keys = |expr: &Expr| match expr {
-                        Expr::WindowFunction(WindowFunction{
-                            ref partition_by,
-                            ref order_by,
-                            ..
-                        }) => generate_sort_key(partition_by, order_by),
-                        Expr::Alias(Alias{expr,..}) => {
-                            // Convert &Box<T> to &T
-                            match &**expr {
-                                Expr::WindowFunction(WindowFunction{
-                                    ref partition_by,
-                                    ref order_by,
-                                    ..}) => generate_sort_key(partition_by, 
order_by),
-                                _ => unreachable!(),
-                            }
-                        }
-                        _ => unreachable!(),
-                    };
-                    let sort_keys = get_sort_keys(&window_expr[0])?;
-                    if window_expr.len() > 1 {
-                        debug_assert!(
-                            window_expr[1..]
-                                .iter()
-                                .all(|expr| get_sort_keys(expr).unwrap() == 
sort_keys),
-                            "all window expressions shall have the same sort 
keys, as guaranteed by logical planning"
+                            .collect::<Result<Vec<_>>>()?;
+                        Partitioning::Hash(runtime_expr, *n)
+                    }
+                    LogicalPartitioning::DistributeBy(_) => {
+                        return not_impl_err!(
+                            "Physical plan does not support DistributeBy 
partitioning"
                         );
                     }
-
-                    let logical_schema = logical_plan.schema();
-                    let window_expr = window_expr
-                        .iter()
-                        .map(|e| {
-                            create_window_expr(
-                                e,
-                                logical_schema,
-                                session_state.execution_props(),
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-
-                    let uses_bounded_memory = window_expr
-                        .iter()
-                        .all(|e| e.uses_bounded_memory());
-                    // If all window expressions can run with bounded memory,
-                    // choose the bounded window variant:
-                    Ok(if uses_bounded_memory {
-                        Arc::new(BoundedWindowAggExec::try_new(
-                            window_expr,
-                            input_exec,
-                            physical_partition_keys,
-                            InputOrderMode::Sorted,
-                        )?)
+                };
+                Arc::new(RepartitionExec::try_new(
+                    physical_input,
+                    physical_partitioning,
+                )?)
+            }
+            LogicalPlan::Sort(Sort {
+                expr, input, fetch, ..
+            }) => {
+                let physical_input = children.one()?;
+                let input_dfschema = input.as_ref().schema();
+                let sort_expr = create_physical_sort_exprs(
+                    expr,
+                    input_dfschema,
+                    session_state.execution_props(),
+                )?;
+                let new_sort =
+                    SortExec::new(sort_expr, 
physical_input).with_fetch(*fetch);
+                Arc::new(new_sort)
+            }
+            LogicalPlan::Subquery(_) => todo!(),
+            LogicalPlan::SubqueryAlias(_) => children.one()?,
+            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+                let input = children.one()?;
+
+                // GlobalLimitExec requires a single partition for input
+                let input = if input.output_partitioning().partition_count() 
== 1 {
+                    input
+                } else {
+                    // Apply a LocalLimitExec to each partition. The optimizer 
will also insert
+                    // a CoalescePartitionsExec between the GlobalLimitExec 
and LocalLimitExec
+                    if let Some(fetch) = fetch {
+                        Arc::new(LocalLimitExec::new(input, *fetch + skip))
                     } else {
-                        Arc::new(WindowAggExec::try_new(
-                            window_expr,
-                            input_exec,
-                            physical_partition_keys,
-                        )?)
+                        input
+                    }
+                };
+
+                Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
+            }
+            LogicalPlan::Unnest(Unnest {
+                columns,
+                schema,
+                options,
+                ..
+            }) => {
+                let input = children.one()?;
+                let column_execs = columns
+                    .iter()
+                    .map(|column| {
+                        schema
+                            .index_of_column(column)
+                            .map(|idx| Column::new(&column.name, idx))
                     })
-                }
-                LogicalPlan::Aggregate(Aggregate {
+                    .collect::<Result<_>>()?;
+                let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                Arc::new(UnnestExec::new(
                     input,
-                    group_expr,
-                    aggr_expr,
-                    ..
-                }) => {
-                    // Initially need to perform the aggregate and then merge 
the partitions
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let physical_input_schema = input_exec.schema();
-                    let logical_input_schema = input.as_ref().schema();
-
-                    let groups = self.create_grouping_physical_expr(
-                        group_expr,
-                        logical_input_schema,
-                        &physical_input_schema,
-                        session_state)?;
-
-                    let agg_filter = aggr_expr
-                        .iter()
-                        .map(|e| {
-                            create_aggregate_expr_and_maybe_filter(
-                                e,
-                                logical_input_schema,
-                                &physical_input_schema,
-                                session_state.execution_props(),
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-
-                    let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, 
Vec<_>) = multiunzip(agg_filter);
+                    column_execs,
+                    schema,
+                    options.clone(),
+                ))
+            }
 
-                    let initial_aggr = Arc::new(AggregateExec::try_new(
-                        AggregateMode::Partial,
-                        groups.clone(),
-                        aggregates.clone(),
-                        filters.clone(),
-                        input_exec,
-                        physical_input_schema.clone(),
+            // 2 Children
+            LogicalPlan::Join(Join {
+                left,
+                right,
+                on: keys,
+                filter,
+                join_type,
+                null_equals_null,
+                schema: join_schema,
+                ..
+            }) => {
+                let null_equals_null = *null_equals_null;
+
+                let [physical_left, physical_right] = children.two()?;
+
+                // If join has expression equijoin keys, add physical 
projection.
+                let has_expr_join_key = keys.iter().any(|(l, r)| {
+                    !(matches!(l, Expr::Column(_)) && matches!(r, 
Expr::Column(_)))
+                });
+                let (new_logical, physical_left, physical_right) = if 
has_expr_join_key {
+                    // TODO: Can we extract this transformation to somewhere 
before physical plan
+                    //       creation?
+                    let (left_keys, right_keys): (Vec<_>, Vec<_>) =
+                        keys.iter().cloned().unzip();
+
+                    let (left, left_col_keys, left_projected) =
+                        wrap_projection_for_join_if_necessary(
+                            &left_keys,
+                            left.as_ref().clone(),
+                        )?;
+                    let (right, right_col_keys, right_projected) =
+                        wrap_projection_for_join_if_necessary(
+                            &right_keys,
+                            right.as_ref().clone(),
+                        )?;
+                    let column_on = (left_col_keys, right_col_keys);
+
+                    let left = Arc::new(left);
+                    let right = Arc::new(right);
+                    let new_join = 
LogicalPlan::Join(Join::try_new_with_project_input(
+                        node,
+                        left.clone(),
+                        right.clone(),
+                        column_on,
                     )?);
 
-                    // update group column indices based on partial aggregate 
plan evaluation
-                    let final_group: Vec<Arc<dyn PhysicalExpr>> = 
initial_aggr.output_group_expr();
-
-                    let can_repartition = !groups.is_empty()
-                        && session_state.config().target_partitions() > 1
-                        && session_state.config().repartition_aggregations();
-
-                    // Some aggregators may be modified during initialization 
for
-                    // optimization purposes. For example, a FIRST_VALUE may 
turn
-                    // into a LAST_VALUE with the reverse ordering requirement.
-                    // To reflect such changes to subsequent stages, use the 
updated
-                    // `AggregateExpr`/`PhysicalSortExpr` objects.
-                    let updated_aggregates = initial_aggr.aggr_expr().to_vec();
-
-                    let next_partition_mode = if can_repartition {
-                        // construct a second aggregation with 
'AggregateMode::FinalPartitioned'
-                        AggregateMode::FinalPartitioned
-                    } else {
-                        // construct a second aggregation, keeping the final 
column name equal to the
-                        // first aggregation and the expressions corresponding 
to the respective aggregate
-                        AggregateMode::Final
+                    // If inputs were projected then create ExecutionPlan for 
these new
+                    // LogicalPlan nodes.
+                    let physical_left = match (left_projected, left.as_ref()) {
+                        // If left_projected is true we are guaranteed that 
left is a Projection
+                        (
+                            true,
+                            LogicalPlan::Projection(Projection { input, expr, 
.. }),
+                        ) => self.create_project_physical_exec(
+                            session_state,
+                            physical_left,
+                            input,
+                            expr,
+                        )?,
+                        _ => physical_left,
+                    };
+                    let physical_right = match (right_projected, 
right.as_ref()) {
+                        // If right_projected is true we are guaranteed that 
right is a Projection
+                        (
+                            true,
+                            LogicalPlan::Projection(Projection { input, expr, 
.. }),
+                        ) => self.create_project_physical_exec(
+                            session_state,
+                            physical_right,
+                            input,
+                            expr,
+                        )?,
+                        _ => physical_right,
                     };
 
-                    let final_grouping_set = PhysicalGroupBy::new_single(
-                        final_group
+                    // Remove temporary projected columns
+                    if left_projected || right_projected {
+                        let final_join_result = join_schema
                             .iter()
-                            .enumerate()
-                            .map(|(i, expr)| (expr.clone(), 
groups.expr()[i].1.clone()))
-                            .collect()
-                    );
-
-                    Ok(Arc::new(AggregateExec::try_new(
-                        next_partition_mode,
-                        final_grouping_set,
-                        updated_aggregates,
-                        filters,
-                        initial_aggr,
-                        physical_input_schema.clone(),
-                    )?))
-                }
-                LogicalPlan::Projection(Projection { input, expr, .. }) => {
-                    let input_exec = self.create_initial_plan(input, 
session_state).await?;
-                    let input_schema = input.as_ref().schema();
-
-                    let physical_exprs = expr
-                        .iter()
-                        .map(|e| {
-                            // For projections, SQL planner and logical plan 
builder may convert user
-                            // provided expressions into logical Column 
expressions if their results
-                            // are already provided from the input plans. 
Because we work with
-                            // qualified columns in logical plane, derived 
columns involve operators or
-                            // functions will contain qualifiers as well. This 
will result in logical
-                            // columns with names like `SUM(t1.c1)`, `t1.c1 + 
t1.c2`, etc.
-                            //
-                            // If we run these logical columns through 
physical_name function, we will
-                            // get physical names with column qualifiers, 
which violates DataFusion's
-                            // field name semantics. To account for this, we 
need to derive the
-                            // physical name from physical input instead.
-                            //
-                            // This depends on the invariant that logical 
schema field index MUST match
-                            // with physical schema field index.
-                            let physical_name = if let Expr::Column(col) = e {
-                                match input_schema.index_of_column(col) {
-                                    Ok(idx) => {
-                                        // index physical field using logical 
field index
-                                        
Ok(input_exec.schema().field(idx).name().to_string())
-                                    }
-                                    // logical column is not a derived column, 
safe to pass along to
-                                    // physical_name
-                                    Err(_) => physical_name(e),
-                                }
-                            } else {
-                                physical_name(e)
-                            };
-
-                            tuple_err((
-                                self.create_physical_expr(
-                                    e,
-                                    input_schema,
-                                    session_state,
-                                ),
-                                physical_name,
-                            ))
-                        })
-                        .collect::<Result<Vec<_>>>()?;
+                            .map(|(qualifier, field)| {
+                                Expr::Column(datafusion_common::Column::from((
+                                    qualifier,
+                                    field.as_ref(),
+                                )))
+                            })
+                            .collect::<Vec<_>>();
+                        let projection = 
LogicalPlan::Projection(Projection::try_new(
+                            final_join_result,
+                            Arc::new(new_join),
+                        )?);
+                        // LogicalPlan mutated
+                        (Cow::Owned(projection), physical_left, physical_right)
+                    } else {
+                        // LogicalPlan mutated
+                        (Cow::Owned(new_join), physical_left, physical_right)
+                    }
+                } else {
+                    // LogicalPlan unchanged
+                    (Cow::Borrowed(node), physical_left, physical_right)
+                };
+
+                // Retrieving new left/right and join keys (in case plan was 
mutated above)
+                let (left, right, keys, new_project) = match 
new_logical.as_ref() {
+                    LogicalPlan::Projection(Projection { input, expr, .. }) => 
{
+                        if let LogicalPlan::Join(Join {
+                            left, right, on, ..
+                        }) = input.as_ref()
+                        {
+                            (left, right, on, Some((input, expr)))
+                        } else {
+                            unreachable!()
+                        }
+                    }
+                    LogicalPlan::Join(Join {
+                        left, right, on, ..
+                    }) => (left, right, on, None),
+                    // Should either be the original Join, or Join with a 
Projection on top
+                    _ => unreachable!(),
+                };
+
+                // All equi-join keys are columns now, create physical join 
plan
+                let left_df_schema = left.schema();
+                let right_df_schema = right.schema();
+                let execution_props = session_state.execution_props();
+                let join_on = keys
+                    .iter()
+                    .map(|(l, r)| {
+                        let l = create_physical_expr(l, left_df_schema, 
execution_props)?;
+                        let r =
+                            create_physical_expr(r, right_df_schema, 
execution_props)?;
+                        Ok((l, r))
+                    })
+                    .collect::<Result<join_utils::JoinOn>>()?;
 
-                    Ok(Arc::new(ProjectionExec::try_new(
-                        physical_exprs,
-                        input_exec,
-                    )?))
-                }
-                LogicalPlan::Filter(filter) => {
-                    let physical_input = 
self.create_initial_plan(&filter.input, session_state).await?;
-                    let input_dfschema = filter.input.schema();
+                let join_filter = match filter {
+                    Some(expr) => {
+                        // Extract columns from filter expression and saved in 
a HashSet
+                        let cols = expr.to_columns()?;
 
-                    let runtime_expr = self.create_physical_expr(
-                        &filter.predicate,
-                        input_dfschema,
-                        session_state,
-                    )?;
-                    let selectivity = 
session_state.config().options().optimizer.default_filter_selectivity;
-                    let filter = FilterExec::try_new(runtime_expr, 
physical_input)?;
-                    Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
-                }
-                LogicalPlan::Union(Union { inputs, schema: _ }) => {
-                    let physical_plans = 
self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), 
session_state).await?;
+                        // Collect left & right field indices, the field 
indices are sorted in ascending order
+                        let left_field_indices = cols
+                            .iter()
+                            .filter_map(|c| match 
left_df_schema.index_of_column(c) {
+                                Ok(idx) => Some(idx),
+                                _ => None,
+                            })
+                            .sorted()
+                            .collect::<Vec<_>>();
+                        let right_field_indices = cols
+                            .iter()
+                            .filter_map(|c| match 
right_df_schema.index_of_column(c) {
+                                Ok(idx) => Some(idx),
+                                _ => None,
+                            })
+                            .sorted()
+                            .collect::<Vec<_>>();
 
-                    Ok(Arc::new(UnionExec::new(physical_plans)))
-                }
-                LogicalPlan::Repartition(Repartition {
-                    input,
-                    partitioning_scheme,
-                }) => {
-                    let physical_input = self.create_initial_plan(input, 
session_state).await?;
-                    let input_dfschema = input.as_ref().schema();
-                    let physical_partitioning = match partitioning_scheme {
-                        LogicalPartitioning::RoundRobinBatch(n) => {
-                            Partitioning::RoundRobinBatch(*n)
-                        }
-                        LogicalPartitioning::Hash(expr, n) => {
-                            let runtime_expr = expr
-                                .iter()
-                                .map(|e| {
-                                    self.create_physical_expr(
-                                        e,
-                                        input_dfschema,
-                                        session_state,
+                        // Collect DFFields and Fields required for 
intermediate schemas
+                        let (filter_df_fields, filter_fields): (Vec<_>, 
Vec<_>) =
+                            left_field_indices
+                                .clone()
+                                .into_iter()
+                                .map(|i| {
+                                    (
+                                        left_df_schema.qualified_field(i),
+                                        
physical_left.schema().field(i).clone(),
                                     )
                                 })
-                                .collect::<Result<Vec<_>>>()?;
-                            Partitioning::Hash(runtime_expr, *n)
-                        }
-                        LogicalPartitioning::DistributeBy(_) => {
-                            return not_impl_err!("Physical plan does not 
support DistributeBy partitioning");
-                        }
-                    };
-                    Ok(Arc::new(RepartitionExec::try_new(
-                        physical_input,
-                        physical_partitioning,
-                    )?))
-                }
-                LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
-                    let physical_input = self.create_initial_plan(input, 
session_state).await?;
-                    let input_dfschema = input.as_ref().schema();
-                    let sort_expr = create_physical_sort_exprs(expr, 
input_dfschema, session_state.execution_props())?;
-                    let new_sort = SortExec::new(sort_expr, physical_input)
-                        .with_fetch(*fetch);
-                    Ok(Arc::new(new_sort))
-                }
-                LogicalPlan::Join(Join {
-                    left,
-                    right,
-                    on: keys,
-                    filter,
-                    join_type,
-                    null_equals_null,
-                    schema: join_schema,
-                    ..
-                }) => {
-                    let null_equals_null = *null_equals_null;
-
-                    // If join has expression equijoin keys, add physical 
projection.
-                    let has_expr_join_key = keys.iter().any(|(l, r)| {
-                        !(matches!(l, Expr::Column(_))
-                            && matches!(r, Expr::Column(_)))
-                    });
-                    if has_expr_join_key {
-                        // Logic extracted into a function here as subsequent 
recursive create_initial_plan()
-                        // call can cause a stack overflow for a large number 
of joins.
-                        //
-                        // See #9962 and #1047 for detailed explanation.
-                        let join_plan = 
project_expr_join_keys(keys,left,right,logical_plan,join_schema)?;
-                        return self
-                            .create_initial_plan(&join_plan, session_state)
-                            .await;
-                    }
-
-                    // All equi-join keys are columns now, create physical 
join plan
-                    let left_right = 
self.create_initial_plan_multi([left.as_ref(), right.as_ref()], 
session_state).await?;
-                    let [physical_left, physical_right]: [Arc<dyn 
ExecutionPlan>; 2] = left_right.try_into().map_err(|_| 
DataFusionError::Internal("`create_initial_plan_multi` is 
broken".to_string()))?;
-                    let left_df_schema = left.schema();
-                    let right_df_schema = right.schema();
-                    let execution_props = session_state.execution_props();
-                    let join_on = keys
-                        .iter()
-                        .map(|(l, r)| {
-                            let l = create_physical_expr(
-                                l,
-                                left_df_schema,
-                                execution_props
-                            )?;
-                            let r = create_physical_expr(
-                                r,
-                                right_df_schema,
-                                execution_props
-                            )?;
-                            Ok((l, r))
-                        })
-                        .collect::<Result<join_utils::JoinOn>>()?;
-
-                    let join_filter = match filter {
-                        Some(expr) => {
-                            // Extract columns from filter expression and 
saved in a HashSet
-                            let cols = expr.to_columns()?;
-
-                            // Collect left & right field indices, the field 
indices are sorted in ascending order
-                            let left_field_indices = cols.iter()
-                                .filter_map(|c| match 
left_df_schema.index_of_column(c) {
-                                    Ok(idx) => Some(idx),
-                                    _ => None,
-                                }).sorted()
-                                .collect::<Vec<_>>();
-                            let right_field_indices = cols.iter()
-                                .filter_map(|c| match 
right_df_schema.index_of_column(c) {
-                                    Ok(idx) => Some(idx),
-                                    _ => None,
-                                }).sorted()
-                                .collect::<Vec<_>>();
-
-                            // Collect DFFields and Fields required for 
intermediate schemas
-                            let (filter_df_fields, filter_fields): (Vec<_>, 
Vec<_>) = left_field_indices.clone()
-                                .into_iter()
-                                .map(|i| (
-                                    left_df_schema.qualified_field(i),
-                                    physical_left.schema().field(i).clone(),
-                                ))
-                                .chain(
-                                    right_field_indices.clone()
-                                        .into_iter()
-                                        .map(|i| (
-                                            right_df_schema.qualified_field(i),
-                                            
physical_right.schema().field(i).clone(),
-                                        ))
-                                )
+                                
.chain(right_field_indices.clone().into_iter().map(|i| {
+                                    (
+                                        right_df_schema.qualified_field(i),
+                                        
physical_right.schema().field(i).clone(),
+                                    )
+                                }))
                                 .unzip();
-                            let filter_df_fields = 
filter_df_fields.into_iter().map(|(qualifier, field)| (qualifier.cloned(), 
Arc::new(field.clone()))).collect();
-
-                            // Construct intermediate schemas used for 
filtering data and
-                            // convert logical expression to physical 
according to filter schema
-                            let filter_df_schema = 
DFSchema::new_with_metadata(filter_df_fields, HashMap::new())?;
-                            let filter_schema = 
Schema::new_with_metadata(filter_fields, HashMap::new());
-                            let filter_expr = create_physical_expr(
-                                expr,
-                                &filter_df_schema,
-                                session_state.execution_props(),
-                            )?;
-                            let column_indices = 
join_utils::JoinFilter::build_column_indices(left_field_indices, 
right_field_indices);
-
-                            Some(join_utils::JoinFilter::new(
-                                filter_expr,
-                                column_indices,
-                                filter_schema,
-                            ))
-                        }
-                        _ => None
-                    };
-
-                    let prefer_hash_join = 
session_state.config_options().optimizer.prefer_hash_join;
+                        let filter_df_fields = filter_df_fields
+                            .into_iter()
+                            .map(|(qualifier, field)| {
+                                (qualifier.cloned(), Arc::new(field.clone()))
+                            })
+                            .collect();
+
+                        // Construct intermediate schemas used for filtering 
data and
+                        // convert logical expression to physical according to 
filter schema
+                        let filter_df_schema = DFSchema::new_with_metadata(
+                            filter_df_fields,
+                            HashMap::new(),
+                        )?;
+                        let filter_schema =
+                            Schema::new_with_metadata(filter_fields, 
HashMap::new());
+                        let filter_expr = create_physical_expr(
+                            expr,
+                            &filter_df_schema,
+                            session_state.execution_props(),
+                        )?;
+                        let column_indices = 
join_utils::JoinFilter::build_column_indices(
+                            left_field_indices,
+                            right_field_indices,
+                        );
 
-                    if join_on.is_empty() {
-                        // there is no equal join condition, use the nested 
loop join
-                        // TODO optimize the plan, and use the config of 
`target_partitions` and `repartition_joins`
-                        Ok(Arc::new(NestedLoopJoinExec::try_new(
-                            physical_left,
-                            physical_right,
-                            join_filter,
-                            join_type,
-                        )?))
-                    } else if session_state.config().target_partitions() > 1
-                        && session_state.config().repartition_joins()
-                        && !prefer_hash_join
-                    {
-                        // Use SortMergeJoin if hash join is not preferred
-                        // Sort-Merge join support currently is experimental
-
-                        let join_on_len = join_on.len();
-                        Ok(Arc::new(SortMergeJoinExec::try_new(
-                            physical_left,
-                            physical_right,
-                            join_on,
-                            join_filter,
-                            *join_type,
-                            vec![SortOptions::default(); join_on_len],
-                            null_equals_null,
-                        )?))
-                    } else if session_state.config().target_partitions() > 1
-                        && session_state.config().repartition_joins()
-                        && prefer_hash_join {
-                         let partition_mode = {
-                            if session_state.config().collect_statistics() {
-                                PartitionMode::Auto
-                            } else {
-                                PartitionMode::Partitioned
-                            }
-                         };
-                        Ok(Arc::new(HashJoinExec::try_new(
-                            physical_left,
-                            physical_right,
-                            join_on,
-                            join_filter,
-                            join_type,
-                                                       None,
-                            partition_mode,
-                            null_equals_null,
-                        )?))
-                    } else {
-                        Ok(Arc::new(HashJoinExec::try_new(
-                            physical_left,
-                            physical_right,
-                            join_on,
-                            join_filter,
-                            join_type,
-                                                       None,
-                            PartitionMode::CollectLeft,
-                            null_equals_null,
-                        )?))
+                        Some(join_utils::JoinFilter::new(
+                            filter_expr,
+                            column_indices,
+                            filter_schema,
+                        ))
                     }
-                }
-                LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
-                    let left_right = 
self.create_initial_plan_multi([left.as_ref(), right.as_ref()], 
session_state).await?;
-                    let [left, right]: [Arc<dyn ExecutionPlan>; 2] = 
left_right.try_into().map_err(|_| 
DataFusionError::Internal("`create_initial_plan_multi` is 
broken".to_string()))?;
-                    Ok(Arc::new(CrossJoinExec::new(left, right)))
-                }
-                LogicalPlan::Subquery(_) => todo!(),
-                LogicalPlan::EmptyRelation(EmptyRelation {
-                    produce_one_row: false,
-                    schema,
-                }) => Ok(Arc::new(EmptyExec::new(
-                    SchemaRef::new(schema.as_ref().to_owned().into()),
-                ))),
-                LogicalPlan::EmptyRelation(EmptyRelation {
-                    produce_one_row: true,
-                    schema,
-                }) => Ok(Arc::new(PlaceholderRowExec::new(
-                    SchemaRef::new(schema.as_ref().to_owned().into()),
-                ))),
-                LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
-                    self.create_initial_plan(input, session_state).await
-                }
-                LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
-                    let input = self.create_initial_plan(input, 
session_state).await?;
-
-                    // GlobalLimitExec requires a single partition for input
-                    let input = if 
input.output_partitioning().partition_count() == 1 {
-                        input
-                    } else {
-                        // Apply a LocalLimitExec to each partition. The 
optimizer will also insert
-                        // a CoalescePartitionsExec between the 
GlobalLimitExec and LocalLimitExec
-                        if let Some(fetch) = fetch {
-                            Arc::new(LocalLimitExec::new(input, *fetch + skip))
+                    _ => None,
+                };
+
+                let prefer_hash_join =
+                    session_state.config_options().optimizer.prefer_hash_join;
+
+                let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
+                    // there is no equal join condition, use the nested loop 
join
+                    // TODO optimize the plan, and use the config of 
`target_partitions` and `repartition_joins`
+                    Arc::new(NestedLoopJoinExec::try_new(
+                        physical_left,
+                        physical_right,
+                        join_filter,
+                        join_type,
+                    )?)
+                } else if session_state.config().target_partitions() > 1
+                    && session_state.config().repartition_joins()
+                    && !prefer_hash_join
+                {
+                    // Use SortMergeJoin if hash join is not preferred
+                    // Sort-Merge join support currently is experimental
+
+                    let join_on_len = join_on.len();
+                    Arc::new(SortMergeJoinExec::try_new(
+                        physical_left,
+                        physical_right,
+                        join_on,
+                        join_filter,
+                        *join_type,
+                        vec![SortOptions::default(); join_on_len],
+                        null_equals_null,
+                    )?)
+                } else if session_state.config().target_partitions() > 1
+                    && session_state.config().repartition_joins()
+                    && prefer_hash_join
+                {
+                    let partition_mode = {
+                        if session_state.config().collect_statistics() {
+                            PartitionMode::Auto
                         } else {
-                            input
+                            PartitionMode::Partitioned
                         }
                     };
-
-                    Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
-                }
-                LogicalPlan::Unnest(Unnest { input, columns, schema, options 
}) => {
-                    let input = self.create_initial_plan(input, 
session_state).await?;
-                    let column_execs = columns.iter().map(|column| {
-                        schema.index_of_column(column).map(|idx| 
Column::new(&column.name, idx))
-                    }).collect::<Result<_>>()?;
-                    let schema = 
SchemaRef::new(schema.as_ref().to_owned().into());
-                    Ok(Arc::new(UnnestExec::new(input, column_execs, schema, 
options.clone())))
-                }
-                LogicalPlan::Ddl(ddl) => {
-                    // There is no default plan for DDl statements --
-                    // it must be handled at a higher level (so that
-                    // the appropriate table can be registered with
-                    // the context)
-                    let name = ddl.name();
-                    not_impl_err!(
-                        "Unsupported logical plan: {name}"
-                    )
-                }
-                LogicalPlan::Prepare(_) => {
-                    // There is no default plan for "PREPARE" -- it must be
-                    // handled at a higher level (so that the appropriate
-                    // statement can be prepared)
-                    not_impl_err!(
-                        "Unsupported logical plan: Prepare"
-                    )
+                    Arc::new(HashJoinExec::try_new(
+                        physical_left,
+                        physical_right,
+                        join_on,
+                        join_filter,
+                        join_type,
+                        None,
+                        partition_mode,
+                        null_equals_null,
+                    )?)
+                } else {
+                    Arc::new(HashJoinExec::try_new(
+                        physical_left,
+                        physical_right,
+                        join_on,
+                        join_filter,
+                        join_type,
+                        None,
+                        PartitionMode::CollectLeft,
+                        null_equals_null,
+                    )?)
+                };
+
+                // If plan was mutated previously then need to create the 
ExecutionPlan
+                // for the new Projection that was applied on top.
+                if let Some((input, expr)) = new_project {
+                    self.create_project_physical_exec(session_state, join, 
input, expr)?
+                } else {
+                    join
                 }
-                LogicalPlan::Dml(dml) => {
-                    // DataFusion is a read-only query engine, but also a 
library, so consumers may implement this
-                    not_impl_err!(
-                        "Unsupported logical plan: Dml({0})", dml.op
-                    )
-                }
-                LogicalPlan::Statement(statement) => {
-                    // DataFusion is a read-only query engine, but also a 
library, so consumers may implement this
-                    let name = statement.name();
-                    not_impl_err!(
-                        "Unsupported logical plan: Statement({name})"
-                    )
-                }
-                LogicalPlan::DescribeTable(DescribeTable { schema, 
output_schema}) => {
-                    let output_schema: Schema = output_schema.as_ref().into();
-                    self.plan_describe(schema.clone(), Arc::new(output_schema))
-                }
-                LogicalPlan::Explain(_) => internal_err!(
-                    "Unsupported logical plan: Explain must be root of the 
plan"
-                ),
-                LogicalPlan::Distinct(_) => {
-                    internal_err!(
-                        "Unsupported logical plan: Distinct should be replaced 
to Aggregate"
-                    )
-                }
-                LogicalPlan::Analyze(_) => internal_err!(
-                    "Unsupported logical plan: Analyze must be root of the 
plan"
-                ),
-                LogicalPlan::Extension(e) => {
-                    let physical_inputs = 
self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
+            }
+            LogicalPlan::CrossJoin(_) => {
+                let [left, right] = children.two()?;
+                Arc::new(CrossJoinExec::new(left, right))
+            }
+            LogicalPlan::RecursiveQuery(RecursiveQuery {
+                name, is_distinct, ..
+            }) => {
+                let [static_term, recursive_term] = children.two()?;
+                Arc::new(RecursiveQueryExec::try_new(
+                    name.clone(),
+                    static_term,
+                    recursive_term,
+                    *is_distinct,
+                )?)
+            }
 
-                    let mut maybe_plan = None;
-                    for planner in &self.extension_planners {
-                        if maybe_plan.is_some() {
-                            break;
-                        }
+            // N Children
+            LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())),
+            LogicalPlan::Extension(Extension { node }) => {
+                let mut maybe_plan = None;
+                let children = children.vec();
+                for planner in &self.extension_planners {
+                    if maybe_plan.is_some() {
+                        break;
+                    }
 
-                        let logical_input = e.node.inputs();
-                        maybe_plan = planner.plan_extension(
+                    let logical_input = node.inputs();
+                    maybe_plan = planner
+                        .plan_extension(
                             self,
-                            e.node.as_ref(),
+                            node.as_ref(),
                             &logical_input,
-                            &physical_inputs,
+                            &children,
                             session_state,
-                        ).await?;
-                    }
+                        )
+                        .await?;
+                }
 
-                    let plan = match maybe_plan {
+                let plan = match maybe_plan {
                         Some(v) => Ok(v),
-                        _ => plan_err!("No installed planner was able to 
convert the custom node to an execution plan: {:?}", e.node)
+                        _ => plan_err!("No installed planner was able to 
convert the custom node to an execution plan: {:?}", node)
                     }?;
 
-                    // Ensure the ExecutionPlan's schema matches the
-                    // declared logical schema to catch and warn about
-                    // logic errors when creating user defined plans.
-                    if !e.node.schema().matches_arrow_schema(&plan.schema()) {
-                        plan_err!(
+                // Ensure the ExecutionPlan's schema matches the
+                // declared logical schema to catch and warn about
+                // logic errors when creating user defined plans.
+                if !node.schema().matches_arrow_schema(&plan.schema()) {
+                    return plan_err!(
                             "Extension planner for {:?} created an 
ExecutionPlan with mismatched schema. \
                             LogicalPlan schema: {:?}, ExecutionPlan schema: 
{:?}",
-                            e.node, e.node.schema(), plan.schema()
-                        )
-                    } else {
-                        Ok(plan)
-                    }
-                }
-                LogicalPlan::RecursiveQuery(RecursiveQuery { name, 
static_term, recursive_term, is_distinct,.. }) => {
-                    let static_term = self.create_initial_plan(static_term, 
session_state).await?;
-                    let recursive_term = 
self.create_initial_plan(recursive_term, session_state).await?;
-                    Ok(Arc::new(RecursiveQueryExec::try_new(name.clone(), 
static_term, recursive_term, *is_distinct)?))
+                            node, node.schema(), plan.schema()
+                        );
+                } else {
+                    plan
                 }
-            };
-            exec_plan
-        }.boxed()
+            }
+
+            // Other
+            LogicalPlan::Statement(statement) => {
+                // DataFusion is a read-only query engine, but also a library, 
so consumers may implement this
+                let name = statement.name();
+                return not_impl_err!("Unsupported logical plan: 
Statement({name})");
+            }
+            LogicalPlan::Prepare(_) => {
+                // There is no default plan for "PREPARE" -- it must be
+                // handled at a higher level (so that the appropriate
+                // statement can be prepared)
+                return not_impl_err!("Unsupported logical plan: Prepare");
+            }
+            LogicalPlan::Dml(dml) => {
+                // DataFusion is a read-only query engine, but also a library, 
so consumers may implement this
+                return not_impl_err!("Unsupported logical plan: Dml({0})", 
dml.op);
+            }
+            LogicalPlan::Ddl(ddl) => {
+                // There is no default plan for DDl statements --
+                // it must be handled at a higher level (so that
+                // the appropriate table can be registered with
+                // the context)
+                let name = ddl.name();
+                return not_impl_err!("Unsupported logical plan: {name}");
+            }
+            LogicalPlan::Explain(_) => {
+                return internal_err!(
+                    "Unsupported logical plan: Explain must be root of the 
plan"
+                )
+            }
+            LogicalPlan::Distinct(_) => {
+                return internal_err!(
+                    "Unsupported logical plan: Distinct should be replaced to 
Aggregate"
+                )
+            }
+            LogicalPlan::Analyze(_) => {
+                return internal_err!(
+                    "Unsupported logical plan: Analyze must be root of the 
plan"
+                )
+            }
+        };
+        Ok(exec_node)
     }
 
     fn create_grouping_physical_expr(
@@ -1942,6 +2222,59 @@ impl DefaultPhysicalPlanner {
         let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
         Ok(Arc::new(mem_exec))
     }
+
+    fn create_project_physical_exec(
+        &self,
+        session_state: &SessionState,
+        input_exec: Arc<dyn ExecutionPlan>,
+        input: &Arc<LogicalPlan>,
+        expr: &[Expr],
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let input_schema = input.as_ref().schema();
+
+        let physical_exprs = expr
+            .iter()
+            .map(|e| {
+                // For projections, SQL planner and logical plan builder may 
convert user
+                // provided expressions into logical Column expressions if 
their results
+                // are already provided from the input plans. Because we work 
with
+                // qualified columns in logical plane, derived columns involve 
operators or
+                // functions will contain qualifiers as well. This will result 
in logical
+                // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
+                //
+                // If we run these logical columns through physical_name 
function, we will
+                // get physical names with column qualifiers, which violates 
DataFusion's
+                // field name semantics. To account for this, we need to 
derive the
+                // physical name from physical input instead.
+                //
+                // This depends on the invariant that logical schema field 
index MUST match
+                // with physical schema field index.
+                let physical_name = if let Expr::Column(col) = e {
+                    match input_schema.index_of_column(col) {
+                        Ok(idx) => {
+                            // index physical field using logical field index
+                            
Ok(input_exec.schema().field(idx).name().to_string())
+                        }
+                        // logical column is not a derived column, safe to 
pass along to
+                        // physical_name
+                        Err(_) => physical_name(e),
+                    }
+                } else {
+                    physical_name(e)
+                };
+
+                tuple_err((
+                    self.create_physical_expr(e, input_schema, session_state),
+                    physical_name,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Arc::new(ProjectionExec::try_new(
+            physical_exprs,
+            input_exec,
+        )?))
+    }
 }
 
 fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
@@ -1953,54 +2286,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> 
Result<(T, R)> {
     }
 }
 
-/// Adding physical projection to join if has expression equijoin keys.
-fn project_expr_join_keys(
-    keys: &[(Expr, Expr)],
-    left: &Arc<LogicalPlan>,
-    right: &Arc<LogicalPlan>,
-    logical_plan: &LogicalPlan,
-    join_schema: &Arc<DFSchema>,
-) -> Result<LogicalPlan> {
-    let left_keys = keys.iter().map(|(l, _r)| l).cloned().collect::<Vec<_>>();
-    let right_keys = keys.iter().map(|(_l, r)| r).cloned().collect::<Vec<_>>();
-    let (left, right, column_on, added_project) = {
-        let (left, left_col_keys, left_projected) =
-            wrap_projection_for_join_if_necessary(
-                left_keys.as_slice(),
-                left.as_ref().clone(),
-            )?;
-        let (right, right_col_keys, right_projected) =
-            wrap_projection_for_join_if_necessary(&right_keys, 
right.as_ref().clone())?;
-        (
-            left,
-            right,
-            (left_col_keys, right_col_keys),
-            left_projected || right_projected,
-        )
-    };
-
-    let join_plan = LogicalPlan::Join(Join::try_new_with_project_input(
-        logical_plan,
-        Arc::new(left),
-        Arc::new(right),
-        column_on,
-    )?);
-
-    // Remove temporary projected columns
-    if added_project {
-        let final_join_result = join_schema
-            .iter()
-            .map(|(qualifier, field)| {
-                Expr::Column(datafusion_common::Column::from((qualifier, 
field.as_ref())))
-            })
-            .collect::<Vec<_>>();
-        let projection = Projection::try_new(final_join_result, 
Arc::new(join_plan))?;
-        Ok(LogicalPlan::Projection(projection))
-    } else {
-        Ok(join_plan)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::any::Any;
diff --git a/datafusion/core/tests/tpcds_planning.rs 
b/datafusion/core/tests/tpcds_planning.rs
index a4a85c6bd1..237771248f 100644
--- a/datafusion/core/tests/tpcds_planning.rs
+++ b/datafusion/core/tests/tpcds_planning.rs
@@ -846,7 +846,6 @@ async fn tpcds_physical_q63() -> Result<()> {
     create_physical_plan(63).await
 }
 
-#[ignore] // thread 'q64' has overflowed its stack
 #[tokio::test]
 async fn tpcds_physical_q64() -> Result<()> {
     create_physical_plan(64).await


Reply via email to