alamb commented on a change in pull request #1013:
URL: https://github.com/apache/arrow-datafusion/pull/1013#discussion_r710966115



##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -296,520 +301,536 @@ impl DefaultPhysicalPlanner {
     }
 
     /// Create a physical plan from a logical plan
-    fn create_initial_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let batch_size = ctx_state.config.batch_size;
-
-        match logical_plan {
-            LogicalPlan::TableScan {
-                source,
-                projection,
-                filters,
-                limit,
-                ..
-            } => {
-                // Remove all qualifiers from the scan as the provider
-                // doesn't know (nor should care) how the relation was
-                // referred to in the query
-                let filters = unnormalize_cols(filters.iter().cloned());
-                source.scan(projection, batch_size, &filters, *limit)
-            }
-            LogicalPlan::Window {
-                input, window_expr, ..
-            } => {
-                if window_expr.is_empty() {
-                    return Err(DataFusionError::Internal(
-                        "Impossibly got empty window expression".to_owned(),
-                    ));
+    fn create_initial_plan<'a>(
+        &'a self,
+        logical_plan: &'a LogicalPlan,
+        ctx_state: &'a ExecutionContextState,
+    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+        async move {
+            let batch_size = ctx_state.config.batch_size;
+
+            // TODO make this configurable
+            let parallelism = 4;

Review comment:
       looks like a left-over TODO -- I recommend removing this change to how 
parallelism works from this PR and making it separately (I think it is related 
to the changes to Union below)

##########
File path: datafusion/src/execution/context.rs
##########
@@ -532,17 +533,27 @@ impl ExecutionContext {
     }
 
     /// Creates a physical plan from a logical plan.
-    pub fn create_physical_plan(
+    pub async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut state = self.state.lock().unwrap();
-        state.execution_props.start_execution();
+        let (state, planner) = {
+            let mut state = self.state.lock().unwrap();
+            state.execution_props.start_execution();
+
+            // We need to clone `state` to release the lock that is not 
`Send`. We could
+            // make the lock `Send` by using `tokio::sync::Mutex`, but that 
would require to
+            // propagate async even to the `LogicalPlan` building methods.
+            // Cloning `state` here is fine as we then pass it as immutable 
`&state`, which
+            // means that we avoid write consistency issues as the cloned 
version will not
+            // be written to. As for eventual modifications that would be 
applied to the
+            // original state after it has been cloned, they will not be 
picked up by the
+            // clone but that is okay, as it is equivalent to postponing the 
state update
+            // by keeping the lock until the end of the function scope.
+            (state.clone(), Arc::clone(&state.config.query_planner))
+        };
 
-        state
-            .config
-            .query_planner
-            .create_physical_plan(logical_plan, &state)
+        planner.create_physical_plan(logical_plan, &state).await

Review comment:
       I think the only potential problem is if the same logical plan gets 
translated into two different physical plans with the same context 
simultaneously, which doesn't seem like a valid use case to me.

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -296,520 +301,536 @@ impl DefaultPhysicalPlanner {
     }
 
     /// Create a physical plan from a logical plan
-    fn create_initial_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let batch_size = ctx_state.config.batch_size;
-
-        match logical_plan {
-            LogicalPlan::TableScan {
-                source,
-                projection,
-                filters,
-                limit,
-                ..
-            } => {
-                // Remove all qualifiers from the scan as the provider
-                // doesn't know (nor should care) how the relation was
-                // referred to in the query
-                let filters = unnormalize_cols(filters.iter().cloned());
-                source.scan(projection, batch_size, &filters, *limit)
-            }
-            LogicalPlan::Window {
-                input, window_expr, ..
-            } => {
-                if window_expr.is_empty() {
-                    return Err(DataFusionError::Internal(
-                        "Impossibly got empty window expression".to_owned(),
-                    ));
+    fn create_initial_plan<'a>(
+        &'a self,
+        logical_plan: &'a LogicalPlan,
+        ctx_state: &'a ExecutionContextState,
+    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {

Review comment:
       why can't this function signature be made `async fn 
create_initial_plan...`? Why does it need a boxed future?
   
   FWIW I found it easier to understand the diffs with whitespace changes 
hidden: 

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -296,520 +301,536 @@ impl DefaultPhysicalPlanner {
     }
 
     /// Create a physical plan from a logical plan
-    fn create_initial_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let batch_size = ctx_state.config.batch_size;
-
-        match logical_plan {
-            LogicalPlan::TableScan {
-                source,
-                projection,
-                filters,
-                limit,
-                ..
-            } => {
-                // Remove all qualifiers from the scan as the provider
-                // doesn't know (nor should care) how the relation was
-                // referred to in the query
-                let filters = unnormalize_cols(filters.iter().cloned());
-                source.scan(projection, batch_size, &filters, *limit)
-            }
-            LogicalPlan::Window {
-                input, window_expr, ..
-            } => {
-                if window_expr.is_empty() {
-                    return Err(DataFusionError::Internal(
-                        "Impossibly got empty window expression".to_owned(),
-                    ));
+    fn create_initial_plan<'a>(
+        &'a self,
+        logical_plan: &'a LogicalPlan,
+        ctx_state: &'a ExecutionContextState,
+    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+        async move {
+            let batch_size = ctx_state.config.batch_size;
+
+            // TODO make this configurable
+            let parallelism = 4;
+
+            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
+                LogicalPlan::TableScan {
+                    source,
+                    projection,
+                    filters,
+                    limit,
+                    ..
+                } => {
+                    // Remove all qualifiers from the scan as the provider
+                    // doesn't know (nor should care) how the relation was
+                    // referred to in the query
+                    let filters = unnormalize_cols(filters.iter().cloned());
+                    source.scan(projection, batch_size, &filters, *limit).await
                 }
+                LogicalPlan::Window {
+                    input, window_expr, ..
+                } => {
+                    if window_expr.is_empty() {
+                        return Err(DataFusionError::Internal(
+                            "Impossibly got empty window 
expression".to_owned(),
+                        ));
+                    }
+
+                    let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
 
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
+                    // at this moment we are guaranteed by the logical planner
+                    // to have all the window_expr to have equal sort key
+                    let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
 
-                // at this moment we are guaranteed by the logical planner
-                // to have all the window_expr to have equal sort key
-                let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
+                    let can_repartition = !partition_keys.is_empty()
+                        && ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_windows;
+
+                    let input_exec = if can_repartition {
+                        let partition_keys = partition_keys
+                            .iter()
+                            .map(|e| {
+                                self.create_physical_expr(
+                                    e,
+                                    input.schema(),
+                                    &input_exec.schema(),
+                                    ctx_state,
+                                )
+                            })
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+                        Arc::new(RepartitionExec::try_new(
+                            input_exec,
+                            Partitioning::Hash(
+                                partition_keys,
+                                ctx_state.config.target_partitions,
+                            ),
+                        )?)
+                    } else {
+                        input_exec
+                    };
+
+                    // add a sort phase
+                    let get_sort_keys = |expr: &Expr| match expr {
+                        Expr::WindowFunction {
+                            ref partition_by,
+                            ref order_by,
+                            ..
+                        } => generate_sort_key(partition_by, order_by),
+                        _ => unreachable!(),
+                    };
+                    let sort_keys = get_sort_keys(&window_expr[0]);
+                    if window_expr.len() > 1 {
+                        debug_assert!(
+                            window_expr[1..]
+                                .iter()
+                                .all(|expr| get_sort_keys(expr) == sort_keys),
+                            "all window expressions shall have the same sort 
keys, as guaranteed by logical planning"
+                        );
+                    }
 
-                let can_repartition = !partition_keys.is_empty()
-                    && ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_windows;
+                    let logical_input_schema = input.schema();
 
-                let input_exec = if can_repartition {
-                    let partition_keys = partition_keys
+                    let input_exec = if sort_keys.is_empty() {
+                        input_exec
+                    } else {
+                        let physical_input_schema = input_exec.schema();
+                        let sort_keys = sort_keys
+                            .iter()
+                            .map(|e| match e {
+                                Expr::Sort {
+                                    expr,
+                                    asc,
+                                    nulls_first,
+                                } => self.create_physical_sort_expr(
+                                    expr,
+                                    logical_input_schema,
+                                    &physical_input_schema,
+                                    SortOptions {
+                                        descending: !*asc,
+                                        nulls_first: *nulls_first,
+                                    },
+                                    ctx_state,
+                                ),
+                                _ => unreachable!(),
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        Arc::new(if can_repartition {
+                            SortExec::new_with_partitioning(sort_keys, 
input_exec, true)
+                        } else {
+                            SortExec::try_new(sort_keys, input_exec)?
+                        })
+                    };
+
+                    let physical_input_schema = input_exec.schema();
+                    let window_expr = window_expr
                         .iter()
                         .map(|e| {
-                            self.create_physical_expr(
+                            self.create_window_expr(
                                 e,
-                                input.schema(),
-                                &input_exec.schema(),
+                                logical_input_schema,
+                                &physical_input_schema,
                                 ctx_state,
                             )
                         })
-                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
-                    Arc::new(RepartitionExec::try_new(
-                        input_exec,
-                        Partitioning::Hash(
-                            partition_keys,
-                            ctx_state.config.target_partitions,
-                        ),
-                    )?)
-                } else {
-                    input_exec
-                };
+                        .collect::<Result<Vec<_>>>()?;
 
-                // add a sort phase
-                let get_sort_keys = |expr: &Expr| match expr {
-                    Expr::WindowFunction {
-                        ref partition_by,
-                        ref order_by,
-                        ..
-                    } => generate_sort_key(partition_by, order_by),
-                    _ => unreachable!(),
-                };
-                let sort_keys = get_sort_keys(&window_expr[0]);
-                if window_expr.len() > 1 {
-                    debug_assert!(
-                        window_expr[1..]
-                            .iter()
-                            .all(|expr| get_sort_keys(expr) == sort_keys),
-                        "all window expressions shall have the same sort keys, 
as guaranteed by logical planning"
-                    );
+                    Ok(Arc::new(WindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_input_schema,
+                    )?) )
                 }
-
-                let logical_input_schema = input.schema();
-
-                let input_exec = if sort_keys.is_empty() {
-                    input_exec
-                } else {
+                LogicalPlan::Aggregate {
+                    input,
+                    group_expr,
+                    aggr_expr,
+                    ..
+                } => {
+                    // Initially need to perform the aggregate and then merge 
the partitions
+                    let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
                     let physical_input_schema = input_exec.schema();
-                    let sort_keys = sort_keys
+                    let logical_input_schema = input.as_ref().schema();
+
+                    let groups = group_expr
                         .iter()
-                        .map(|e| match e {
-                            Expr::Sort {
-                                expr,
-                                asc,
-                                nulls_first,
-                            } => self.create_physical_sort_expr(
-                                expr,
+                        .map(|e| {
+                            tuple_err((
+                                self.create_physical_expr(
+                                    e,
+                                    logical_input_schema,
+                                    &physical_input_schema,
+                                    ctx_state,
+                                ),
+                                physical_name(e),
+                            ))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    let aggregates = aggr_expr
+                        .iter()
+                        .map(|e| {
+                            self.create_aggregate_expr(
+                                e,
                                 logical_input_schema,
                                 &physical_input_schema,
-                                SortOptions {
-                                    descending: !*asc,
-                                    nulls_first: *nulls_first,
-                                },
                                 ctx_state,
-                            ),
-                            _ => unreachable!(),
+                            )
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Arc::new(if can_repartition {
-                        SortExec::new_with_partitioning(sort_keys, input_exec, 
true)
-                    } else {
-                        SortExec::try_new(sort_keys, input_exec)?
-                    })
-                };
 
-                let physical_input_schema = input_exec.schema();
-                let window_expr = window_expr
-                    .iter()
-                    .map(|e| {
-                        self.create_window_expr(
-                            e,
-                            logical_input_schema,
-                            &physical_input_schema,
-                            ctx_state,
-                        )
-                    })
-                    .collect::<Result<Vec<_>>>()?;
+                    let initial_aggr = Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::Partial,
+                        groups.clone(),
+                        aggregates.clone(),
+                        input_exec,
+                        physical_input_schema.clone(),
+                    )?);
 
-                Ok(Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    input_exec,
-                    physical_input_schema,
-                )?))
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => {
-                // Initially need to perform the aggregate and then merge the 
partitions
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let physical_input_schema = input_exec.schema();
-                let logical_input_schema = input.as_ref().schema();
+                    // update group column indices based on partial aggregate 
plan evaluation
+                    let final_group: Vec<Arc<dyn PhysicalExpr>> = 
(0..groups.len())
+                        .map(|i| col(&groups[i].1, &initial_aggr.schema()))
+                        .collect::<Result<_>>()?;
 
-                let groups = group_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                logical_input_schema,
-                                &physical_input_schema,
-                                ctx_state,
+                    // TODO: dictionary type not yet supported in Hash 
Repartition
+                    let contains_dict = groups
+                        .iter()
+                        .flat_map(|x| 
x.0.data_type(physical_input_schema.as_ref()))
+                        .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
+                    let can_repartition = !groups.is_empty()
+                        && ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_aggregations
+                        && !contains_dict;
+
+                    let (initial_aggr, next_partition_mode): (
+                        Arc<dyn ExecutionPlan>,
+                        AggregateMode,
+                    ) = if can_repartition {
+                        // Divide partial hash aggregates into multiple 
partitions by hash key
+                        let hash_repartition = 
Arc::new(RepartitionExec::try_new(
+                            initial_aggr,
+                            Partitioning::Hash(
+                                final_group.clone(),
+                                ctx_state.config.target_partitions,
                             ),
-                            physical_name(e),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                let aggregates = aggr_expr
-                    .iter()
-                    .map(|e| {
-                        self.create_aggregate_expr(
-                            e,
-                            logical_input_schema,
-                            &physical_input_schema,
-                            ctx_state,
-                        )
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let initial_aggr = Arc::new(HashAggregateExec::try_new(
-                    AggregateMode::Partial,
-                    groups.clone(),
-                    aggregates.clone(),
-                    input_exec,
-                    physical_input_schema.clone(),
-                )?);
-
-                // update group column indices based on partial aggregate plan 
evaluation
-                let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
-                    .map(|i| col(&groups[i].1, &initial_aggr.schema()))
-                    .collect::<Result<_>>()?;
-
-                // TODO: dictionary type not yet supported in Hash Repartition
-                let contains_dict = groups
-                    .iter()
-                    .flat_map(|x| 
x.0.data_type(physical_input_schema.as_ref()))
-                    .any(|x| matches!(x, DataType::Dictionary(_, _)));
-
-                let can_repartition = !groups.is_empty()
-                    && ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_aggregations
-                    && !contains_dict;
-
-                let (initial_aggr, next_partition_mode): (
-                    Arc<dyn ExecutionPlan>,
-                    AggregateMode,
-                ) = if can_repartition {
-                    // Divide partial hash aggregates into multiple partitions 
by hash key
-                    let hash_repartition = Arc::new(RepartitionExec::try_new(
+                        )?);
+                        // Combine hash aggregates within the partition
+                        (hash_repartition, AggregateMode::FinalPartitioned)
+                    } else {
+                        // construct a second aggregation, keeping the final 
column name equal to the
+                        // first aggregation and the expressions corresponding 
to the respective aggregate
+                        (initial_aggr, AggregateMode::Final)
+                    };
+
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        next_partition_mode,
+                        final_group
+                            .iter()
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), 
groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
                         initial_aggr,
-                        Partitioning::Hash(
-                            final_group.clone(),
-                            ctx_state.config.target_partitions,
-                        ),
-                    )?);
-                    // Combine hash aggregates within the partition
-                    (hash_repartition, AggregateMode::FinalPartitioned)
-                } else {
-                    // construct a second aggregation, keeping the final 
column name equal to the
-                    // first aggregation and the expressions corresponding to 
the respective aggregate
-                    (initial_aggr, AggregateMode::Final)
-                };
+                        physical_input_schema.clone(),
+                    )?) )
+                }
+                LogicalPlan::Projection { input, expr, .. } => {
+                    let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
+                    let input_schema = input.as_ref().schema();
 
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    next_partition_mode,
-                    final_group
+                    let physical_exprs = expr
                         .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    aggregates,
-                    initial_aggr,
-                    physical_input_schema.clone(),
-                )?))
-            }
-            LogicalPlan::Projection { input, expr, .. } => {
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = input.as_ref().schema();
-
-                let physical_exprs = expr
-                    .iter()
-                    .map(|e| {
-                        // For projections, SQL planner and logical plan 
builder may convert user
-                        // provided expressions into logical Column 
expressions if their results
-                        // are already provided from the input plans. Because 
we work with
-                        // qualified columns in logical plane, derived columns 
involve operators or
-                        // functions will contain qualifers as well. This will 
result in logical
-                        // columns with names like `SUM(t1.c1)`, `t1.c1 + 
t1.c2`, etc.
-                        //
-                        // If we run these logical columns through 
physical_name function, we will
-                        // get physical names with column qualifiers, which 
violates Datafusion's
-                        // field name semantics. To account for this, we need 
to derive the
-                        // physical name from physical input instead.
-                        //
-                        // This depends on the invariant that logical schema 
field index MUST match
-                        // with physical schema field index.
-                        let physical_name = if let Expr::Column(col) = e {
-                            match input_schema.index_of_column(col) {
-                                Ok(idx) => {
-                                    // index physical field using logical 
field index
-                                    
Ok(input_exec.schema().field(idx).name().to_string())
+                        .map(|e| {
+                            // For projections, SQL planner and logical plan 
builder may convert user
+                            // provided expressions into logical Column 
expressions if their results
+                            // are already provided from the input plans. 
Because we work with
+                            // qualified columns in logical plane, derived 
columns involve operators or
+                            // functions will contain qualifers as well. This 
will result in logical
+                            // columns with names like `SUM(t1.c1)`, `t1.c1 + 
t1.c2`, etc.
+                            //
+                            // If we run these logical columns through 
physical_name function, we will
+                            // get physical names with column qualifiers, 
which violates Datafusion's
+                            // field name semantics. To account for this, we 
need to derive the
+                            // physical name from physical input instead.
+                            //
+                            // This depends on the invariant that logical 
schema field index MUST match
+                            // with physical schema field index.
+                            let physical_name = if let Expr::Column(col) = e {
+                                match input_schema.index_of_column(col) {
+                                    Ok(idx) => {
+                                        // index physical field using logical 
field index
+                                        
Ok(input_exec.schema().field(idx).name().to_string())
+                                    }
+                                    // logical column is not a derived column, 
safe to pass along to
+                                    // physical_name
+                                    Err(_) => physical_name(e),
                                 }
-                                // logical column is not a derived column, 
safe to pass along to
-                                // physical_name
-                                Err(_) => physical_name(e),
-                            }
-                        } else {
-                            physical_name(e)
-                        };
+                            } else {
+                                physical_name(e)
+                            };
 
-                        tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                input_schema,
-                                &input_exec.schema(),
-                                ctx_state,
-                            ),
-                            physical_name,
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(ProjectionExec::try_new(
-                    physical_exprs,
-                    input_exec,
-                )?))
-            }
-            LogicalPlan::Filter {
-                input, predicate, ..
-            } => {
-                let physical_input = self.create_initial_plan(input, 
ctx_state)?;
-                let input_schema = physical_input.as_ref().schema();
-                let input_dfschema = input.as_ref().schema();
-                let runtime_expr = self.create_physical_expr(
-                    predicate,
-                    input_dfschema,
-                    &input_schema,
-                    ctx_state,
-                )?;
-                Ok(Arc::new(FilterExec::try_new(runtime_expr, 
physical_input)?))
-            }
-            LogicalPlan::Union { inputs, .. } => {
-                let physical_plans = inputs
-                    .iter()
-                    .map(|input| self.create_initial_plan(input, ctx_state))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(UnionExec::new(physical_plans)))
-            }
-            LogicalPlan::Repartition {
-                input,
-                partitioning_scheme,
-            } => {
-                let physical_input = self.create_initial_plan(input, 
ctx_state)?;
-                let input_schema = physical_input.schema();
-                let input_dfschema = input.as_ref().schema();
-                let physical_partitioning = match partitioning_scheme {
-                    LogicalPartitioning::RoundRobinBatch(n) => {
-                        Partitioning::RoundRobinBatch(*n)
-                    }
-                    LogicalPartitioning::Hash(expr, n) => {
-                        let runtime_expr = expr
-                            .iter()
-                            .map(|e| {
+                            tuple_err((
                                 self.create_physical_expr(
                                     e,
-                                    input_dfschema,
-                                    &input_schema,
+                                    input_schema,
+                                    &input_exec.schema(),
                                     ctx_state,
-                                )
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        Partitioning::Hash(runtime_expr, *n)
-                    }
-                };
-                Ok(Arc::new(RepartitionExec::try_new(
-                    physical_input,
-                    physical_partitioning,
-                )?))
-            }
-            LogicalPlan::Sort { expr, input, .. } => {
-                let physical_input = self.create_initial_plan(input, 
ctx_state)?;
-                let input_schema = physical_input.as_ref().schema();
-                let input_dfschema = input.as_ref().schema();
+                                ),
+                                physical_name,
+                            ))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
 
-                let sort_expr = expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort {
-                            expr,
-                            asc,
-                            nulls_first,
-                        } => self.create_physical_sort_expr(
-                            expr,
-                            input_dfschema,
-                            &input_schema,
-                            SortOptions {
-                                descending: !*asc,
-                                nulls_first: *nulls_first,
-                            },
-                            ctx_state,
-                        ),
-                        _ => Err(DataFusionError::Plan(
-                            "Sort only accepts sort expressions".to_string(),
-                        )),
-                    })
-                    .collect::<Result<Vec<_>>>()?;
+                    Ok(Arc::new(ProjectionExec::try_new(
+                        physical_exprs,
+                        input_exec,
+                    )?) )
+                }
+                LogicalPlan::Filter {
+                    input, predicate, ..
+                } => {
+                    let physical_input = self.create_initial_plan(input, 
ctx_state).await?;
+                    let input_schema = physical_input.as_ref().schema();
+                    let input_dfschema = input.as_ref().schema();
+                    let runtime_expr = self.create_physical_expr(
+                        predicate,
+                        input_dfschema,
+                        &input_schema,
+                        ctx_state,
+                    )?;
+                    Ok(Arc::new(FilterExec::try_new(runtime_expr, 
physical_input)?) )
+                }
+                LogicalPlan::Union { inputs, .. } => {
+                    let physical_plan_futures = inputs

Review comment:
       I recommend breaking this change to union planning into a separate PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to