Jefffrey commented on code in PR #10023:
URL:
https://github.com/apache/arrow-datafusion/pull/10023#discussion_r1559153113
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +515,957 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}
- /// Create a physical plans for multiple logical plans.
- ///
- /// This is the same as [`create_initial_plan`](Self::create_initial_plan)
but runs the planning concurrently.
- ///
- /// The result order is the same as the input order.
- fn create_initial_plan_multi<'a>(
- &'a self,
- logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
- session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
- async move {
- // First build futures with as little references as possible, then
performing some stream magic.
- // Otherwise rustc bails out w/:
- //
- // error: higher-ranked lifetime error
- // ...
- // note: could not prove `[async block@...]: std::marker::Send`
- let futures = logical_plans
- .into_iter()
- .enumerate()
- .map(|(idx, lp)| async move {
- let plan = self.create_initial_plan(lp,
session_state).await?;
- Ok((idx, plan)) as Result<_>
- })
- .collect::<Vec<_>>();
-
- let mut physical_plans = futures::stream::iter(futures)
- .buffer_unordered(
- session_state
- .config_options()
- .execution
- .planning_concurrency,
- )
- .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
- .await?;
- physical_plans.sort_by_key(|(idx, _plan)| *idx);
- let physical_plans = physical_plans
- .into_iter()
- .map(|(_idx, plan)| plan)
- .collect::<Vec<_>>();
- Ok(physical_plans)
- }
- .boxed()
+ /// Create a physical plan from a logical plan
+ async fn create_initial_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // DFS the tree to flatten it into a Vec.
+ // This will allow us to build the Physical Plan from the leaves up
+ // to avoid recursion, and also to make it easier to build a valid
+ // Physical Plan from the start and not rely on some intermediate
+ // representation (since parents need to know their children at
+ // construction time).
+ let mut flat_tree = vec![];
+ let mut dfs_visit_stack = vec![(None, logical_plan)];
+ // Use this to be able to find the leaves to start construction bottom
+ // up concurrently.
+ let mut flat_tree_leaf_indices = vec![];
+ while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+ let current_index = flat_tree.len();
+ // Because of how we extend the visit stack here, we visit the
children
+ // in reverse order of how they appear, so later we need to
reverse
+ // the order of children when building the nodes.
+ dfs_visit_stack
+ .extend(node.inputs().iter().map(|&n| (Some(current_index),
n)));
+ let state = match node.inputs().len() {
+ 0 => {
+ flat_tree_leaf_indices.push(current_index);
+ NodeState::ZeroOrOneChild
+ }
+ 1 => NodeState::ZeroOrOneChild,
+ _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])),
+ };
+ let node = LogicalNode {
+ node,
+ parent_index,
+ state,
+ };
+ flat_tree.push(node);
+ }
+ let flat_tree = Arc::new(flat_tree);
+
+ let planning_concurrency = session_state
+ .config_options()
+ .execution
+ .planning_concurrency;
+ // Can never spawn more tasks than leaves in the tree, as these tasks
must
+ // all converge down to the root node, which can only be processed by a
+ // single task.
+ let max_concurrency =
planning_concurrency.min(flat_tree_leaf_indices.len());
Review Comment:
I think this may be more accurate than what is currently present:
https://github.com/apache/arrow-datafusion/blob/215f30f74a12e91fd7dca0d30e37014c8c3493ed/datafusion/core/src/physical_planner.rs#L499-L542
The current `create_initial_plan_multi` can be called multiple times, and
`planning_concurrency` is only enforced within each individual call, so if
it's called multiple times it can spawn more tasks than is configured by
`planning_concurrency`. Maybe this is intended?
Either way, the new code actually limits how many tasks are building the
tree across the entire initial planning process.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]