rdettai commented on a change in pull request #1013:
URL: https://github.com/apache/arrow-datafusion/pull/1013#discussion_r711000229
##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -296,520 +301,536 @@ impl DefaultPhysicalPlanner {
}
/// Create a physical plan from a logical plan
- fn create_initial_plan(
- &self,
- logical_plan: &LogicalPlan,
- ctx_state: &ExecutionContextState,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let batch_size = ctx_state.config.batch_size;
-
- match logical_plan {
- LogicalPlan::TableScan {
- source,
- projection,
- filters,
- limit,
- ..
- } => {
- // Remove all qualifiers from the scan as the provider
- // doesn't know (nor should care) how the relation was
- // referred to in the query
- let filters = unnormalize_cols(filters.iter().cloned());
- source.scan(projection, batch_size, &filters, *limit)
- }
- LogicalPlan::Window {
- input, window_expr, ..
- } => {
- if window_expr.is_empty() {
- return Err(DataFusionError::Internal(
- "Impossibly got empty window expression".to_owned(),
- ));
+ fn create_initial_plan<'a>(
+ &'a self,
+ logical_plan: &'a LogicalPlan,
+ ctx_state: &'a ExecutionContextState,
+ ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
+ async move {
+ let batch_size = ctx_state.config.batch_size;
+
+ // TODO make this configurable
+ let parallelism = 4;
+
+ let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan
{
+ LogicalPlan::TableScan {
+ source,
+ projection,
+ filters,
+ limit,
+ ..
+ } => {
+ // Remove all qualifiers from the scan as the provider
+ // doesn't know (nor should care) how the relation was
+ // referred to in the query
+ let filters = unnormalize_cols(filters.iter().cloned());
+ source.scan(projection, batch_size, &filters, *limit).await
}
+ LogicalPlan::Window {
+ input, window_expr, ..
+ } => {
+ if window_expr.is_empty() {
+ return Err(DataFusionError::Internal(
+ "Impossibly got empty window
expression".to_owned(),
+ ));
+ }
+
+ let input_exec = self.create_initial_plan(input,
ctx_state).await?;
- let input_exec = self.create_initial_plan(input, ctx_state)?;
+ // at this moment we are guaranteed by the logical planner
+ // to have all the window_expr to have equal sort key
+ let partition_keys =
window_expr_common_partition_keys(window_expr)?;
- // at this moment we are guaranteed by the logical planner
- // to have all the window_expr to have equal sort key
- let partition_keys =
window_expr_common_partition_keys(window_expr)?;
+ let can_repartition = !partition_keys.is_empty()
+ && ctx_state.config.target_partitions > 1
+ && ctx_state.config.repartition_windows;
+
+ let input_exec = if can_repartition {
+ let partition_keys = partition_keys
+ .iter()
+ .map(|e| {
+ self.create_physical_expr(
+ e,
+ input.schema(),
+ &input_exec.schema(),
+ ctx_state,
+ )
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+ Arc::new(RepartitionExec::try_new(
+ input_exec,
+ Partitioning::Hash(
+ partition_keys,
+ ctx_state.config.target_partitions,
+ ),
+ )?)
+ } else {
+ input_exec
+ };
+
+ // add a sort phase
+ let get_sort_keys = |expr: &Expr| match expr {
+ Expr::WindowFunction {
+ ref partition_by,
+ ref order_by,
+ ..
+ } => generate_sort_key(partition_by, order_by),
+ _ => unreachable!(),
+ };
+ let sort_keys = get_sort_keys(&window_expr[0]);
+ if window_expr.len() > 1 {
+ debug_assert!(
+ window_expr[1..]
+ .iter()
+ .all(|expr| get_sort_keys(expr) == sort_keys),
+ "all window expressions shall have the same sort
keys, as guaranteed by logical planning"
+ );
+ }
- let can_repartition = !partition_keys.is_empty()
- && ctx_state.config.target_partitions > 1
- && ctx_state.config.repartition_windows;
+ let logical_input_schema = input.schema();
- let input_exec = if can_repartition {
- let partition_keys = partition_keys
+ let input_exec = if sort_keys.is_empty() {
+ input_exec
+ } else {
+ let physical_input_schema = input_exec.schema();
+ let sort_keys = sort_keys
+ .iter()
+ .map(|e| match e {
+ Expr::Sort {
+ expr,
+ asc,
+ nulls_first,
+ } => self.create_physical_sort_expr(
+ expr,
+ logical_input_schema,
+ &physical_input_schema,
+ SortOptions {
+ descending: !*asc,
+ nulls_first: *nulls_first,
+ },
+ ctx_state,
+ ),
+ _ => unreachable!(),
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Arc::new(if can_repartition {
+ SortExec::new_with_partitioning(sort_keys,
input_exec, true)
+ } else {
+ SortExec::try_new(sort_keys, input_exec)?
+ })
+ };
+
+ let physical_input_schema = input_exec.schema();
+ let window_expr = window_expr
.iter()
.map(|e| {
- self.create_physical_expr(
+ self.create_window_expr(
e,
- input.schema(),
- &input_exec.schema(),
+ logical_input_schema,
+ &physical_input_schema,
ctx_state,
)
})
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
- Arc::new(RepartitionExec::try_new(
- input_exec,
- Partitioning::Hash(
- partition_keys,
- ctx_state.config.target_partitions,
- ),
- )?)
- } else {
- input_exec
- };
+ .collect::<Result<Vec<_>>>()?;
- // add a sort phase
- let get_sort_keys = |expr: &Expr| match expr {
- Expr::WindowFunction {
- ref partition_by,
- ref order_by,
- ..
- } => generate_sort_key(partition_by, order_by),
- _ => unreachable!(),
- };
- let sort_keys = get_sort_keys(&window_expr[0]);
- if window_expr.len() > 1 {
- debug_assert!(
- window_expr[1..]
- .iter()
- .all(|expr| get_sort_keys(expr) == sort_keys),
- "all window expressions shall have the same sort keys,
as guaranteed by logical planning"
- );
+ Ok(Arc::new(WindowAggExec::try_new(
+ window_expr,
+ input_exec,
+ physical_input_schema,
+ )?) )
}
-
- let logical_input_schema = input.schema();
-
- let input_exec = if sort_keys.is_empty() {
- input_exec
- } else {
+ LogicalPlan::Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ ..
+ } => {
+ // Initially need to perform the aggregate and then merge
the partitions
+ let input_exec = self.create_initial_plan(input,
ctx_state).await?;
let physical_input_schema = input_exec.schema();
- let sort_keys = sort_keys
+ let logical_input_schema = input.as_ref().schema();
+
+ let groups = group_expr
.iter()
- .map(|e| match e {
- Expr::Sort {
- expr,
- asc,
- nulls_first,
- } => self.create_physical_sort_expr(
- expr,
+ .map(|e| {
+ tuple_err((
+ self.create_physical_expr(
+ e,
+ logical_input_schema,
+ &physical_input_schema,
+ ctx_state,
+ ),
+ physical_name(e),
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let aggregates = aggr_expr
+ .iter()
+ .map(|e| {
+ self.create_aggregate_expr(
+ e,
logical_input_schema,
&physical_input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
ctx_state,
- ),
- _ => unreachable!(),
+ )
})
.collect::<Result<Vec<_>>>()?;
- Arc::new(if can_repartition {
- SortExec::new_with_partitioning(sort_keys, input_exec,
true)
- } else {
- SortExec::try_new(sort_keys, input_exec)?
- })
- };
- let physical_input_schema = input_exec.schema();
- let window_expr = window_expr
- .iter()
- .map(|e| {
- self.create_window_expr(
- e,
- logical_input_schema,
- &physical_input_schema,
- ctx_state,
- )
- })
- .collect::<Result<Vec<_>>>()?;
+ let initial_aggr = Arc::new(HashAggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ input_exec,
+ physical_input_schema.clone(),
+ )?);
- Ok(Arc::new(WindowAggExec::try_new(
- window_expr,
- input_exec,
- physical_input_schema,
- )?))
- }
- LogicalPlan::Aggregate {
- input,
- group_expr,
- aggr_expr,
- ..
- } => {
- // Initially need to perform the aggregate and then merge the
partitions
- let input_exec = self.create_initial_plan(input, ctx_state)?;
- let physical_input_schema = input_exec.schema();
- let logical_input_schema = input.as_ref().schema();
+ // update group column indices based on partial aggregate
plan evaluation
+ let final_group: Vec<Arc<dyn PhysicalExpr>> =
(0..groups.len())
+ .map(|i| col(&groups[i].1, &initial_aggr.schema()))
+ .collect::<Result<_>>()?;
- let groups = group_expr
- .iter()
- .map(|e| {
- tuple_err((
- self.create_physical_expr(
- e,
- logical_input_schema,
- &physical_input_schema,
- ctx_state,
+ // TODO: dictionary type not yet supported in Hash
Repartition
+ let contains_dict = groups
+ .iter()
+ .flat_map(|x|
x.0.data_type(physical_input_schema.as_ref()))
+ .any(|x| matches!(x, DataType::Dictionary(_, _)));
+
+ let can_repartition = !groups.is_empty()
+ && ctx_state.config.target_partitions > 1
+ && ctx_state.config.repartition_aggregations
+ && !contains_dict;
+
+ let (initial_aggr, next_partition_mode): (
+ Arc<dyn ExecutionPlan>,
+ AggregateMode,
+ ) = if can_repartition {
+ // Divide partial hash aggregates into multiple
partitions by hash key
+ let hash_repartition =
Arc::new(RepartitionExec::try_new(
+ initial_aggr,
+ Partitioning::Hash(
+ final_group.clone(),
+ ctx_state.config.target_partitions,
),
- physical_name(e),
- ))
- })
- .collect::<Result<Vec<_>>>()?;
- let aggregates = aggr_expr
- .iter()
- .map(|e| {
- self.create_aggregate_expr(
- e,
- logical_input_schema,
- &physical_input_schema,
- ctx_state,
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- let initial_aggr = Arc::new(HashAggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- input_exec,
- physical_input_schema.clone(),
- )?);
-
- // update group column indices based on partial aggregate plan
evaluation
- let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
- .map(|i| col(&groups[i].1, &initial_aggr.schema()))
- .collect::<Result<_>>()?;
-
- // TODO: dictionary type not yet supported in Hash Repartition
- let contains_dict = groups
- .iter()
- .flat_map(|x|
x.0.data_type(physical_input_schema.as_ref()))
- .any(|x| matches!(x, DataType::Dictionary(_, _)));
-
- let can_repartition = !groups.is_empty()
- && ctx_state.config.target_partitions > 1
- && ctx_state.config.repartition_aggregations
- && !contains_dict;
-
- let (initial_aggr, next_partition_mode): (
- Arc<dyn ExecutionPlan>,
- AggregateMode,
- ) = if can_repartition {
- // Divide partial hash aggregates into multiple partitions
by hash key
- let hash_repartition = Arc::new(RepartitionExec::try_new(
+ )?);
+ // Combine hash aggregates within the partition
+ (hash_repartition, AggregateMode::FinalPartitioned)
+ } else {
+ // construct a second aggregation, keeping the final
column name equal to the
+ // first aggregation and the expressions corresponding
to the respective aggregate
+ (initial_aggr, AggregateMode::Final)
+ };
+
+ Ok(Arc::new(HashAggregateExec::try_new(
+ next_partition_mode,
+ final_group
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| (expr.clone(),
groups[i].1.clone()))
+ .collect(),
+ aggregates,
initial_aggr,
- Partitioning::Hash(
- final_group.clone(),
- ctx_state.config.target_partitions,
- ),
- )?);
- // Combine hash aggregates within the partition
- (hash_repartition, AggregateMode::FinalPartitioned)
- } else {
- // construct a second aggregation, keeping the final
column name equal to the
- // first aggregation and the expressions corresponding to
the respective aggregate
- (initial_aggr, AggregateMode::Final)
- };
+ physical_input_schema.clone(),
+ )?) )
+ }
+ LogicalPlan::Projection { input, expr, .. } => {
+ let input_exec = self.create_initial_plan(input,
ctx_state).await?;
+ let input_schema = input.as_ref().schema();
- Ok(Arc::new(HashAggregateExec::try_new(
- next_partition_mode,
- final_group
+ let physical_exprs = expr
.iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
- .collect(),
- aggregates,
- initial_aggr,
- physical_input_schema.clone(),
- )?))
- }
- LogicalPlan::Projection { input, expr, .. } => {
- let input_exec = self.create_initial_plan(input, ctx_state)?;
- let input_schema = input.as_ref().schema();
-
- let physical_exprs = expr
- .iter()
- .map(|e| {
- // For projections, SQL planner and logical plan
builder may convert user
- // provided expressions into logical Column
expressions if their results
- // are already provided from the input plans. Because
we work with
- // qualified columns in logical plane, derived columns
involve operators or
- // functions will contain qualifers as well. This will
result in logical
- // columns with names like `SUM(t1.c1)`, `t1.c1 +
t1.c2`, etc.
- //
- // If we run these logical columns through
physical_name function, we will
- // get physical names with column qualifiers, which
violates Datafusion's
- // field name semantics. To account for this, we need
to derive the
- // physical name from physical input instead.
- //
- // This depends on the invariant that logical schema
field index MUST match
- // with physical schema field index.
- let physical_name = if let Expr::Column(col) = e {
- match input_schema.index_of_column(col) {
- Ok(idx) => {
- // index physical field using logical
field index
-
Ok(input_exec.schema().field(idx).name().to_string())
+ .map(|e| {
+ // For projections, SQL planner and logical plan
builder may convert user
+ // provided expressions into logical Column
expressions if their results
+ // are already provided from the input plans.
Because we work with
+ // qualified columns in logical plane, derived
columns involve operators or
+ // functions will contain qualifers as well. This
will result in logical
+ // columns with names like `SUM(t1.c1)`, `t1.c1 +
t1.c2`, etc.
+ //
+ // If we run these logical columns through
physical_name function, we will
+ // get physical names with column qualifiers,
which violates Datafusion's
+ // field name semantics. To account for this, we
need to derive the
+ // physical name from physical input instead.
+ //
+ // This depends on the invariant that logical
schema field index MUST match
+ // with physical schema field index.
+ let physical_name = if let Expr::Column(col) = e {
+ match input_schema.index_of_column(col) {
+ Ok(idx) => {
+ // index physical field using logical
field index
+
Ok(input_exec.schema().field(idx).name().to_string())
+ }
+ // logical column is not a derived column,
safe to pass along to
+ // physical_name
+ Err(_) => physical_name(e),
}
- // logical column is not a derived column,
safe to pass along to
- // physical_name
- Err(_) => physical_name(e),
- }
- } else {
- physical_name(e)
- };
+ } else {
+ physical_name(e)
+ };
- tuple_err((
- self.create_physical_expr(
- e,
- input_schema,
- &input_exec.schema(),
- ctx_state,
- ),
- physical_name,
- ))
- })
- .collect::<Result<Vec<_>>>()?;
-
- Ok(Arc::new(ProjectionExec::try_new(
- physical_exprs,
- input_exec,
- )?))
- }
- LogicalPlan::Filter {
- input, predicate, ..
- } => {
- let physical_input = self.create_initial_plan(input,
ctx_state)?;
- let input_schema = physical_input.as_ref().schema();
- let input_dfschema = input.as_ref().schema();
- let runtime_expr = self.create_physical_expr(
- predicate,
- input_dfschema,
- &input_schema,
- ctx_state,
- )?;
- Ok(Arc::new(FilterExec::try_new(runtime_expr,
physical_input)?))
- }
- LogicalPlan::Union { inputs, .. } => {
- let physical_plans = inputs
- .iter()
- .map(|input| self.create_initial_plan(input, ctx_state))
- .collect::<Result<Vec<_>>>()?;
- Ok(Arc::new(UnionExec::new(physical_plans)))
- }
- LogicalPlan::Repartition {
- input,
- partitioning_scheme,
- } => {
- let physical_input = self.create_initial_plan(input,
ctx_state)?;
- let input_schema = physical_input.schema();
- let input_dfschema = input.as_ref().schema();
- let physical_partitioning = match partitioning_scheme {
- LogicalPartitioning::RoundRobinBatch(n) => {
- Partitioning::RoundRobinBatch(*n)
- }
- LogicalPartitioning::Hash(expr, n) => {
- let runtime_expr = expr
- .iter()
- .map(|e| {
+ tuple_err((
self.create_physical_expr(
e,
- input_dfschema,
- &input_schema,
+ input_schema,
+ &input_exec.schema(),
ctx_state,
- )
- })
- .collect::<Result<Vec<_>>>()?;
- Partitioning::Hash(runtime_expr, *n)
- }
- };
- Ok(Arc::new(RepartitionExec::try_new(
- physical_input,
- physical_partitioning,
- )?))
- }
- LogicalPlan::Sort { expr, input, .. } => {
- let physical_input = self.create_initial_plan(input,
ctx_state)?;
- let input_schema = physical_input.as_ref().schema();
- let input_dfschema = input.as_ref().schema();
+ ),
+ physical_name,
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?;
- let sort_expr = expr
- .iter()
- .map(|e| match e {
- Expr::Sort {
- expr,
- asc,
- nulls_first,
- } => self.create_physical_sort_expr(
- expr,
- input_dfschema,
- &input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
- ctx_state,
- ),
- _ => Err(DataFusionError::Plan(
- "Sort only accepts sort expressions".to_string(),
- )),
- })
- .collect::<Result<Vec<_>>>()?;
+ Ok(Arc::new(ProjectionExec::try_new(
+ physical_exprs,
+ input_exec,
+ )?) )
+ }
+ LogicalPlan::Filter {
+ input, predicate, ..
+ } => {
+ let physical_input = self.create_initial_plan(input,
ctx_state).await?;
+ let input_schema = physical_input.as_ref().schema();
+ let input_dfschema = input.as_ref().schema();
+ let runtime_expr = self.create_physical_expr(
+ predicate,
+ input_dfschema,
+ &input_schema,
+ ctx_state,
+ )?;
+ Ok(Arc::new(FilterExec::try_new(runtime_expr,
physical_input)?) )
+ }
+ LogicalPlan::Union { inputs, .. } => {
+ let physical_plan_futures = inputs
Review comment:
Actually, this was not planned to be a change; it simply comes from the
fact that you cannot `await` inside a `map` closure, so you need to convert the
iterator into a stream. The `buffered` call is then required to satisfy the
`try_collect` trait bounds. So I guess that using `buffered(1)` would keep exactly
the same behavior as before, right? Do you have a more elegant solution?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]