This is an automated email from the ASF dual-hosted git repository. blaginin pushed a commit to branch annarose/dict-coercion in repository https://gitbox.apache.org/repos/asf/datafusion-sandbox.git
commit d3ac7a351a3ad0e97f64d700d532bacab7b419dc Author: Albert Skalt <[email protected]> AuthorDate: Thu Feb 5 00:30:22 2026 +0300 Wrap immutable plan parts into Arc (make creating `ExecutionPlan`s less costly) (#19893) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes https://github.com/apache/datafusion/issues/19852 ## Rationale for this change Improve performance of query planning and plan state re-set by making node clone cheap. ## What changes are included in this PR? - Store projection as `Option<Arc<[usize]>>` instead of `Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`, `NestedLoopJoinExec`. - Store exprs as `Arc<[ProjectionExpr]>` instead of Vec in `ProjectionExprs`. - Store arced aggregation, filter, group by expressions within `AggregateExec`. --- datafusion/common/src/stats.rs | 13 +- datafusion/common/src/utils/mod.rs | 4 +- datafusion/core/src/physical_planner.rs | 2 +- .../tests/physical_optimizer/join_selection.rs | 2 +- datafusion/physical-expr/src/projection.rs | 95 +++++-- .../physical-optimizer/src/enforce_distribution.rs | 44 ++-- .../src/enforce_sorting/sort_pushdown.rs | 2 +- .../physical-optimizer/src/join_selection.rs | 50 ++-- .../physical-optimizer/src/projection_pushdown.rs | 2 +- datafusion/physical-plan/src/aggregates/mod.rs | 43 ++-- .../physical-plan/src/aggregates/no_grouping.rs | 6 +- .../physical-plan/src/aggregates/row_hash.rs | 12 +- .../physical-plan/src/aggregates/topk_stream.rs | 4 +- datafusion/physical-plan/src/common.rs | 2 +- datafusion/physical-plan/src/filter.rs | 60 ++--- .../physical-plan/src/joins/hash_join/exec.rs | 276 ++++++++++++++------- .../physical-plan/src/joins/hash_join/mod.rs | 2 +- datafusion/physical-plan/src/joins/mod.rs | 6 +- .../physical-plan/src/joins/nested_loop_join.rs | 150 +++++++---- datafusion/physical-plan/src/joins/utils.rs | 4 +- datafusion/physical-plan/src/projection.rs | 18 +- datafusion/proto/src/physical_plan/mod.rs | 2 +- 22 files changed, 509 insertions(+), 290 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392..cecf1d034 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,8 +391,13 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self { - let Some(projection) = projection else { + pub fn project(self, projection: Option<&impl AsRef<[usize]>>) -> Self { + let projection = projection.map(AsRef::as_ref); + self.project_impl(projection) + } + + fn project_impl(mut self, projection: Option<&[usize]>) -> Self { + let Some(projection) = projection.map(AsRef::as_ref) else { return self; }; @@ -410,7 +415,7 @@ impl Statistics { .map(Slot::Present) .collect(); - for idx in projection { + for idx in projection.iter() { let next_idx = self.column_statistics.len(); let slot = std::mem::replace( columns.get_mut(*idx).expect("projection out of bounds"), @@ -1066,7 +1071,7 @@ mod tests { #[test] fn test_project_none() { - let projection = None; + let projection: Option<Vec<usize>> = None; let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); assert_eq!(stats, make_stats(vec![10, 20, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bd..016b188e3 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -70,10 +70,10 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&Vec<usize>>, + projection: Option<&impl AsRef<[usize]>>, ) -> Result<SchemaRef> { let schema = match projection { - Some(columns) => Arc::new(schema.project(columns)?), + Some(columns) => Arc::new(schema.project(columns.as_ref())?), None => Arc::clone(schema), }; Ok(schema) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b1aa85028..6765b7f79 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1007,7 +1007,7 @@ impl DefaultPhysicalPlanner { // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. .apply_projection(Some( - (0..input.schema().fields().len()).collect(), + (0..input.schema().fields().len()).collect::<Vec<_>>(), ))? .with_batch_size(session_state.config().batch_size()) .build()? diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a9559..b640159ca 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!(swapped_join.projection.as_deref().unwrap(), &[0_usize]); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c..ae83a7462 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -29,7 +29,8 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::stats::{ColumnStatistics, Precision}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ - Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err, + Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err, + plan_err, }; use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet; @@ -125,7 +126,8 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec<ProjectionExpr>, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +139,16 @@ impl std::fmt::Display for ProjectionExprs { impl From<Vec<ProjectionExpr>> for ProjectionExprs { fn from(value: Vec<ProjectionExpr>) -> Self { - Self { exprs: value } + Self { + exprs: value.into(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +156,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator<ProjectionExpr> for ProjectionExprs { fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::<Vec<_>>(), + exprs: exprs.into_iter().collect(), } } } @@ -164,12 +168,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { } impl ProjectionExprs { - pub fn new<I>(exprs: I) -> Self - where - I: IntoIterator<Item = ProjectionExpr>, - { + /// Make a new [`ProjectionExprs`] from expressions iterator. + pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self { + Self { + exprs: exprs.into_iter().collect(), + } + } + + /// Make a new [`ProjectionExprs`] from expressions. + pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self { Self { - exprs: exprs.into_iter().collect::<Vec<_>>(), + exprs: exprs.into(), } } @@ -285,13 +294,14 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() + .iter() + .cloned() .map(|mut proj| { proj.expr = f(proj.expr)?; Ok(proj) }) - .collect::<Result<Vec<_>>>()?; - Ok(Self::new(exprs)) + .collect::<Result<Arc<_>>>()?; + Ok(Self::from_expressions(exprs)) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +371,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -602,12 +612,12 @@ impl ProjectionExprs { /// ``` pub fn project_statistics( &self, - mut stats: datafusion_common::Statistics, + mut stats: Statistics, output_schema: &Schema, - ) -> Result<datafusion_common::Statistics> { + ) -> Result<Statistics> { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,13 +764,52 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter<ProjectionExpr>; +/// Describes an immutable reference counted projection. +/// +/// This structure represents projecting a set of columns by index. +/// [`Arc`] is used to make it cheap to clone. +pub type ProjectionRef = Arc<[usize]>; - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() - } +/// Combine two projections. +/// +/// If `p1` is [`None`] then there are no changes. +/// Otherwise, if passed `p2` is not [`None`] then it is remapped +/// according to the `p1`. Otherwise, there are no changes. +/// +/// # Example +/// +/// If stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`, +/// then the resulting projection will be [0, 3]. +/// +/// # Error +/// +/// Returns an internal error if `p1` contains index that is greater than `p2` len. +/// +pub fn combine_projections( + p1: Option<&ProjectionRef>, + p2: Option<&ProjectionRef>, +) -> Result<Option<ProjectionRef>> { + let Some(p1) = p1 else { + return Ok(None); + }; + let Some(p2) = p2 else { + return Ok(Some(Arc::clone(p1))); + }; + + Ok(Some( + p1.iter() + .map(|i| { + let idx = *i; + assert_or_internal_err!( + idx < p2.len(), + "unable to apply projection: index {} is greater than new projection len {}", + idx, + p2.len(), + ); + Ok(p2[*i]) + }) + .collect::<Result<Arc<[usize]>>>()?, + )) } /// The function operates in two modes: diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index acb1c5880..790669b5c 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -49,7 +49,7 @@ use datafusion_physical_plan::aggregates::{ use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec, + CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode, SortMergeJoinExec, }; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -305,18 +305,19 @@ pub fn adjust_input_keys_ordering( Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>, )| { - HashJoinExec::try_new( + HashJoinExecBuilder::new( Arc::clone(left), Arc::clone(right), new_conditions.0, - filter.clone(), - join_type, - // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter. - projection.clone(), - PartitionMode::Partitioned, - *null_equality, - *null_aware, + *join_type, ) + .with_filter(filter.clone()) + // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter. + .with_projection_ref(projection.clone()) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(*null_equality) + .with_null_aware(*null_aware) + .build() .map(|e| Arc::new(e) as _) }; return reorder_partitioned_join_keys( @@ -638,17 +639,20 @@ pub fn reorder_join_keys_to_inputs( right_keys, } = join_keys; let new_join_on = new_join_conditions(&left_keys, &right_keys); - return Ok(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - new_join_on, - filter.clone(), - join_type, - projection.clone(), - PartitionMode::Partitioned, - *null_equality, - *null_aware, - )?)); + return Ok(Arc::new( + HashJoinExecBuilder::new( + Arc::clone(left), + Arc::clone(right), + new_join_on, + *join_type, + ) + .with_filter(filter.clone()) + .with_projection_ref(projection.clone()) + .with_partition_mode(PartitionMode::Partitioned) + .with_null_equality(*null_equality) + .with_null_aware(*null_aware) + .build()?, + )); } } } else if let Some(SortMergeJoinExec { diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 698fdea8e..2dc61ba24 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -723,7 +723,7 @@ fn handle_hash_join( .collect(); let column_indices = build_join_column_index(plan); - let projected_indices: Vec<_> = if let Some(projection) = &plan.projection { + let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() { projection.iter().map(|&i| &column_indices[i]).collect() } else { column_indices.iter().collect() diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 7412d0ba9..02ef378d7 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::joins::utils::ColumnIndex; use datafusion_physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, + CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, SymmetricHashJoinExec, }; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -191,30 +191,18 @@ pub(crate) fn try_collect_left( { Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?)) } else { - Ok(Some(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - PartitionMode::CollectLeft, - hash_join.null_equality(), - hash_join.null_aware, - )?))) + Ok(Some(Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_partition_mode(PartitionMode::CollectLeft) + .build()?, + ))) } } - (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - PartitionMode::CollectLeft, - hash_join.null_equality(), - hash_join.null_aware, - )?))), + (true, false) => Ok(Some(Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_partition_mode(PartitionMode::CollectLeft) + .build()?, + ))), (false, true) => { // Don't swap null-aware anti joins as they have specific side requirements if hash_join.join_type().supports_swap() && !hash_join.null_aware { @@ -254,17 +242,11 @@ pub(crate) fn partitioned_hash_join( PartitionMode::Partitioned }; - Ok(Arc::new(HashJoinExec::try_new( - Arc::clone(left), - Arc::clone(right), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - partition_mode, - hash_join.null_equality(), - hash_join.null_aware, - )?)) + Ok(Arc::new( + HashJoinExecBuilder::from(hash_join) + .with_partition_mode(partition_mode) + .build()?, + )) } } diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 99922ba07..44d0926a8 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -135,7 +135,7 @@ fn try_push_down_join_filter( ); let new_lhs_length = lhs_rewrite.data.0.schema().fields.len(); - let projections = match projections { + let projections = match projections.as_ref() { None => match join.join_type() { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { // Build projections that ignore the newly projected columns. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index aa0f5a236..dcfa0456a 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -626,11 +626,14 @@ pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, /// Group by expressions - group_by: PhysicalGroupBy, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + group_by: Arc<PhysicalGroupBy>, /// Aggregate expressions - aggr_expr: Vec<Arc<AggregateFunctionExpr>>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + aggr_expr: Arc<[Arc<AggregateFunctionExpr>]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + filter_expr: Arc<[Option<Arc<dyn PhysicalExpr>>]>, /// Configuration for limit-based optimizations limit_options: Option<LimitOptions>, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -664,18 +667,18 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec<Arc<AggregateFunctionExpr>>, + aggr_expr: impl Into<Arc<[Arc<AggregateFunctionExpr>]>>, ) -> Self { Self { - aggr_expr, + aggr_expr: aggr_expr.into(), // clone the rest of the fields required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + filter_expr: Arc::clone(&self.filter_expr), limit_options: self.limit_options, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -694,9 +697,9 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -711,12 +714,13 @@ impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into<Arc<PhysicalGroupBy>>, aggr_expr: Vec<Arc<AggregateFunctionExpr>>, filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>, input: Arc<dyn ExecutionPlan>, input_schema: SchemaRef, ) -> Result<Self> { + let group_by = group_by.into(); let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); @@ -741,13 +745,16 @@ impl AggregateExec { /// the schema in such cases. fn try_new_with_schema( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into<Arc<PhysicalGroupBy>>, mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>, - filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>, + filter_expr: impl Into<Arc<[Option<Arc<dyn PhysicalExpr>>]>>, input: Arc<dyn ExecutionPlan>, input_schema: SchemaRef, schema: SchemaRef, ) -> Result<Self> { + let group_by = group_by.into(); + let filter_expr = filter_expr.into(); + // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -814,13 +821,13 @@ impl AggregateExec { &group_expr_mapping, &mode, &input_order_mode, - aggr_expr.as_slice(), + aggr_expr.as_ref(), )?; let mut exec = AggregateExec { mode, group_by, - aggr_expr, + aggr_expr: aggr_expr.into(), filter_expr, input, schema, @@ -1370,9 +1377,9 @@ impl ExecutionPlan for AggregateExec { ) -> Result<Arc<dyn ExecutionPlan>> { let mut me = AggregateExec::try_new_with_schema( self.mode, - self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), + Arc::clone(&self.group_by), + self.aggr_expr.to_vec(), + Arc::clone(&self.filter_expr), Arc::clone(&children[0]), Arc::clone(&self.input_schema), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index eb9b6766a..fe8942097 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -62,7 +62,7 @@ struct AggregateStreamInner { mode: AggregateMode, input: SendableRecordBatchStream, aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>, - filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>, + filter_expressions: Arc<[Option<Arc<dyn PhysicalExpr>>]>, // ==== Runtime States/Buffers ==== accumulators: Vec<AccumulatorItem>, @@ -277,7 +277,7 @@ impl AggregateStream { partition: usize, ) -> Result<Self> { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -285,7 +285,7 @@ impl AggregateStream { let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; let filter_expressions = match agg.mode.input_mode() { AggregateInputMode::Raw => agg_filter_expr, - AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()], + AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()].into(), }; let accumulators = create_accumulators(&agg.aggr_expr)?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index b2cf396b1..de857370c 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream { /// /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, /// the filter expression is `x > 100`. - filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>, + filter_expressions: Arc<[Option<Arc<dyn PhysicalExpr>>]>, /// GROUP BY expressions - group_by: PhysicalGroupBy, + group_by: Arc<PhysicalGroupBy>, /// max rows in output RecordBatches batch_size: usize, @@ -465,8 +465,8 @@ impl GroupedHashAggregateStream { ) -> Result<Self> { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); - let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_group_by = Arc::clone(&agg.group_by); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -493,7 +493,7 @@ impl GroupedHashAggregateStream { let filter_expressions = match agg.mode.input_mode() { AggregateInputMode::Raw => agg_filter_expr, - AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()], + AggregateInputMode::Partial => vec![None; agg.aggr_expr.len()].into(), }; // Instantiate the accumulators diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 72c5d0c86..4aa566ccf 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -50,7 +50,7 @@ pub struct GroupedTopKAggregateStream { baseline_metrics: BaselineMetrics, group_by_metrics: GroupByMetrics, aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>, - group_by: PhysicalGroupBy, + group_by: Arc<PhysicalGroupBy>, priority_map: PriorityMap, } @@ -62,7 +62,7 @@ impl GroupedTopKAggregateStream { limit: usize, ) -> Result<Self> { let agg_schema = Arc::clone(&aggr.schema); - let group_by = aggr.group_by.clone(); + let group_by = Arc::clone(&aggr.group_by); let input = aggr.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56..590f6f09e 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec<usize>>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0acf419e6..abd7b72fc 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; +use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use itertools::Itertools; use super::{ @@ -85,7 +86,7 @@ pub struct FilterExec { /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, /// The projection indices of the columns in the output schema of join - projection: Option<Vec<usize>>, + projection: Option<ProjectionRef>, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -96,7 +97,7 @@ pub struct FilterExec { pub struct FilterExecBuilder { predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn ExecutionPlan>, - projection: Option<Vec<usize>>, + projection: Option<ProjectionRef>, default_selectivity: u8, batch_size: usize, fetch: Option<usize>, @@ -136,18 +137,19 @@ impl FilterExecBuilder { /// /// If no projection is currently set, the new projection is used directly. /// If `None` is passed, the projection is cleared. - pub fn apply_projection(mut self, projection: Option<Vec<usize>>) -> Result<Self> { + pub fn apply_projection(self, projection: Option<Vec<usize>>) -> Result<Self> { + let projection = projection.map(Into::into); + self.apply_projection_by_ref(projection.as_ref()) + } + + /// The same as [`Self::apply_projection`] but takes projection shared reference. + pub fn apply_projection_by_ref( + mut self, + projection: Option<&ProjectionRef>, + ) -> Result<Self> { // Check if the projection is valid against current output schema - can_project(&self.input.schema(), projection.as_ref())?; - self.projection = match projection { - Some(new_proj) => match &self.projection { - Some(existing_proj) => { - Some(new_proj.iter().map(|i| existing_proj[*i]).collect()) - } - None => Some(new_proj), - }, - None => None, - }; + can_project(&self.input.schema(), projection.map(AsRef::as_ref))?; + self.projection = combine_projections(projection, self.projection.as_ref())?; Ok(self) } @@ -189,16 +191,14 @@ impl FilterExecBuilder { } // Validate projection if provided - if let Some(ref proj) = self.projection { - can_project(&self.input.schema(), Some(proj))?; - } + can_project(&self.input.schema(), self.projection.as_deref())?; // Compute properties once with all parameters let cache = FilterExec::compute_properties( &self.input, &self.predicate, self.default_selectivity, - self.projection.as_ref(), + self.projection.as_deref(), )?; Ok(FilterExec { @@ -302,8 +302,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec<usize>> { - self.projection.as_ref() + pub fn projection(&self) -> &Option<ProjectionRef> { + &self.projection } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. @@ -380,7 +380,7 @@ impl FilterExec { input: &Arc<dyn ExecutionPlan>, predicate: &Arc<dyn PhysicalExpr>, default_selectivity: u8, - projection: Option<&Vec<usize>>, + projection: Option<&[usize]>, ) -> Result<PlanProperties> { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -419,7 +419,7 @@ impl FilterExec { if let Some(projection) = projection { let schema = eq_properties.schema(); let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; + let out_schema = project_schema(schema, Some(&projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -664,7 +664,7 @@ impl ExecutionPlan for FilterExec { let new_predicate = conjunction(unhandled_filters); let updated_node = if new_predicate.eq(&lit(true)) { // FilterExec is no longer needed, but we may need to leave a projection in place - match self.projection() { + match self.projection().as_ref() { Some(projection_indices) => { let filter_child_schema = filter_input.schema(); let proj_exprs = projection_indices @@ -700,7 +700,7 @@ impl ExecutionPlan for FilterExec { &filter_input, &new_predicate, self.default_selectivity, - self.projection.as_ref(), + self.projection.as_deref(), )?, projection: self.projection.clone(), batch_size: self.batch_size, @@ -812,7 +812,7 @@ struct FilterExecStream { /// Runtime metrics recording metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema - projection: Option<Vec<usize>>, + projection: Option<ProjectionRef>, /// Batch coalescer to combine small batches batch_coalescer: LimitedBatchCoalescer, } @@ -903,8 +903,8 @@ impl Stream for FilterExecStream { .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match self.projection { - Some(ref projection) => { + Ok(match self.projection.as_ref() { + Some(projection) => { let projected_batch = batch.project(projection)?; (array, projected_batch) }, @@ -1725,7 +1725,7 @@ mod tests { .build()?; // Verify projection is set correctly - assert_eq!(filter.projection(), Some(&vec![0, 2])); + assert_eq!(filter.projection(), &Some([0, 2].into())); // Verify schema contains only projected columns let output_schema = filter.schema(); @@ -1755,7 +1755,7 @@ mod tests { let filter = FilterExecBuilder::new(predicate, input).build()?; // Verify no projection is set - assert_eq!(filter.projection(), None); + assert!(filter.projection().is_none()); // Verify schema contains all columns let output_schema = filter.schema(); @@ -1969,7 +1969,7 @@ mod tests { .build()?; // Verify composed projection is [0, 3] - assert_eq!(filter.projection(), Some(&vec![0, 3])); + assert_eq!(filter.projection(), &Some([0, 3].into())); // Verify schema contains only columns a and d let output_schema = filter.schema(); @@ -2003,7 +2003,7 @@ mod tests { .build()?; // Projection should be cleared - assert_eq!(filter.projection(), None); + assert_eq!(filter.projection(), &None); // Schema should have all columns let output_schema = filter.schema(); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c249dfb10..a330ad54c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -81,6 +81,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use ahash::RandomState; @@ -246,6 +247,172 @@ impl JoinLeftData { } } +/// Helps to build [`HashJoinExec`]. +pub struct HashJoinExecBuilder { + left: Arc<dyn ExecutionPlan>, + right: Arc<dyn ExecutionPlan>, + on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + join_type: JoinType, + filter: Option<JoinFilter>, + projection: Option<ProjectionRef>, + partition_mode: PartitionMode, + null_equality: NullEquality, + null_aware: bool, +} + +impl HashJoinExecBuilder { + /// Make a new [`HashJoinExecBuilder`]. + pub fn new( + left: Arc<dyn ExecutionPlan>, + right: Arc<dyn ExecutionPlan>, + on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + join_type: JoinType, + ) -> Self { + Self { + left, + right, + on, + filter: None, + projection: None, + partition_mode: PartitionMode::Auto, + join_type, + null_equality: NullEquality::NullEqualsNothing, + null_aware: false, + } + } + + /// Set projection from the vector. + pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self { + self.with_projection_ref(projection.map(Into::into)) + } + + /// Set projection from the shared reference. + pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self { + self.projection = projection; + self + } + + /// Set optional filter. + pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self { + self.filter = filter; + self + } + + /// Set partition mode. + pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self { + self.partition_mode = mode; + self + } + + /// Set null equality property. + pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self { + self.null_equality = null_equality; + self + } + + /// Set null aware property. + pub fn with_null_aware(mut self, null_aware: bool) -> Self { + self.null_aware = null_aware; + self + } + + /// Build resulting execution plan. + pub fn build(self) -> Result<HashJoinExec> { + let Self { + left, + right, + on, + join_type, + filter, + projection, + partition_mode, + null_equality, + null_aware, + } = self; + + let left_schema = left.schema(); + let right_schema = right.schema(); + if on.is_empty() { + return plan_err!("On constraints in HashJoinExec should be non-empty"); + } + + check_join_is_valid(&left_schema, &right_schema, &on)?; + + // Validate null_aware flag + if null_aware { + if !matches!(join_type, JoinType::LeftAnti) { + return plan_err!( + "null_aware can only be true for LeftAnti joins, got {join_type}" + ); + } + if on.len() != 1 { + return plan_err!( + "null_aware anti join only supports single column join key, got {} columns", + on.len() + ); + } + } + + let (join_schema, column_indices) = + build_join_schema(&left_schema, &right_schema, &join_type); + + let random_state = HASH_JOIN_SEED; + + let join_schema = Arc::new(join_schema); + + // check if the projection is valid + can_project(&join_schema, projection.as_deref())?; + + let cache = HashJoinExec::compute_properties( + &left, + &right, + &join_schema, + join_type, + &on, + partition_mode, + projection.as_deref(), + )?; + + // Initialize both dynamic filter and bounds accumulator to None + // They will be set later if dynamic filtering is enabled + + Ok(HashJoinExec { + left, + right, + on, + filter, + join_type, + join_schema, + left_fut: Default::default(), + random_state, + mode: partition_mode, + metrics: ExecutionPlanMetricsSet::new(), + projection, + column_indices, + null_equality, + null_aware, + cache, + dynamic_filter: None, + }) + } +} + +impl From<&HashJoinExec> for HashJoinExecBuilder { + fn from(exec: &HashJoinExec) -> Self { + Self { + left: Arc::clone(exec.left()), + right: Arc::clone(exec.right()), + on: exec.on.clone(), + join_type: exec.join_type, + filter: exec.filter.clone(), + projection: exec.projection.clone(), + partition_mode: exec.mode, + null_equality: exec.null_equality, + null_aware: exec.null_aware, + } + } +} + #[expect(rustdoc::private_intra_doc_links)] /// Join execution plan: Evaluates equijoin predicates in parallel on multiple /// partitions using a hash table and an optional filter list to apply post @@ -466,7 +633,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option<Vec<usize>>, + pub projection: Option<ProjectionRef>, /// Information of index and left / right placement of columns column_indices: Vec<ColumnIndex>, /// The equality null-handling behavior of the join algorithm. @@ -535,70 +702,13 @@ impl HashJoinExec { null_equality: NullEquality, null_aware: bool, ) -> Result<Self> { - let left_schema = left.schema(); - let right_schema = right.schema(); - if on.is_empty() { - return plan_err!("On constraints in HashJoinExec should be non-empty"); - } - - check_join_is_valid(&left_schema, &right_schema, &on)?; - - // Validate null_aware flag - if null_aware { - if !matches!(join_type, JoinType::LeftAnti) { - return plan_err!( - "null_aware can only be true for LeftAnti joins, got {join_type}" - ); - } - if on.len() != 1 { - return plan_err!( - "null_aware anti join only supports single column join key, got {} columns", - on.len() - ); - } - } - - let (join_schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); - - let random_state = HASH_JOIN_SEED; - - let join_schema = Arc::new(join_schema); - - // check if the projection is valid - can_project(&join_schema, projection.as_ref())?; - - let cache = Self::compute_properties( - &left, - &right, - &join_schema, - *join_type, - &on, - partition_mode, - projection.as_ref(), - )?; - - // Initialize both dynamic filter and bounds accumulator to None - // They will be set later if dynamic filtering is enabled - - Ok(HashJoinExec { - left, - right, - on, - filter, - join_type: *join_type, - join_schema, - left_fut: Default::default(), - random_state, - mode: partition_mode, - metrics: ExecutionPlanMetricsSet::new(), - projection, - column_indices, - null_equality, - null_aware, - cache, - dynamic_filter: None, - }) + HashJoinExecBuilder::new(left, right, on, *join_type) + .with_filter(filter) + .with_projection(projection) + .with_partition_mode(partition_mode) + .with_null_equality(null_equality) + .with_null_aware(null_aware) + .build() } fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> { @@ -687,26 +797,14 @@ impl HashJoinExec { /// Return new instance of [HashJoinExec] with the given projection. pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> { + let projection = projection.map(Into::into); // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - Self::try_new( - Arc::clone(&self.left), - Arc::clone(&self.right), - self.on.clone(), - self.filter.clone(), - &self.join_type, - projection, - self.mode, - self.null_equality, - self.null_aware, - ) + can_project(&self.schema(), projection.as_deref())?; + let projection = + combine_projections(projection.as_ref(), self.projection.as_ref())?; + HashJoinExecBuilder::from(self) + .with_projection_ref(projection) + .build() } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -717,7 +815,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec<usize>>, + projection: Option<&[usize]>, ) -> Result<PlanProperties> { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -769,7 +867,7 @@ impl HashJoinExec { if let Some(projection) = projection { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; + let out_schema = project_schema(schema, Some(&projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -824,7 +922,7 @@ impl HashJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), partition_mode, @@ -1020,7 +1118,7 @@ impl ExecutionPlan for HashJoinExec { self.join_type, &self.on, self.mode, - self.projection.as_ref(), + self.projection.as_deref(), )?, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), @@ -1181,7 +1279,7 @@ impl ExecutionPlan for HashJoinExec { let right_stream = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) diff --git a/datafusion/physical-plan/src/joins/hash_join/mod.rs b/datafusion/physical-plan/src/joins/hash_join/mod.rs index 8592e1d96..b915802ea 100644 --- a/datafusion/physical-plan/src/joins/hash_join/mod.rs +++ b/datafusion/physical-plan/src/joins/hash_join/mod.rs @@ -17,7 +17,7 @@ //! [`HashJoinExec`] Partitioned Hash Join Operator -pub use exec::HashJoinExec; +pub use exec::{HashJoinExec, HashJoinExecBuilder}; pub use partitioned_hash_eval::{HashExpr, HashTableLookupExpr, SeededRandomState}; mod exec; diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 848d0472f..2cdfa1e6a 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -20,8 +20,10 @@ use arrow::array::BooleanBufferBuilder; pub use cross_join::CrossJoinExec; use datafusion_physical_expr::PhysicalExprRef; -pub use hash_join::{HashExpr, HashJoinExec, HashTableLookupExpr, SeededRandomState}; -pub use nested_loop_join::NestedLoopJoinExec; +pub use hash_join::{ + HashExpr, HashJoinExec, HashJoinExecBuilder, HashTableLookupExpr, SeededRandomState, +}; +pub use nested_loop_join::{NestedLoopJoinExec, NestedLoopJoinExecBuilder}; use parking_lot::Mutex; // Note: SortMergeJoin is not used in plans yet pub use piecewise_merge_join::PiecewiseMergeJoinExec; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b57f91322..e6bc26c34 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -71,6 +71,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; +use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use futures::{Stream, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; @@ -192,7 +193,7 @@ pub struct NestedLoopJoinExec { /// Information of index and left / right placement of columns column_indices: Vec<ColumnIndex>, /// Projection to apply to the output of the join - projection: Option<Vec<usize>>, + projection: Option<ProjectionRef>, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -200,34 +201,76 @@ pub struct NestedLoopJoinExec { cache: PlanProperties, } -impl NestedLoopJoinExec { - /// Try to create a new [`NestedLoopJoinExec`] - pub fn try_new( +/// Helps to build [`NestedLoopJoinExec`]. +pub struct NestedLoopJoinExecBuilder { + left: Arc<dyn ExecutionPlan>, + right: Arc<dyn ExecutionPlan>, + join_type: JoinType, + filter: Option<JoinFilter>, + projection: Option<ProjectionRef>, +} + +impl NestedLoopJoinExecBuilder { + /// Make a new [`NestedLoopJoinExecBuilder`]. + pub fn new( left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>, - filter: Option<JoinFilter>, - join_type: &JoinType, - projection: Option<Vec<usize>>, - ) -> Result<Self> { + join_type: JoinType, + ) -> Self { + Self { + left, + right, + join_type, + filter: None, + projection: None, + } + } + + /// Set projection from the vector. + pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self { + self.with_projection_ref(projection.map(Into::into)) + } + + /// Set projection from the shared reference. + pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) -> Self { + self.projection = projection; + self + } + + /// Set optional filter. + pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self { + self.filter = filter; + self + } + + /// Build resulting execution plan. + pub fn build(self) -> Result<NestedLoopJoinExec> { + let Self { + left, + right, + join_type, + filter, + projection, + } = self; + let left_schema = left.schema(); let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; let (join_schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); + build_join_schema(&left_schema, &right_schema, &join_type); let join_schema = Arc::new(join_schema); - let cache = Self::compute_properties( + let cache = NestedLoopJoinExec::compute_properties( &left, &right, &join_schema, - *join_type, - projection.as_ref(), + join_type, + projection.as_deref(), )?; - Ok(NestedLoopJoinExec { left, right, filter, - join_type: *join_type, + join_type, join_schema, build_side_data: Default::default(), column_indices, @@ -236,6 +279,34 @@ impl NestedLoopJoinExec { cache, }) } +} + +impl From<&NestedLoopJoinExec> for NestedLoopJoinExecBuilder { + fn from(exec: &NestedLoopJoinExec) -> Self { + Self { + left: Arc::clone(exec.left()), + right: Arc::clone(exec.right()), + join_type: exec.join_type, + filter: exec.filter.clone(), + projection: exec.projection.clone(), + } + } +} + +impl NestedLoopJoinExec { + /// Try to create a new [`NestedLoopJoinExec`] + pub fn try_new( + left: Arc<dyn ExecutionPlan>, + right: Arc<dyn ExecutionPlan>, + filter: Option<JoinFilter>, + join_type: &JoinType, + projection: Option<Vec<usize>>, + ) -> Result<Self> { + NestedLoopJoinExecBuilder::new(left, right, *join_type) + .with_projection(projection) + .with_filter(filter) + .build() + } /// left side pub fn left(&self) -> &Arc<dyn ExecutionPlan> { @@ -257,8 +328,8 @@ impl NestedLoopJoinExec { &self.join_type } - pub fn projection(&self) -> Option<&Vec<usize>> { - self.projection.as_ref() + pub fn projection(&self) -> &Option<ProjectionRef> { + &self.projection } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -267,7 +338,7 @@ impl NestedLoopJoinExec { right: &Arc<dyn ExecutionPlan>, schema: &SchemaRef, join_type: JoinType, - projection: Option<&Vec<usize>>, + projection: Option<&[usize]>, ) -> Result<PlanProperties> { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -310,7 +381,7 @@ impl NestedLoopJoinExec { if let Some(projection) = projection { // construct a map from the input expressions to the output expression of the Projection let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; + let out_schema = project_schema(schema, Some(&projection))?; output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); @@ -334,22 +405,14 @@ impl NestedLoopJoinExec { } pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> { + let projection = projection.map(Into::into); // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - Self::try_new( - Arc::clone(&self.left), - Arc::clone(&self.right), - self.filter.clone(), - &self.join_type, - projection, - ) + can_project(&self.schema(), projection.as_deref())?; + let projection = + combine_projections(projection.as_ref(), self.projection.as_ref())?; + NestedLoopJoinExecBuilder::from(self) + .with_projection_ref(projection) + .build() } /// Returns a new `ExecutionPlan` that runs NestedLoopsJoins with the left @@ -371,7 +434,7 @@ impl NestedLoopJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), )?; @@ -476,13 +539,16 @@ impl ExecutionPlan for NestedLoopJoinExec { self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - Ok(Arc::new(NestedLoopJoinExec::try_new( - Arc::clone(&children[0]), - Arc::clone(&children[1]), - self.filter.clone(), - &self.join_type, - self.projection.clone(), - )?)) + Ok(Arc::new( + NestedLoopJoinExecBuilder::new( + Arc::clone(&children[0]), + Arc::clone(&children[1]), + self.join_type, + ) + .with_filter(self.filter.clone()) + .with_projection_ref(self.projection.clone()) + .build()?, + )) } fn execute( @@ -521,7 +587,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let probe_side_data = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04..e709703e0 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec<usize>>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option<Vec<usize>> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(|p| p.to_vec()), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index f00360292..76711a8f8 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -138,13 +138,19 @@ impl ProjectionExec { E: Into<ProjectionExpr>, { let input_schema = input.schema(); - // convert argument to Vec<ProjectionExpr> - let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>(); - let projection = ProjectionExprs::new(expr_vec); + let expr_arc = expr.into_iter().map(Into::into).collect::<Arc<_>>(); + let projection = ProjectionExprs::from_expressions(expr_arc); let projector = projection.make_projector(&input_schema)?; + Self::try_from_projector(projector, input) + } + fn try_from_projector( + projector: Projector, + input: Arc<dyn ExecutionPlan>, + ) -> Result<Self> { // Construct a map from the input expressions to the output expression of the Projection - let projection_mapping = projection.projection_mapping(&input_schema)?; + let projection_mapping = + projector.projection().projection_mapping(&input.schema())?; let cache = Self::compute_properties( &input, &projection_mapping, @@ -305,8 +311,8 @@ impl ExecutionPlan for ProjectionExec { self: Arc<Self>, mut children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - ProjectionExec::try_new( - self.projector.projection().clone(), + ProjectionExec::try_from_projector( + self.projector.clone(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index cb731ff04..60d8b5705 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3173,7 +3173,7 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().map_or_else(Vec::new, |v| { + projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { v.iter().map(|x| *x as u32).collect::<Vec<u32>>() }), }, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
