Jefffrey commented on code in PR #10023:
URL:
https://github.com/apache/arrow-datafusion/pull/10023#discussion_r1559156686
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1941,6 +2141,59 @@ impl DefaultPhysicalPlanner {
let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
Ok(Arc::new(mem_exec))
}
+
+ fn create_project_physical_exec(
+ &self,
+ session_state: &SessionState,
+ input_exec: Arc<dyn ExecutionPlan>,
+ input: &Arc<LogicalPlan>,
+ expr: &[Expr],
+ ) -> Result<Arc<dyn ExecutionPlan>> {
Review Comment:
I extracted this into a separate function because it is also used by `Join` to
create the physical projections that are added during planning when the join has
expression equijoin keys.
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +515,957 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}
- /// Create a physical plans for multiple logical plans.
- ///
- /// This is the same as [`create_initial_plan`](Self::create_initial_plan)
but runs the planning concurrently.
- ///
- /// The result order is the same as the input order.
- fn create_initial_plan_multi<'a>(
- &'a self,
- logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
- session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
- async move {
- // First build futures with as little references as possible, then
performing some stream magic.
- // Otherwise rustc bails out w/:
- //
- // error: higher-ranked lifetime error
- // ...
- // note: could not prove `[async block@...]: std::marker::Send`
- let futures = logical_plans
- .into_iter()
- .enumerate()
- .map(|(idx, lp)| async move {
- let plan = self.create_initial_plan(lp,
session_state).await?;
- Ok((idx, plan)) as Result<_>
- })
- .collect::<Vec<_>>();
-
- let mut physical_plans = futures::stream::iter(futures)
- .buffer_unordered(
- session_state
- .config_options()
- .execution
- .planning_concurrency,
- )
- .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
- .await?;
- physical_plans.sort_by_key(|(idx, _plan)| *idx);
- let physical_plans = physical_plans
- .into_iter()
- .map(|(_idx, plan)| plan)
- .collect::<Vec<_>>();
- Ok(physical_plans)
- }
- .boxed()
+ /// Create a physical plan from a logical plan
+ async fn create_initial_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // DFS the tree to flatten it into a Vec.
+ // This will allow us to build the Physical Plan from the leaves up
+ // to avoid recursion, and also to make it easier to build a valid
+ // Physical Plan from the start and not rely on some intermediate
+ // representation (since parents need to know their children at
+ // construction time).
+ let mut flat_tree = vec![];
+ let mut dfs_visit_stack = vec![(None, logical_plan)];
+ // Use this to be able to find the leaves to start construction bottom
+ // up concurrently.
+ let mut flat_tree_leaf_indices = vec![];
+ while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+ let current_index = flat_tree.len();
+ // Because of how we extend the visit stack here, we visit the
children
+ // in reverse order of how they appeart, so later we need to
reverse
+ // the order of children when building the nodes.
+ dfs_visit_stack
+ .extend(node.inputs().iter().map(|&n| (Some(current_index),
n)));
+ let state = match node.inputs().len() {
+ 0 => {
+ flat_tree_leaf_indices.push(current_index);
+ NodeState::ZeroOrOneChild
+ }
+ 1 => NodeState::ZeroOrOneChild,
+ _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])),
+ };
+ let node = LogicalNode {
+ node,
+ parent_index,
+ state,
+ };
+ flat_tree.push(node);
+ }
+ let flat_tree = Arc::new(flat_tree);
+
+ let planning_concurrency = session_state
+ .config_options()
+ .execution
+ .planning_concurrency;
+ // Can never spawn more tasks than leaves in the tree, as these tasks
must
+ // all converge down to the root node, which can only be processed by a
+ // single task.
+ let max_concurrency =
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+ // Spawning tasks which will traverse leaf up to the root.
+ let tasks = flat_tree_leaf_indices
+ .into_iter()
+ .map(|index| self.task_helper(index, flat_tree.clone(),
session_state))
+ .collect::<Vec<_>>();
+ let mut outputs = futures::stream::iter(tasks)
+ .buffer_unordered(max_concurrency)
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ // Ideally this never happens if we have a valid LogicalPlan tree
+ assert!(
+ outputs.len() == 1,
+ "Invalid physical plan created from logical plan"
+ );
+ let plan = outputs.pop().unwrap();
+ Ok(plan)
}
- /// Create a physical plan from a logical plan
- fn create_initial_plan<'a>(
+ /// These tasks start at a leaf and traverse up the tree towards the root,
building
+ /// an ExecutionPlan as they go. When they reach a node with two or more
children,
+ /// they append their current result (a child of the parent node) to the
children
+ /// vector, and if this is sufficient to create the parent then continues
traversing
+ /// the tree to create nodes. Otherwise, the task terminates.
+ async fn task_helper<'a>(
&'a self,
- logical_plan: &'a LogicalPlan,
+ leaf_starter_index: usize,
+ flat_tree: Arc<Vec<LogicalNode<'a>>>,
session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
- async move {
- let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan
{
- LogicalPlan::TableScan(TableScan {
- source,
- projection,
- filters,
- fetch,
- ..
- }) => {
- let source = source_as_provider(source)?;
- // Remove all qualifiers from the scan as the provider
- // doesn't know (nor should care) how the relation was
- // referred to in the query
- let filters = unnormalize_cols(filters.iter().cloned());
- source.scan(session_state, projection.as_ref(), &filters,
*fetch).await
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ // We always start with a leaf, so can ignore status and pass empty
children
+ let mut node = &flat_tree[leaf_starter_index];
+ let mut plan = self
+ .map_logical_node_to_physical(node.node, session_state, vec![])
+ .await?;
+ let mut current_index = leaf_starter_index;
+ // parent_index is None only for root
+ while let Some(parent_index) = node.parent_index {
+ node = &flat_tree[parent_index];
+ match &node.state {
+ NodeState::ZeroOrOneChild => {
+ plan = self
+ .map_logical_node_to_physical(
+ node.node,
+ session_state,
+ vec![plan],
+ )
+ .await?;
}
- LogicalPlan::Copy(CopyTo{
- input,
- output_url,
- format_options,
- partition_by,
- options: source_option_tuples
- }) => {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let parsed_url = ListingTableUrl::parse(output_url)?;
- let object_store_url = parsed_url.object_store();
-
- let schema: Schema = (**input.schema()).clone().into();
-
- // Note: the DataType passed here is ignored for the
purposes of writing and inferred instead
- // from the schema of the RecordBatch being written. This
allows COPY statements to specify only
- // the column name rather than column name + explicit data
type.
- let table_partition_cols = partition_by.iter()
- .map(|s| (s.to_string(), arrow_schema::DataType::Null))
- .collect::<Vec<_>>();
-
- // Set file sink related options
- let config = FileSinkConfig {
- object_store_url,
- table_paths: vec![parsed_url],
- file_groups: vec![],
- output_schema: Arc::new(schema),
- table_partition_cols,
- overwrite: false,
- };
- let mut table_options =
session_state.default_table_options();
- let sink_format: Arc<dyn FileFormat> = match
format_options {
- FormatOptions::CSV(options) => {
- table_options.csv = options.clone();
- table_options.set_file_format(FileType::CSV);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(CsvFormat::default().with_options(table_options.csv))
- },
- FormatOptions::JSON(options) => {
- table_options.json = options.clone();
- table_options.set_file_format(FileType::JSON);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(JsonFormat::default().with_options(table_options.json))
- },
- #[cfg(feature = "parquet")]
- FormatOptions::PARQUET(options) => {
- table_options.parquet = options.clone();
- table_options.set_file_format(FileType::PARQUET);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
- },
- FormatOptions::AVRO => Arc::new(AvroFormat {} ),
- FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+ // See if we have all children to build the node.
+ NodeState::TwoOrMoreChildren(children) => {
+ let mut children = {
+ let mut children = children.lock().unwrap();
+ // Add our contribution to this parent node.
+ children.push((current_index, plan));
+ if children.len() < node.node.inputs().len() {
+ // This node is not ready yet, still pending more
children.
+ // This task is finished forever.
+ return Ok(None);
+ } else {
+ // With this task's contribution we have enough
children.
+ // This task is the only one building this node
now, and thus
+ // no other task will need the Mutex for this node.
+
+ // TODO: How to do this without a clone? Take from
the inner Mutex?
+ children.clone()
+ }
};
- sink_format.create_writer_physical_plan(input_exec,
session_state, config, None).await
+ // Indices refer to position in flat tree Vec, which means
they are
+ // guaranteed to be unique, hence unstable sort used.
+ // We reverse sort because of how we visited the node in
the initial
+ // DFS traversal (see above).
+ children.sort_unstable_by_key(|(index, _)|
std::cmp::Reverse(*index));
+ let children =
+ children.iter().map(|(_, plan)|
plan.clone()).collect();
+
+ plan = self
+ .map_logical_node_to_physical(node.node,
session_state, children)
+ .await?;
}
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertInto,
- input,
- ..
- }) => {
- let name = table_name.table();
- let schema =
session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- provider.insert_into(session_state, input_exec,
false).await
- } else {
- return exec_err!(
- "Table '{table_name}' does not exist"
- );
- }
- }
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertOverwrite,
- input,
- ..
- }) => {
- let name = table_name.table();
- let schema =
session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- provider.insert_into(session_state, input_exec,
true).await
- } else {
- return exec_err!(
- "Table '{table_name}' does not exist"
- );
- }
- }
- LogicalPlan::Values(Values {
- values,
- schema,
- }) => {
- let exec_schema = schema.as_ref().to_owned().into();
- let exprs = values.iter()
- .map(|row| {
- row.iter().map(|expr| {
- self.create_physical_expr(
- expr,
- schema,
- session_state,
- )
+ }
+ current_index = parent_index;
+ }
+ // Only one task should ever reach this point for a valid LogicalPlan
tree.
+ Ok(Some(plan))
+ }
+
+ /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan
counterpart.
+ async fn map_logical_node_to_physical(
+ &self,
+ node: &LogicalPlan,
+ session_state: &SessionState,
+ // TODO: refactor to not use Vec? Wasted for leaves/1 child
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let exec_node: Arc<dyn ExecutionPlan> = match node {
+ // Leaves (no children)
+ LogicalPlan::TableScan(TableScan {
+ source,
+ projection,
+ filters,
+ fetch,
+ ..
+ }) => {
+ let source = source_as_provider(source)?;
Review Comment:
This is largely unchanged, except for some minor changes to accommodate
passing in the children, and also for joins (see next note). Should I extract
this into a separate PR to make things cleaner?
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +515,957 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}
- /// Create a physical plans for multiple logical plans.
- ///
- /// This is the same as [`create_initial_plan`](Self::create_initial_plan)
but runs the planning concurrently.
- ///
- /// The result order is the same as the input order.
- fn create_initial_plan_multi<'a>(
- &'a self,
- logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
- session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
- async move {
- // First build futures with as little references as possible, then
performing some stream magic.
- // Otherwise rustc bails out w/:
- //
- // error: higher-ranked lifetime error
- // ...
- // note: could not prove `[async block@...]: std::marker::Send`
- let futures = logical_plans
- .into_iter()
- .enumerate()
- .map(|(idx, lp)| async move {
- let plan = self.create_initial_plan(lp,
session_state).await?;
- Ok((idx, plan)) as Result<_>
- })
- .collect::<Vec<_>>();
-
- let mut physical_plans = futures::stream::iter(futures)
- .buffer_unordered(
- session_state
- .config_options()
- .execution
- .planning_concurrency,
- )
- .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
- .await?;
- physical_plans.sort_by_key(|(idx, _plan)| *idx);
- let physical_plans = physical_plans
- .into_iter()
- .map(|(_idx, plan)| plan)
- .collect::<Vec<_>>();
- Ok(physical_plans)
- }
- .boxed()
+ /// Create a physical plan from a logical plan
+ async fn create_initial_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // DFS the tree to flatten it into a Vec.
+ // This will allow us to build the Physical Plan from the leaves up
+ // to avoid recursion, and also to make it easier to build a valid
+ // Physical Plan from the start and not rely on some intermediate
+ // representation (since parents need to know their children at
+ // construction time).
+ let mut flat_tree = vec![];
+ let mut dfs_visit_stack = vec![(None, logical_plan)];
+ // Use this to be able to find the leaves to start construction bottom
+ // up concurrently.
+ let mut flat_tree_leaf_indices = vec![];
+ while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+ let current_index = flat_tree.len();
+ // Because of how we extend the visit stack here, we visit the
children
+ // in reverse order of how they appeart, so later we need to
reverse
+ // the order of children when building the nodes.
+ dfs_visit_stack
+ .extend(node.inputs().iter().map(|&n| (Some(current_index),
n)));
+ let state = match node.inputs().len() {
+ 0 => {
+ flat_tree_leaf_indices.push(current_index);
+ NodeState::ZeroOrOneChild
+ }
+ 1 => NodeState::ZeroOrOneChild,
+ _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])),
+ };
+ let node = LogicalNode {
+ node,
+ parent_index,
+ state,
+ };
+ flat_tree.push(node);
+ }
+ let flat_tree = Arc::new(flat_tree);
+
+ let planning_concurrency = session_state
+ .config_options()
+ .execution
+ .planning_concurrency;
+ // Can never spawn more tasks than leaves in the tree, as these tasks
must
+ // all converge down to the root node, which can only be processed by a
+ // single task.
+ let max_concurrency =
planning_concurrency.min(flat_tree_leaf_indices.len());
+
+ // Spawning tasks which will traverse leaf up to the root.
+ let tasks = flat_tree_leaf_indices
+ .into_iter()
+ .map(|index| self.task_helper(index, flat_tree.clone(),
session_state))
+ .collect::<Vec<_>>();
+ let mut outputs = futures::stream::iter(tasks)
+ .buffer_unordered(max_concurrency)
+ .try_collect::<Vec<_>>()
+ .await?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<_>>();
+ // Ideally this never happens if we have a valid LogicalPlan tree
+ assert!(
+ outputs.len() == 1,
+ "Invalid physical plan created from logical plan"
+ );
+ let plan = outputs.pop().unwrap();
+ Ok(plan)
}
- /// Create a physical plan from a logical plan
- fn create_initial_plan<'a>(
+ /// These tasks start at a leaf and traverse up the tree towards the root,
building
+ /// an ExecutionPlan as they go. When they reach a node with two or more
children,
+ /// they append their current result (a child of the parent node) to the
children
+ /// vector, and if this is sufficient to create the parent then continues
traversing
+ /// the tree to create nodes. Otherwise, the task terminates.
+ async fn task_helper<'a>(
&'a self,
- logical_plan: &'a LogicalPlan,
+ leaf_starter_index: usize,
+ flat_tree: Arc<Vec<LogicalNode<'a>>>,
session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
- async move {
- let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan
{
- LogicalPlan::TableScan(TableScan {
- source,
- projection,
- filters,
- fetch,
- ..
- }) => {
- let source = source_as_provider(source)?;
- // Remove all qualifiers from the scan as the provider
- // doesn't know (nor should care) how the relation was
- // referred to in the query
- let filters = unnormalize_cols(filters.iter().cloned());
- source.scan(session_state, projection.as_ref(), &filters,
*fetch).await
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ // We always start with a leaf, so can ignore status and pass empty
children
+ let mut node = &flat_tree[leaf_starter_index];
+ let mut plan = self
+ .map_logical_node_to_physical(node.node, session_state, vec![])
+ .await?;
+ let mut current_index = leaf_starter_index;
+ // parent_index is None only for root
+ while let Some(parent_index) = node.parent_index {
+ node = &flat_tree[parent_index];
+ match &node.state {
+ NodeState::ZeroOrOneChild => {
+ plan = self
+ .map_logical_node_to_physical(
+ node.node,
+ session_state,
+ vec![plan],
+ )
+ .await?;
}
- LogicalPlan::Copy(CopyTo{
- input,
- output_url,
- format_options,
- partition_by,
- options: source_option_tuples
- }) => {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let parsed_url = ListingTableUrl::parse(output_url)?;
- let object_store_url = parsed_url.object_store();
-
- let schema: Schema = (**input.schema()).clone().into();
-
- // Note: the DataType passed here is ignored for the
purposes of writing and inferred instead
- // from the schema of the RecordBatch being written. This
allows COPY statements to specify only
- // the column name rather than column name + explicit data
type.
- let table_partition_cols = partition_by.iter()
- .map(|s| (s.to_string(), arrow_schema::DataType::Null))
- .collect::<Vec<_>>();
-
- // Set file sink related options
- let config = FileSinkConfig {
- object_store_url,
- table_paths: vec![parsed_url],
- file_groups: vec![],
- output_schema: Arc::new(schema),
- table_partition_cols,
- overwrite: false,
- };
- let mut table_options =
session_state.default_table_options();
- let sink_format: Arc<dyn FileFormat> = match
format_options {
- FormatOptions::CSV(options) => {
- table_options.csv = options.clone();
- table_options.set_file_format(FileType::CSV);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(CsvFormat::default().with_options(table_options.csv))
- },
- FormatOptions::JSON(options) => {
- table_options.json = options.clone();
- table_options.set_file_format(FileType::JSON);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(JsonFormat::default().with_options(table_options.json))
- },
- #[cfg(feature = "parquet")]
- FormatOptions::PARQUET(options) => {
- table_options.parquet = options.clone();
- table_options.set_file_format(FileType::PARQUET);
-
table_options.alter_with_string_hash_map(source_option_tuples)?;
-
Arc::new(ParquetFormat::default().with_options(table_options.parquet))
- },
- FormatOptions::AVRO => Arc::new(AvroFormat {} ),
- FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+ // See if we have all children to build the node.
+ NodeState::TwoOrMoreChildren(children) => {
+ let mut children = {
+ let mut children = children.lock().unwrap();
+ // Add our contribution to this parent node.
+ children.push((current_index, plan));
+ if children.len() < node.node.inputs().len() {
+ // This node is not ready yet, still pending more
children.
+ // This task is finished forever.
+ return Ok(None);
+ } else {
+ // With this task's contribution we have enough
children.
+ // This task is the only one building this node
now, and thus
+ // no other task will need the Mutex for this node.
+
+ // TODO: How to do this without a clone? Take from
the inner Mutex?
+ children.clone()
+ }
};
- sink_format.create_writer_physical_plan(input_exec,
session_state, config, None).await
+ // Indices refer to position in flat tree Vec, which means
they are
+ // guaranteed to be unique, hence unstable sort used.
+ // We reverse sort because of how we visited the node in
the initial
+ // DFS traversal (see above).
+ children.sort_unstable_by_key(|(index, _)|
std::cmp::Reverse(*index));
+ let children =
+ children.iter().map(|(_, plan)|
plan.clone()).collect();
+
+ plan = self
+ .map_logical_node_to_physical(node.node,
session_state, children)
+ .await?;
}
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertInto,
- input,
- ..
- }) => {
- let name = table_name.table();
- let schema =
session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- provider.insert_into(session_state, input_exec,
false).await
- } else {
- return exec_err!(
- "Table '{table_name}' does not exist"
- );
- }
- }
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertOverwrite,
- input,
- ..
- }) => {
- let name = table_name.table();
- let schema =
session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- provider.insert_into(session_state, input_exec,
true).await
- } else {
- return exec_err!(
- "Table '{table_name}' does not exist"
- );
- }
- }
- LogicalPlan::Values(Values {
- values,
- schema,
- }) => {
- let exec_schema = schema.as_ref().to_owned().into();
- let exprs = values.iter()
- .map(|row| {
- row.iter().map(|expr| {
- self.create_physical_expr(
- expr,
- schema,
- session_state,
- )
+ }
+ current_index = parent_index;
+ }
+ // Only one task should ever reach this point for a valid LogicalPlan
tree.
+ Ok(Some(plan))
+ }
+
+ /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan
counterpart.
+ async fn map_logical_node_to_physical(
+ &self,
+ node: &LogicalPlan,
+ session_state: &SessionState,
+ // TODO: refactor to not use Vec? Wasted for leaves/1 child
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let exec_node: Arc<dyn ExecutionPlan> = match node {
+ // Leaves (no children)
+ LogicalPlan::TableScan(TableScan {
+ source,
+ projection,
+ filters,
+ fetch,
+ ..
+ }) => {
+ let source = source_as_provider(source)?;
+ // Remove all qualifiers from the scan as the provider
+ // doesn't know (nor should care) how the relation was
+ // referred to in the query
+ let filters = unnormalize_cols(filters.iter().cloned());
+ source
+ .scan(session_state, projection.as_ref(), &filters, *fetch)
+ .await?
+ }
+ LogicalPlan::Values(Values { values, schema }) => {
+ let exec_schema = schema.as_ref().to_owned().into();
+ let exprs = values
+ .iter()
+ .map(|row| {
+ row.iter()
+ .map(|expr| {
+ self.create_physical_expr(expr, schema,
session_state)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
- })
- .collect::<Result<Vec<_>>>()?;
- let value_exec = ValuesExec::try_new(
- SchemaRef::new(exec_schema),
- exprs,
- )?;
- Ok(Arc::new(value_exec))
- }
- LogicalPlan::Window(Window {
- input, window_expr, ..
- }) => {
- if window_expr.is_empty() {
- return internal_err!(
- "Impossibly got empty window expression"
- );
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let value_exec =
ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+ Arc::new(value_exec)
+ }
+ LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: false,
+ schema,
+ }) => Arc::new(EmptyExec::new(SchemaRef::new(
+ schema.as_ref().to_owned().into(),
+ ))),
+ LogicalPlan::EmptyRelation(EmptyRelation {
+ produce_one_row: true,
+ schema,
+ }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
+ schema.as_ref().to_owned().into(),
+ ))),
+ LogicalPlan::DescribeTable(DescribeTable {
+ schema,
+ output_schema,
+ }) => {
+ let output_schema: Schema = output_schema.as_ref().into();
+ self.plan_describe(schema.clone(), Arc::new(output_schema))?
+ }
+
+ // 1 Child
+ LogicalPlan::Copy(CopyTo {
+ input,
+ output_url,
+ format_options,
+ partition_by,
+ options: source_option_tuples,
+ }) => {
+ let input_exec = children.pop().unwrap();
+ let parsed_url = ListingTableUrl::parse(output_url)?;
+ let object_store_url = parsed_url.object_store();
+
+ let schema: Schema = (**input.schema()).clone().into();
+
+ // Note: the DataType passed here is ignored for the purposes
of writing and inferred instead
+ // from the schema of the RecordBatch being written. This
allows COPY statements to specify only
+ // the column name rather than column name + explicit data
type.
+ let table_partition_cols = partition_by
+ .iter()
+ .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+ .collect::<Vec<_>>();
+
+ // Set file sink related options
+ let config = FileSinkConfig {
+ object_store_url,
+ table_paths: vec![parsed_url],
+ file_groups: vec![],
+ output_schema: Arc::new(schema),
+ table_partition_cols,
+ overwrite: false,
+ };
+ let mut table_options = session_state.default_table_options();
+ let sink_format: Arc<dyn FileFormat> = match format_options {
+ FormatOptions::CSV(options) => {
+ table_options.csv = options.clone();
+ table_options.set_file_format(FileType::CSV);
+
table_options.alter_with_string_hash_map(source_option_tuples)?;
+
Arc::new(CsvFormat::default().with_options(table_options.csv))
}
+ FormatOptions::JSON(options) => {
+ table_options.json = options.clone();
+ table_options.set_file_format(FileType::JSON);
+
table_options.alter_with_string_hash_map(source_option_tuples)?;
+
Arc::new(JsonFormat::default().with_options(table_options.json))
+ }
+ #[cfg(feature = "parquet")]
+ FormatOptions::PARQUET(options) => {
+ table_options.parquet = options.clone();
+ table_options.set_file_format(FileType::PARQUET);
+
table_options.alter_with_string_hash_map(source_option_tuples)?;
+ Arc::new(
+
ParquetFormat::default().with_options(table_options.parquet),
+ )
+ }
+ FormatOptions::AVRO => Arc::new(AvroFormat {}),
+ FormatOptions::ARROW => Arc::new(ArrowFormat {}),
+ };
- let input_exec = self.create_initial_plan(input,
session_state).await?;
+ sink_format
+ .create_writer_physical_plan(input_exec, session_state,
config, None)
+ .await?
+ }
+ LogicalPlan::Dml(DmlStatement {
+ table_name,
+ op: WriteOp::InsertInto,
+ ..
+ }) => {
+ let name = table_name.table();
+ let schema = session_state.schema_for_ref(table_name.clone())?;
+ if let Some(provider) = schema.table(name).await? {
+ let input_exec = children.pop().unwrap();
+ provider
+ .insert_into(session_state, input_exec, false)
+ .await?
+ } else {
+ return exec_err!("Table '{table_name}' does not exist");
+ }
+ }
+ LogicalPlan::Dml(DmlStatement {
+ table_name,
+ op: WriteOp::InsertOverwrite,
+ ..
+ }) => {
+ let name = table_name.table();
+ let schema = session_state.schema_for_ref(table_name.clone())?;
+ if let Some(provider) = schema.table(name).await? {
+ let input_exec = children.pop().unwrap();
+ provider
+ .insert_into(session_state, input_exec, true)
+ .await?
+ } else {
+ return exec_err!("Table '{table_name}' does not exist");
+ }
+ }
+ LogicalPlan::Window(Window {
+ input, window_expr, ..
+ }) => {
+ if window_expr.is_empty() {
+ return internal_err!("Impossibly got empty window
expression");
+ }
- // at this moment we are guaranteed by the logical planner
- // to have all the window_expr to have equal sort key
- let partition_keys =
window_expr_common_partition_keys(window_expr)?;
+ let input_exec = children.pop().unwrap();
- let can_repartition = !partition_keys.is_empty()
- && session_state.config().target_partitions() > 1
- &&
session_state.config().repartition_window_functions();
+ // at this moment we are guaranteed by the logical planner
+ // to have all the window_expr to have equal sort key
+ let partition_keys =
window_expr_common_partition_keys(window_expr)?;
- let physical_partition_keys = if can_repartition
- {
- partition_keys
- .iter()
- .map(|e| {
- self.create_physical_expr(
- e,
- input.schema(),
- session_state,
- )
- })
- .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
- } else {
- vec![]
- };
+ let can_repartition = !partition_keys.is_empty()
+ && session_state.config().target_partitions() > 1
+ && session_state.config().repartition_window_functions();
- let get_sort_keys = |expr: &Expr| match expr {
- Expr::WindowFunction(WindowFunction{
- ref partition_by,
- ref order_by,
- ..
- }) => generate_sort_key(partition_by, order_by),
- Expr::Alias(Alias{expr,..}) => {
- // Convert &Box<T> to &T
- match &**expr {
- Expr::WindowFunction(WindowFunction{
- ref partition_by,
- ref order_by,
- ..}) => generate_sort_key(partition_by,
order_by),
- _ => unreachable!(),
- }
+ let physical_partition_keys = if can_repartition {
+ partition_keys
+ .iter()
+ .map(|e| {
+ self.create_physical_expr(e, input.schema(),
session_state)
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
+ } else {
+ vec![]
+ };
+
+ let get_sort_keys = |expr: &Expr| match expr {
+ Expr::WindowFunction(WindowFunction {
+ ref partition_by,
+ ref order_by,
+ ..
+ }) => generate_sort_key(partition_by, order_by),
+ Expr::Alias(Alias { expr, .. }) => {
+ // Convert &Box<T> to &T
+ match &**expr {
+ Expr::WindowFunction(WindowFunction {
+ ref partition_by,
+ ref order_by,
+ ..
+ }) => generate_sort_key(partition_by, order_by),
+ _ => unreachable!(),
}
- _ => unreachable!(),
- };
- let sort_keys = get_sort_keys(&window_expr[0])?;
- if window_expr.len() > 1 {
- debug_assert!(
+ }
+ _ => unreachable!(),
+ };
+ let sort_keys = get_sort_keys(&window_expr[0])?;
+ if window_expr.len() > 1 {
+ debug_assert!(
window_expr[1..]
.iter()
.all(|expr| get_sort_keys(expr).unwrap() ==
sort_keys),
"all window expressions shall have the same sort
keys, as guaranteed by logical planning"
);
- }
-
- let logical_schema = logical_plan.schema();
- let window_expr = window_expr
- .iter()
- .map(|e| {
- create_window_expr(
- e,
- logical_schema,
- session_state.execution_props(),
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- let uses_bounded_memory = window_expr
- .iter()
- .all(|e| e.uses_bounded_memory());
- // If all window expressions can run with bounded memory,
- // choose the bounded window variant:
- Ok(if uses_bounded_memory {
- Arc::new(BoundedWindowAggExec::try_new(
- window_expr,
- input_exec,
- physical_partition_keys,
- InputOrderMode::Sorted,
- )?)
- } else {
- Arc::new(WindowAggExec::try_new(
- window_expr,
- input_exec,
- physical_partition_keys,
- )?)
- })
}
- LogicalPlan::Aggregate(Aggregate {
- input,
- group_expr,
- aggr_expr,
- ..
- }) => {
- // Initially need to perform the aggregate and then merge
the partitions
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let physical_input_schema = input_exec.schema();
- let logical_input_schema = input.as_ref().schema();
-
- let groups = self.create_grouping_physical_expr(
- group_expr,
- logical_input_schema,
- &physical_input_schema,
- session_state)?;
-
- let agg_filter = aggr_expr
- .iter()
- .map(|e| {
- create_aggregate_expr_and_maybe_filter(
- e,
- logical_input_schema,
- &physical_input_schema,
- session_state.execution_props(),
- )
- })
- .collect::<Result<Vec<_>>>()?;
- let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>,
Vec<_>) = multiunzip(agg_filter);
+ let logical_schema = node.schema();
+ let window_expr = window_expr
+ .iter()
+ .map(|e| {
+ create_window_expr(
+ e,
+ logical_schema,
+ session_state.execution_props(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
- let initial_aggr = Arc::new(AggregateExec::try_new(
- AggregateMode::Partial,
- groups.clone(),
- aggregates.clone(),
- filters.clone(),
+ let uses_bounded_memory =
+ window_expr.iter().all(|e| e.uses_bounded_memory());
+ // If all window expressions can run with bounded memory,
+ // choose the bounded window variant:
+ if uses_bounded_memory {
+ Arc::new(BoundedWindowAggExec::try_new(
+ window_expr,
input_exec,
- physical_input_schema.clone(),
- )?);
-
- // update group column indices based on partial aggregate
plan evaluation
- let final_group: Vec<Arc<dyn PhysicalExpr>> =
initial_aggr.output_group_expr();
-
- let can_repartition = !groups.is_empty()
- && session_state.config().target_partitions() > 1
- && session_state.config().repartition_aggregations();
-
- // Some aggregators may be modified during initialization
for
- // optimization purposes. For example, a FIRST_VALUE may
turn
- // into a LAST_VALUE with the reverse ordering requirement.
- // To reflect such changes to subsequent stages, use the
updated
- // `AggregateExpr`/`PhysicalSortExpr` objects.
- let updated_aggregates = initial_aggr.aggr_expr().to_vec();
-
- let next_partition_mode = if can_repartition {
- // construct a second aggregation with
'AggregateMode::FinalPartitioned'
- AggregateMode::FinalPartitioned
- } else {
- // construct a second aggregation, keeping the final
column name equal to the
- // first aggregation and the expressions corresponding
to the respective aggregate
- AggregateMode::Final
- };
-
- let final_grouping_set = PhysicalGroupBy::new_single(
- final_group
- .iter()
- .enumerate()
- .map(|(i, expr)| (expr.clone(),
groups.expr()[i].1.clone()))
- .collect()
- );
-
- Ok(Arc::new(AggregateExec::try_new(
- next_partition_mode,
- final_grouping_set,
- updated_aggregates,
- filters,
- initial_aggr,
- physical_input_schema.clone(),
- )?))
+ physical_partition_keys,
+ InputOrderMode::Sorted,
+ )?)
+ } else {
+ Arc::new(WindowAggExec::try_new(
+ window_expr,
+ input_exec,
+ physical_partition_keys,
+ )?)
}
- LogicalPlan::Projection(Projection { input, expr, .. }) => {
- let input_exec = self.create_initial_plan(input,
session_state).await?;
- let input_schema = input.as_ref().schema();
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ ..
+ }) => {
+ // Initially need to perform the aggregate and then merge the
partitions
+ let input_exec = children.pop().unwrap();
+ let physical_input_schema = input_exec.schema();
+ let logical_input_schema = input.as_ref().schema();
+
+ let groups = self.create_grouping_physical_expr(
+ group_expr,
+ logical_input_schema,
+ &physical_input_schema,
+ session_state,
+ )?;
- let physical_exprs = expr
- .iter()
- .map(|e| {
- // For projections, SQL planner and logical plan
builder may convert user
- // provided expressions into logical Column
expressions if their results
- // are already provided from the input plans.
Because we work with
- // qualified columns in logical plane, derived
columns involve operators or
- // functions will contain qualifiers as well. This
will result in logical
- // columns with names like `SUM(t1.c1)`, `t1.c1 +
t1.c2`, etc.
- //
- // If we run these logical columns through
physical_name function, we will
- // get physical names with column qualifiers,
which violates DataFusion's
- // field name semantics. To account for this, we
need to derive the
- // physical name from physical input instead.
- //
- // This depends on the invariant that logical
schema field index MUST match
- // with physical schema field index.
- let physical_name = if let Expr::Column(col) = e {
- match input_schema.index_of_column(col) {
- Ok(idx) => {
- // index physical field using logical
field index
-
Ok(input_exec.schema().field(idx).name().to_string())
- }
- // logical column is not a derived column,
safe to pass along to
- // physical_name
- Err(_) => physical_name(e),
- }
- } else {
- physical_name(e)
- };
+ let agg_filter = aggr_expr
+ .iter()
+ .map(|e| {
+ create_aggregate_expr_and_maybe_filter(
+ e,
+ logical_input_schema,
+ &physical_input_schema,
+ session_state.execution_props(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
- tuple_err((
+ let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>,
Vec<_>) =
+ multiunzip(agg_filter);
+
+ let initial_aggr = Arc::new(AggregateExec::try_new(
+ AggregateMode::Partial,
+ groups.clone(),
+ aggregates.clone(),
+ filters.clone(),
+ input_exec,
+ physical_input_schema.clone(),
+ )?);
+
+ // update group column indices based on partial aggregate plan
evaluation
+ let final_group: Vec<Arc<dyn PhysicalExpr>> =
+ initial_aggr.output_group_expr();
+
+ let can_repartition = !groups.is_empty()
+ && session_state.config().target_partitions() > 1
+ && session_state.config().repartition_aggregations();
+
+ // Some aggregators may be modified during initialization for
+ // optimization purposes. For example, a FIRST_VALUE may turn
+ // into a LAST_VALUE with the reverse ordering requirement.
+ // To reflect such changes to subsequent stages, use the
updated
+ // `AggregateExpr`/`PhysicalSortExpr` objects.
+ let updated_aggregates = initial_aggr.aggr_expr().to_vec();
+
+ let next_partition_mode = if can_repartition {
+ // construct a second aggregation with
'AggregateMode::FinalPartitioned'
+ AggregateMode::FinalPartitioned
+ } else {
+ // construct a second aggregation, keeping the final
column name equal to the
+ // first aggregation and the expressions corresponding to
the respective aggregate
+ AggregateMode::Final
+ };
+
+ let final_grouping_set = PhysicalGroupBy::new_single(
+ final_group
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| (expr.clone(),
groups.expr()[i].1.clone()))
+ .collect(),
+ );
+
+ Arc::new(AggregateExec::try_new(
+ next_partition_mode,
+ final_grouping_set,
+ updated_aggregates,
+ filters,
+ initial_aggr,
+ physical_input_schema.clone(),
+ )?)
+ }
+ LogicalPlan::Projection(Projection { input, expr, .. }) => self
+ .create_project_physical_exec(
+ session_state,
+ children.pop().unwrap(),
+ input,
+ expr,
+ )?,
+ LogicalPlan::Filter(Filter {
+ predicate, input, ..
+ }) => {
+ let physical_input = children.pop().unwrap();
+ let input_dfschema = input.schema();
+
+ let runtime_expr =
+ self.create_physical_expr(predicate, input_dfschema,
session_state)?;
+ let selectivity = session_state
+ .config()
+ .options()
+ .optimizer
+ .default_filter_selectivity;
+ let filter = FilterExec::try_new(runtime_expr,
physical_input)?;
+ Arc::new(filter.with_default_selectivity(selectivity)?)
+ }
+ LogicalPlan::Repartition(Repartition {
+ input,
+ partitioning_scheme,
+ }) => {
+ let physical_input = children.pop().unwrap();
+ let input_dfschema = input.as_ref().schema();
+ let physical_partitioning = match partitioning_scheme {
+ LogicalPartitioning::RoundRobinBatch(n) => {
+ Partitioning::RoundRobinBatch(*n)
+ }
+ LogicalPartitioning::Hash(expr, n) => {
+ let runtime_expr = expr
+ .iter()
+ .map(|e| {
self.create_physical_expr(
e,
- input_schema,
+ input_dfschema,
session_state,
- ),
- physical_name,
- ))
- })
- .collect::<Result<Vec<_>>>()?;
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Partitioning::Hash(runtime_expr, *n)
+ }
+ LogicalPartitioning::DistributeBy(_) => {
+ return not_impl_err!(
+ "Physical plan does not support DistributeBy
partitioning"
+ );
+ }
+ };
+ Arc::new(RepartitionExec::try_new(
+ physical_input,
+ physical_partitioning,
+ )?)
+ }
+ LogicalPlan::Sort(Sort {
+ expr, input, fetch, ..
+ }) => {
+ let physical_input = children.pop().unwrap();
+ let input_dfschema = input.as_ref().schema();
+ let sort_expr = create_physical_sort_exprs(
+ expr,
+ input_dfschema,
+ session_state.execution_props(),
+ )?;
+ let new_sort =
+ SortExec::new(sort_expr,
physical_input).with_fetch(*fetch);
+ Arc::new(new_sort)
+ }
+ LogicalPlan::Subquery(_) => todo!(),
+ LogicalPlan::SubqueryAlias(_) => children.pop().unwrap(),
+ LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
+ let input = children.pop().unwrap();
+
+ // GlobalLimitExec requires a single partition for input
+ let input = if input.output_partitioning().partition_count()
== 1 {
+ input
+ } else {
+ // Apply a LocalLimitExec to each partition. The optimizer
will also insert
+ // a CoalescePartitionsExec between the GlobalLimitExec
and LocalLimitExec
+ if let Some(fetch) = fetch {
+ Arc::new(LocalLimitExec::new(input, *fetch + skip))
+ } else {
+ input
+ }
+ };
- Ok(Arc::new(ProjectionExec::try_new(
- physical_exprs,
- input_exec,
- )?))
- }
- LogicalPlan::Filter(filter) => {
- let physical_input =
self.create_initial_plan(&filter.input, session_state).await?;
- let input_dfschema = filter.input.schema();
+ Arc::new(GlobalLimitExec::new(input, *skip, *fetch))
+ }
+ LogicalPlan::Unnest(Unnest {
+ column,
+ schema,
+ options,
+ ..
+ }) => {
+ let input = children.pop().unwrap();
+ let column_exec = schema
+ .index_of_column(column)
+ .map(|idx| Column::new(&column.name, idx))?;
+ let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+ Arc::new(UnnestExec::new(input, column_exec, schema,
options.clone()))
+ }
- let runtime_expr = self.create_physical_expr(
- &filter.predicate,
- input_dfschema,
- session_state,
- )?;
- let selectivity =
session_state.config().options().optimizer.default_filter_selectivity;
- let filter = FilterExec::try_new(runtime_expr,
physical_input)?;
- Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
- }
- LogicalPlan::Union(Union { inputs, schema: _ }) => {
- let physical_plans =
self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()),
session_state).await?;
+ // 2 Children
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ on: keys,
+ filter,
+ join_type,
+ null_equals_null,
+ schema: join_schema,
+ ..
+ }) => {
+ let null_equals_null = *null_equals_null;
+
+ let physical_right = children.pop().unwrap();
+ let physical_left = children.pop().unwrap();
+
+ // If join has expression equijoin keys, add physical
projection.
+ let has_expr_join_key = keys.iter().any(|(l, r)| {
+ !(matches!(l, Expr::Column(_)) && matches!(r,
Expr::Column(_)))
+ });
+ let (new_logical, physical_left, physical_right) = if
has_expr_join_key {
+ // TODO: Can we extract this transformation to somewhere
before physical plan
+ // creation?
+ let (left_keys, right_keys): (Vec<_>, Vec<_>) =
+ keys.iter().cloned().unzip();
+
+ let (left, left_col_keys, left_projected) =
+ wrap_projection_for_join_if_necessary(
+ &left_keys,
+ left.as_ref().clone(),
+ )?;
+ let (right, right_col_keys, right_projected) =
+ wrap_projection_for_join_if_necessary(
+ &right_keys,
+ right.as_ref().clone(),
+ )?;
+ let column_on = (left_col_keys, right_col_keys);
+
+ let left = Arc::new(left);
+ let right = Arc::new(right);
+ let new_join =
LogicalPlan::Join(Join::try_new_with_project_input(
+ node,
+ left.clone(),
+ right.clone(),
+ column_on,
+ )?);
- Ok(Arc::new(UnionExec::new(physical_plans)))
- }
- LogicalPlan::Repartition(Repartition {
- input,
- partitioning_scheme,
- }) => {
- let physical_input = self.create_initial_plan(input,
session_state).await?;
- let input_dfschema = input.as_ref().schema();
- let physical_partitioning = match partitioning_scheme {
- LogicalPartitioning::RoundRobinBatch(n) => {
- Partitioning::RoundRobinBatch(*n)
- }
- LogicalPartitioning::Hash(expr, n) => {
- let runtime_expr = expr
- .iter()
- .map(|e| {
- self.create_physical_expr(
- e,
- input_dfschema,
- session_state,
- )
- })
- .collect::<Result<Vec<_>>>()?;
- Partitioning::Hash(runtime_expr, *n)
- }
- LogicalPartitioning::DistributeBy(_) => {
- return not_impl_err!("Physical plan does not
support DistributeBy partitioning");
- }
+ // If inputs were projected then create ExecutionPlan for
these new
+ // LogicalPlan nodes.
+ let physical_left = match (left_projected, left.as_ref()) {
+ // If left_projected is true we are guaranteed that
left is a Projection
+ (
+ true,
+ LogicalPlan::Projection(Projection { input, expr,
.. }),
+ ) => self.create_project_physical_exec(
+ session_state,
+ physical_left,
+ input,
+ expr,
+ )?,
+ _ => physical_left,
Review Comment:
This is kinda nasty, as I mentioned here:
https://github.com/apache/arrow-datafusion/issues/9573#issuecomment-2041357274
Maybe we can split this off too?
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -496,776 +515,957 @@ impl DefaultPhysicalPlanner {
Self { extension_planners }
}
- /// Create a physical plans for multiple logical plans.
- ///
- /// This is the same as [`create_initial_plan`](Self::create_initial_plan)
but runs the planning concurrently.
- ///
- /// The result order is the same as the input order.
- fn create_initial_plan_multi<'a>(
- &'a self,
- logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
- session_state: &'a SessionState,
- ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
- async move {
- // First build futures with as little references as possible, then
performing some stream magic.
- // Otherwise rustc bails out w/:
- //
- // error: higher-ranked lifetime error
- // ...
- // note: could not prove `[async block@...]: std::marker::Send`
- let futures = logical_plans
- .into_iter()
- .enumerate()
- .map(|(idx, lp)| async move {
- let plan = self.create_initial_plan(lp,
session_state).await?;
- Ok((idx, plan)) as Result<_>
- })
- .collect::<Vec<_>>();
-
- let mut physical_plans = futures::stream::iter(futures)
- .buffer_unordered(
- session_state
- .config_options()
- .execution
- .planning_concurrency,
- )
- .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
- .await?;
- physical_plans.sort_by_key(|(idx, _plan)| *idx);
- let physical_plans = physical_plans
- .into_iter()
- .map(|(_idx, plan)| plan)
- .collect::<Vec<_>>();
- Ok(physical_plans)
- }
- .boxed()
+ /// Create a physical plan from a logical plan
+ async fn create_initial_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // DFS the tree to flatten it into a Vec.
+ // This will allow us to build the Physical Plan from the leaves up
+ // to avoid recursion, and also to make it easier to build a valid
+ // Physical Plan from the start and not rely on some intermediate
+ // representation (since parents need to know their children at
+ // construction time).
+ let mut flat_tree = vec![];
+ let mut dfs_visit_stack = vec![(None, logical_plan)];
+ // Use this to be able to find the leaves to start construction bottom
+ // up concurrently.
+ let mut flat_tree_leaf_indices = vec![];
+ while let Some((parent_index, node)) = dfs_visit_stack.pop() {
+ let current_index = flat_tree.len();
+ // Because of how we extend the visit stack here, we visit the
children
+ // in reverse order of how they appeart, so later we need to
reverse
+ // the order of children when building the nodes.
+ dfs_visit_stack
+ .extend(node.inputs().iter().map(|&n| (Some(current_index),
n)));
+ let state = match node.inputs().len() {
+ 0 => {
+ flat_tree_leaf_indices.push(current_index);
+ NodeState::ZeroOrOneChild
+ }
+ 1 => NodeState::ZeroOrOneChild,
+ _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])),
+ };
+ let node = LogicalNode {
+ node,
+ parent_index,
+ state,
+ };
+ flat_tree.push(node);
+ }
+ let flat_tree = Arc::new(flat_tree);
+
+ let planning_concurrency = session_state
+ .config_options()
+ .execution
+ .planning_concurrency;
+ // Can never spawn more tasks than leaves in the tree, as these tasks
must
+ // all converge down to the root node, which can only be processed by a
+ // single task.
+ let max_concurrency =
planning_concurrency.min(flat_tree_leaf_indices.len());
Review Comment:
I think this may be more accurate than what is currently present:
https://github.com/apache/arrow-datafusion/blob/215f30f74a12e91fd7dca0d30e37014c8c3493ed/datafusion/core/src/physical_planner.rs#L499-L542
The current `create_initial_plan_multi` can be called multiple times, and
`planning_concurrency` is only enforced within a single call to it, so if
it's called multiple times it can spawn more tasks than `planning_concurrency`
allows. Maybe this is intended?
Either way, the new code actually limits how many tasks are building the
tree across the entire initial planning process.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]