rdettai commented on a change in pull request #1013:
URL: https://github.com/apache/arrow-datafusion/pull/1013#discussion_r711015638



##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -296,520 +301,536 @@ impl DefaultPhysicalPlanner {
     }
 
     /// Create a physical plan from a logical plan
-    fn create_initial_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let batch_size = ctx_state.config.batch_size;
-
-        match logical_plan {
-            LogicalPlan::TableScan {
-                source,
-                projection,
-                filters,
-                limit,
-                ..
-            } => {
-                // Remove all qualifiers from the scan as the provider
-                // doesn't know (nor should care) how the relation was
-                // referred to in the query
-                let filters = unnormalize_cols(filters.iter().cloned());
-                source.scan(projection, batch_size, &filters, *limit)
-            }
-            LogicalPlan::Window {
-                input, window_expr, ..
-            } => {
-                if window_expr.is_empty() {
-                    return Err(DataFusionError::Internal(
-                        "Impossibly got empty window expression".to_owned(),
-                    ));
+    fn create_initial_plan<'a>(
+        &'a self,
+        logical_plan: &'a LogicalPlan,
+        ctx_state: &'a ExecutionContextState,
+    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+        async move {
+            let batch_size = ctx_state.config.batch_size;
+
+            // TODO make this configurable
+            let parallelism = 4;
+
+            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan 
{
+                LogicalPlan::TableScan {
+                    source,
+                    projection,
+                    filters,
+                    limit,
+                    ..
+                } => {
+                    // Remove all qualifiers from the scan as the provider
+                    // doesn't know (nor should care) how the relation was
+                    // referred to in the query
+                    let filters = unnormalize_cols(filters.iter().cloned());
+                    source.scan(projection, batch_size, &filters, *limit).await
                 }
+                LogicalPlan::Window {
+                    input, window_expr, ..
+                } => {
+                    if window_expr.is_empty() {
+                        return Err(DataFusionError::Internal(
+                            "Impossibly got empty window 
expression".to_owned(),
+                        ));
+                    }
+
+                    let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
 
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
+                    // at this moment we are guaranteed by the logical planner
+                    // to have all the window_expr to have equal sort key
+                    let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
 
-                // at this moment we are guaranteed by the logical planner
-                // to have all the window_expr to have equal sort key
-                let partition_keys = 
window_expr_common_partition_keys(window_expr)?;
+                    let can_repartition = !partition_keys.is_empty()
+                        && ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_windows;
+
+                    let input_exec = if can_repartition {
+                        let partition_keys = partition_keys
+                            .iter()
+                            .map(|e| {
+                                self.create_physical_expr(
+                                    e,
+                                    input.schema(),
+                                    &input_exec.schema(),
+                                    ctx_state,
+                                )
+                            })
+                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+                        Arc::new(RepartitionExec::try_new(
+                            input_exec,
+                            Partitioning::Hash(
+                                partition_keys,
+                                ctx_state.config.target_partitions,
+                            ),
+                        )?)
+                    } else {
+                        input_exec
+                    };
+
+                    // add a sort phase
+                    let get_sort_keys = |expr: &Expr| match expr {
+                        Expr::WindowFunction {
+                            ref partition_by,
+                            ref order_by,
+                            ..
+                        } => generate_sort_key(partition_by, order_by),
+                        _ => unreachable!(),
+                    };
+                    let sort_keys = get_sort_keys(&window_expr[0]);
+                    if window_expr.len() > 1 {
+                        debug_assert!(
+                            window_expr[1..]
+                                .iter()
+                                .all(|expr| get_sort_keys(expr) == sort_keys),
+                            "all window expressions shall have the same sort 
keys, as guaranteed by logical planning"
+                        );
+                    }
 
-                let can_repartition = !partition_keys.is_empty()
-                    && ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_windows;
+                    let logical_input_schema = input.schema();
 
-                let input_exec = if can_repartition {
-                    let partition_keys = partition_keys
+                    let input_exec = if sort_keys.is_empty() {
+                        input_exec
+                    } else {
+                        let physical_input_schema = input_exec.schema();
+                        let sort_keys = sort_keys
+                            .iter()
+                            .map(|e| match e {
+                                Expr::Sort {
+                                    expr,
+                                    asc,
+                                    nulls_first,
+                                } => self.create_physical_sort_expr(
+                                    expr,
+                                    logical_input_schema,
+                                    &physical_input_schema,
+                                    SortOptions {
+                                        descending: !*asc,
+                                        nulls_first: *nulls_first,
+                                    },
+                                    ctx_state,
+                                ),
+                                _ => unreachable!(),
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+                        Arc::new(if can_repartition {
+                            SortExec::new_with_partitioning(sort_keys, 
input_exec, true)
+                        } else {
+                            SortExec::try_new(sort_keys, input_exec)?
+                        })
+                    };
+
+                    let physical_input_schema = input_exec.schema();
+                    let window_expr = window_expr
                         .iter()
                         .map(|e| {
-                            self.create_physical_expr(
+                            self.create_window_expr(
                                 e,
-                                input.schema(),
-                                &input_exec.schema(),
+                                logical_input_schema,
+                                &physical_input_schema,
                                 ctx_state,
                             )
                         })
-                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
-                    Arc::new(RepartitionExec::try_new(
-                        input_exec,
-                        Partitioning::Hash(
-                            partition_keys,
-                            ctx_state.config.target_partitions,
-                        ),
-                    )?)
-                } else {
-                    input_exec
-                };
+                        .collect::<Result<Vec<_>>>()?;
 
-                // add a sort phase
-                let get_sort_keys = |expr: &Expr| match expr {
-                    Expr::WindowFunction {
-                        ref partition_by,
-                        ref order_by,
-                        ..
-                    } => generate_sort_key(partition_by, order_by),
-                    _ => unreachable!(),
-                };
-                let sort_keys = get_sort_keys(&window_expr[0]);
-                if window_expr.len() > 1 {
-                    debug_assert!(
-                        window_expr[1..]
-                            .iter()
-                            .all(|expr| get_sort_keys(expr) == sort_keys),
-                        "all window expressions shall have the same sort keys, 
as guaranteed by logical planning"
-                    );
+                    Ok(Arc::new(WindowAggExec::try_new(
+                        window_expr,
+                        input_exec,
+                        physical_input_schema,
+                    )?) )
                 }
-
-                let logical_input_schema = input.schema();
-
-                let input_exec = if sort_keys.is_empty() {
-                    input_exec
-                } else {
+                LogicalPlan::Aggregate {
+                    input,
+                    group_expr,
+                    aggr_expr,
+                    ..
+                } => {
+                    // Initially need to perform the aggregate and then merge 
the partitions
+                    let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
                     let physical_input_schema = input_exec.schema();
-                    let sort_keys = sort_keys
+                    let logical_input_schema = input.as_ref().schema();
+
+                    let groups = group_expr
                         .iter()
-                        .map(|e| match e {
-                            Expr::Sort {
-                                expr,
-                                asc,
-                                nulls_first,
-                            } => self.create_physical_sort_expr(
-                                expr,
+                        .map(|e| {
+                            tuple_err((
+                                self.create_physical_expr(
+                                    e,
+                                    logical_input_schema,
+                                    &physical_input_schema,
+                                    ctx_state,
+                                ),
+                                physical_name(e),
+                            ))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    let aggregates = aggr_expr
+                        .iter()
+                        .map(|e| {
+                            self.create_aggregate_expr(
+                                e,
                                 logical_input_schema,
                                 &physical_input_schema,
-                                SortOptions {
-                                    descending: !*asc,
-                                    nulls_first: *nulls_first,
-                                },
                                 ctx_state,
-                            ),
-                            _ => unreachable!(),
+                            )
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Arc::new(if can_repartition {
-                        SortExec::new_with_partitioning(sort_keys, input_exec, 
true)
-                    } else {
-                        SortExec::try_new(sort_keys, input_exec)?
-                    })
-                };
 
-                let physical_input_schema = input_exec.schema();
-                let window_expr = window_expr
-                    .iter()
-                    .map(|e| {
-                        self.create_window_expr(
-                            e,
-                            logical_input_schema,
-                            &physical_input_schema,
-                            ctx_state,
-                        )
-                    })
-                    .collect::<Result<Vec<_>>>()?;
+                    let initial_aggr = Arc::new(HashAggregateExec::try_new(
+                        AggregateMode::Partial,
+                        groups.clone(),
+                        aggregates.clone(),
+                        input_exec,
+                        physical_input_schema.clone(),
+                    )?);
 
-                Ok(Arc::new(WindowAggExec::try_new(
-                    window_expr,
-                    input_exec,
-                    physical_input_schema,
-                )?))
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => {
-                // Initially need to perform the aggregate and then merge the 
partitions
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let physical_input_schema = input_exec.schema();
-                let logical_input_schema = input.as_ref().schema();
+                    // update group column indices based on partial aggregate 
plan evaluation
+                    let final_group: Vec<Arc<dyn PhysicalExpr>> = 
(0..groups.len())
+                        .map(|i| col(&groups[i].1, &initial_aggr.schema()))
+                        .collect::<Result<_>>()?;
 
-                let groups = group_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                logical_input_schema,
-                                &physical_input_schema,
-                                ctx_state,
+                    // TODO: dictionary type not yet supported in Hash 
Repartition
+                    let contains_dict = groups
+                        .iter()
+                        .flat_map(|x| 
x.0.data_type(physical_input_schema.as_ref()))
+                        .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
+                    let can_repartition = !groups.is_empty()
+                        && ctx_state.config.target_partitions > 1
+                        && ctx_state.config.repartition_aggregations
+                        && !contains_dict;
+
+                    let (initial_aggr, next_partition_mode): (
+                        Arc<dyn ExecutionPlan>,
+                        AggregateMode,
+                    ) = if can_repartition {
+                        // Divide partial hash aggregates into multiple 
partitions by hash key
+                        let hash_repartition = 
Arc::new(RepartitionExec::try_new(
+                            initial_aggr,
+                            Partitioning::Hash(
+                                final_group.clone(),
+                                ctx_state.config.target_partitions,
                             ),
-                            physical_name(e),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                let aggregates = aggr_expr
-                    .iter()
-                    .map(|e| {
-                        self.create_aggregate_expr(
-                            e,
-                            logical_input_schema,
-                            &physical_input_schema,
-                            ctx_state,
-                        )
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let initial_aggr = Arc::new(HashAggregateExec::try_new(
-                    AggregateMode::Partial,
-                    groups.clone(),
-                    aggregates.clone(),
-                    input_exec,
-                    physical_input_schema.clone(),
-                )?);
-
-                // update group column indices based on partial aggregate plan 
evaluation
-                let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
-                    .map(|i| col(&groups[i].1, &initial_aggr.schema()))
-                    .collect::<Result<_>>()?;
-
-                // TODO: dictionary type not yet supported in Hash Repartition
-                let contains_dict = groups
-                    .iter()
-                    .flat_map(|x| 
x.0.data_type(physical_input_schema.as_ref()))
-                    .any(|x| matches!(x, DataType::Dictionary(_, _)));
-
-                let can_repartition = !groups.is_empty()
-                    && ctx_state.config.target_partitions > 1
-                    && ctx_state.config.repartition_aggregations
-                    && !contains_dict;
-
-                let (initial_aggr, next_partition_mode): (
-                    Arc<dyn ExecutionPlan>,
-                    AggregateMode,
-                ) = if can_repartition {
-                    // Divide partial hash aggregates into multiple partitions 
by hash key
-                    let hash_repartition = Arc::new(RepartitionExec::try_new(
+                        )?);
+                        // Combine hash aggregates within the partition
+                        (hash_repartition, AggregateMode::FinalPartitioned)
+                    } else {
+                        // construct a second aggregation, keeping the final 
column name equal to the
+                        // first aggregation and the expressions corresponding 
to the respective aggregate
+                        (initial_aggr, AggregateMode::Final)
+                    };
+
+                    Ok(Arc::new(HashAggregateExec::try_new(
+                        next_partition_mode,
+                        final_group
+                            .iter()
+                            .enumerate()
+                            .map(|(i, expr)| (expr.clone(), 
groups[i].1.clone()))
+                            .collect(),
+                        aggregates,
                         initial_aggr,
-                        Partitioning::Hash(
-                            final_group.clone(),
-                            ctx_state.config.target_partitions,
-                        ),
-                    )?);
-                    // Combine hash aggregates within the partition
-                    (hash_repartition, AggregateMode::FinalPartitioned)
-                } else {
-                    // construct a second aggregation, keeping the final 
column name equal to the
-                    // first aggregation and the expressions corresponding to 
the respective aggregate
-                    (initial_aggr, AggregateMode::Final)
-                };
+                        physical_input_schema.clone(),
+                    )?) )
+                }
+                LogicalPlan::Projection { input, expr, .. } => {
+                    let input_exec = self.create_initial_plan(input, 
ctx_state).await?;
+                    let input_schema = input.as_ref().schema();
 
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    next_partition_mode,
-                    final_group
+                    let physical_exprs = expr
                         .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    aggregates,
-                    initial_aggr,
-                    physical_input_schema.clone(),
-                )?))
-            }
-            LogicalPlan::Projection { input, expr, .. } => {
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
-                let input_schema = input.as_ref().schema();
-
-                let physical_exprs = expr
-                    .iter()
-                    .map(|e| {
-                        // For projections, SQL planner and logical plan 
builder may convert user
-                        // provided expressions into logical Column 
expressions if their results
-                        // are already provided from the input plans. Because 
we work with
-                        // qualified columns in logical plane, derived columns 
involve operators or
-                        // functions will contain qualifers as well. This will 
result in logical
-                        // columns with names like `SUM(t1.c1)`, `t1.c1 + 
t1.c2`, etc.
-                        //
-                        // If we run these logical columns through 
physical_name function, we will
-                        // get physical names with column qualifiers, which 
violates Datafusion's
-                        // field name semantics. To account for this, we need 
to derive the
-                        // physical name from physical input instead.
-                        //
-                        // This depends on the invariant that logical schema 
field index MUST match
-                        // with physical schema field index.
-                        let physical_name = if let Expr::Column(col) = e {
-                            match input_schema.index_of_column(col) {
-                                Ok(idx) => {
-                                    // index physical field using logical 
field index
-                                    
Ok(input_exec.schema().field(idx).name().to_string())
+                        .map(|e| {
+                            // For projections, SQL planner and logical plan 
builder may convert user
+                            // provided expressions into logical Column 
expressions if their results
+                            // are already provided from the input plans. 
Because we work with
+                            // qualified columns in logical plane, derived 
columns involve operators or
+                            // functions will contain qualifers as well. This 
will result in logical
+                            // columns with names like `SUM(t1.c1)`, `t1.c1 + 
t1.c2`, etc.
+                            //
+                            // If we run these logical columns through 
physical_name function, we will
+                            // get physical names with column qualifiers, 
which violates Datafusion's
+                            // field name semantics. To account for this, we 
need to derive the
+                            // physical name from physical input instead.
+                            //
+                            // This depends on the invariant that logical 
schema field index MUST match
+                            // with physical schema field index.
+                            let physical_name = if let Expr::Column(col) = e {
+                                match input_schema.index_of_column(col) {
+                                    Ok(idx) => {
+                                        // index physical field using logical 
field index
+                                        
Ok(input_exec.schema().field(idx).name().to_string())
+                                    }
+                                    // logical column is not a derived column, 
safe to pass along to
+                                    // physical_name
+                                    Err(_) => physical_name(e),
                                 }
-                                // logical column is not a derived column, 
safe to pass along to
-                                // physical_name
-                                Err(_) => physical_name(e),
-                            }
-                        } else {
-                            physical_name(e)
-                        };
+                            } else {
+                                physical_name(e)
+                            };
 
-                        tuple_err((
-                            self.create_physical_expr(
-                                e,
-                                input_schema,
-                                &input_exec.schema(),
-                                ctx_state,
-                            ),
-                            physical_name,
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(ProjectionExec::try_new(
-                    physical_exprs,
-                    input_exec,
-                )?))
-            }
-            LogicalPlan::Filter {
-                input, predicate, ..
-            } => {
-                let physical_input = self.create_initial_plan(input, 
ctx_state)?;
-                let input_schema = physical_input.as_ref().schema();
-                let input_dfschema = input.as_ref().schema();
-                let runtime_expr = self.create_physical_expr(
-                    predicate,
-                    input_dfschema,
-                    &input_schema,
-                    ctx_state,
-                )?;
-                Ok(Arc::new(FilterExec::try_new(runtime_expr, 
physical_input)?))
-            }
-            LogicalPlan::Union { inputs, .. } => {
-                let physical_plans = inputs
-                    .iter()
-                    .map(|input| self.create_initial_plan(input, ctx_state))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(UnionExec::new(physical_plans)))
-            }
-            LogicalPlan::Repartition {
-                input,
-                partitioning_scheme,
-            } => {
-                let physical_input = self.create_initial_plan(input, 
ctx_state)?;
-                let input_schema = physical_input.schema();
-                let input_dfschema = input.as_ref().schema();
-                let physical_partitioning = match partitioning_scheme {
-                    LogicalPartitioning::RoundRobinBatch(n) => {
-                        Partitioning::RoundRobinBatch(*n)
-                    }
-                    LogicalPartitioning::Hash(expr, n) => {
-                        let runtime_expr = expr
-                            .iter()
-                            .map(|e| {
+                            tuple_err((
                                 self.create_physical_expr(
                                     e,
-                                    input_dfschema,
-                                    &input_schema,
+                                    input_schema,
+                                    &input_exec.schema(),
                                     ctx_state,
-                                )
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        Partitioning::Hash(runtime_expr, *n)
-                    }
-                };
-                Ok(Arc::new(RepartitionExec::try_new(
-                    physical_input,
-                    physical_partitioning,
-                )?))
-            }
-            LogicalPlan::Sort { expr, input, .. } => {
-                let physical_input = self.create_initial_plan(input, 
ctx_state)?;
-                let input_schema = physical_input.as_ref().schema();
-                let input_dfschema = input.as_ref().schema();
+                                ),
+                                physical_name,
+                            ))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
 
-                let sort_expr = expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort {
-                            expr,
-                            asc,
-                            nulls_first,
-                        } => self.create_physical_sort_expr(
-                            expr,
-                            input_dfschema,
-                            &input_schema,
-                            SortOptions {
-                                descending: !*asc,
-                                nulls_first: *nulls_first,
-                            },
-                            ctx_state,
-                        ),
-                        _ => Err(DataFusionError::Plan(
-                            "Sort only accepts sort expressions".to_string(),
-                        )),
-                    })
-                    .collect::<Result<Vec<_>>>()?;
+                    Ok(Arc::new(ProjectionExec::try_new(
+                        physical_exprs,
+                        input_exec,
+                    )?) )
+                }
+                LogicalPlan::Filter {
+                    input, predicate, ..
+                } => {
+                    let physical_input = self.create_initial_plan(input, 
ctx_state).await?;
+                    let input_schema = physical_input.as_ref().schema();
+                    let input_dfschema = input.as_ref().schema();
+                    let runtime_expr = self.create_physical_expr(
+                        predicate,
+                        input_dfschema,
+                        &input_schema,
+                        ctx_state,
+                    )?;
+                    Ok(Arc::new(FilterExec::try_new(runtime_expr, 
physical_input)?) )
+                }
+                LogicalPlan::Union { inputs, .. } => {
+                    let physical_plan_futures = inputs

Review comment:
       I found it!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to