This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch add_join_projection in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit e8edecfd4efdd5d0a1af6439fc59736313a5ce3e Author: Daniël Heres <daniel.he...@coralogix.com> AuthorDate: Mon Jun 26 23:08:23 2023 +0200 Use projection --- .../src/physical_optimizer/dist_enforcement.rs | 5 + .../core/src/physical_optimizer/join_selection.rs | 11 +++ .../core/src/physical_optimizer/pipeline_fixer.rs | 1 + .../core/src/physical_plan/joins/hash_join.rs | 11 ++- .../src/physical_plan/joins/nested_loop_join.rs | 2 +- .../src/physical_plan/joins/sort_merge_join.rs | 2 +- .../src/physical_plan/joins/symmetric_hash_join.rs | 5 +- datafusion/core/src/physical_plan/joins/utils.rs | 3 +- datafusion/core/src/physical_planner.rs | 5 + datafusion/expr/src/logical_plan/builder.rs | 102 ++++++++++++--------- datafusion/expr/src/logical_plan/plan.rs | 11 ++- datafusion/expr/src/utils.rs | 10 +- datafusion/optimizer/src/eliminate_cross_join.rs | 3 + datafusion/optimizer/src/eliminate_outer_join.rs | 1 + .../optimizer/src/extract_equijoin_predicate.rs | 2 + datafusion/optimizer/src/push_down_limit.rs | 1 + datafusion/optimizer/src/push_down_projection.rs | 13 ++- 17 files changed, 134 insertions(+), 54 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index cb98e69d7a..1e539d18f6 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -154,6 +154,7 @@ fn adjust_input_keys_ordering( join_type, mode, null_equals_null, + projection, .. }) = plan_any.downcast_ref::<HashJoinExec>() { @@ -169,6 +170,7 @@ fn adjust_input_keys_ordering( join_type, PartitionMode::Partitioned, *null_equals_null, + projection.clone(), )?) as Arc<dyn ExecutionPlan>) }; Some(reorder_partitioned_join_keys( @@ -541,6 +543,7 @@ fn reorder_join_keys_to_inputs( join_type, mode, null_equals_null, + projection, .. 
}) = plan_any.downcast_ref::<HashJoinExec>() { @@ -570,6 +573,7 @@ fn reorder_join_keys_to_inputs( join_type, PartitionMode::Partitioned, *null_equals_null, + projection.clone(), )?)) } else { Ok(plan) @@ -1123,6 +1127,7 @@ mod tests { join_type, PartitionMode::Partitioned, false, + None, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index a9dec73c36..ac24c6275f 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -137,6 +137,7 @@ pub fn swap_hash_join( &swap_join_type(*hash_join.join_type()), partition_mode, hash_join.null_equals_null(), + None, )?; if matches!( hash_join.join_type(), @@ -333,6 +334,7 @@ fn try_collect_left( hash_join.join_type(), PartitionMode::CollectLeft, hash_join.null_equals_null(), + hash_join.projection.clone(), )?))) } } @@ -344,6 +346,7 @@ fn try_collect_left( hash_join.join_type(), PartitionMode::CollectLeft, hash_join.null_equals_null(), + hash_join.projection.clone(), )?))), (false, true) => { if supports_swap(*hash_join.join_type()) { @@ -371,6 +374,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl hash_join.join_type(), PartitionMode::Partitioned, hash_join.null_equals_null(), + hash_join.projection.clone(), )?)) } } @@ -495,6 +499,7 @@ mod tests { &JoinType::Left, PartitionMode::CollectLeft, false, + None, ) .unwrap(); @@ -543,6 +548,7 @@ mod tests { &JoinType::Left, PartitionMode::CollectLeft, false, + None, ) .unwrap(); @@ -594,6 +600,7 @@ mod tests { &join_type, PartitionMode::Partitioned, false, + None, ) .unwrap(); @@ -659,6 +666,7 @@ mod tests { &JoinType::Inner, PartitionMode::CollectLeft, false, + None, ) .unwrap(); let child_schema = child_join.schema(); @@ -675,6 +683,7 @@ mod tests { &JoinType::Left, PartitionMode::CollectLeft, false, + None, ) .unwrap(); @@ -712,6 +721,7 @@ mod tests { &JoinType::Inner, 
PartitionMode::CollectLeft, false, + None, ) .unwrap(); @@ -937,6 +947,7 @@ mod tests { &JoinType::Inner, PartitionMode::Auto, false, + None, ) .unwrap(); diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs index caae774345..cfe34558c4 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs @@ -644,6 +644,7 @@ mod hash_join_tests { &t.initial_join_type, t.initial_mode, false, + None, )?; let initial_hash_join_state = PipelineStatePropagator { diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index a3c553c9b3..b38811eefd 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -122,6 +122,8 @@ pub struct HashJoinExec { column_indices: Vec<ColumnIndex>, /// If null_equals_null is true, null == null else null != null pub(crate) null_equals_null: bool, + /// Optional output projection + pub projection: Option<Vec<Column>>, } impl HashJoinExec { @@ -136,6 +138,7 @@ impl HashJoinExec { join_type: &JoinType, partition_mode: PartitionMode, null_equals_null: bool, + projection: Option<Vec<Column>>, ) -> Result<Self> { let left_schema = left.schema(); let right_schema = right.schema(); @@ -148,7 +151,7 @@ impl HashJoinExec { check_join_is_valid(&left_schema, &right_schema, &on)?; let (schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); + build_join_schema(&left_schema, &right_schema, join_type, projection); let random_state = RandomState::with_seeds(0, 0, 0, 0); @@ -165,6 +168,7 @@ impl HashJoinExec { metrics: ExecutionPlanMetricsSet::new(), column_indices, null_equals_null, + projection, }) } @@ -337,6 +341,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_type, self.mode, self.null_equals_null, + self.projection, )?)) } @@ -1358,6 +1363,7 @@ 
mod tests { join_type, PartitionMode::CollectLeft, null_equals_null, + None, ) } @@ -1377,6 +1383,7 @@ mod tests { join_type, PartitionMode::CollectLeft, null_equals_null, + None, ) } @@ -1431,6 +1438,7 @@ mod tests { join_type, PartitionMode::Partitioned, null_equals_null, + None, )?; let columns = columns(&join.schema()); @@ -3164,6 +3172,7 @@ mod tests { &join_type, PartitionMode::Partitioned, false, + None, )?; let stream = join.execute(1, task_ctx)?; diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs index 6586456fd2..5a2b0e8337 100644 --- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs +++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs @@ -106,7 +106,7 @@ impl NestedLoopJoinExec { let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; let (schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); + build_join_schema(&left_schema, &right_schema, join_type, None); Ok(NestedLoopJoinExec { left, right, diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index bc8c686670..324f8582e9 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -177,7 +177,7 @@ impl SortMergeJoinExec { }; let schema = - Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0); + Arc::new(build_join_schema(&left_schema, &right_schema, &join_type, None).0); Ok(Self { left, diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index b46aba2fb5..7df848e3eb 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -295,7 +295,7 @@ impl SymmetricHashJoinExec { // Build 
the join schema from the left and right schemas: let (schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); + build_join_schema(&left_schema, &right_schema, join_type, None); // Initialize the random state for the join operation: let random_state = RandomState::with_seeds(0, 0, 0, 0); @@ -1862,6 +1862,7 @@ mod tests { join_type, PartitionMode::Partitioned, null_equals_null, + None, )?; let mut batches = vec![]; @@ -3026,7 +3027,7 @@ mod tests { // Build the join schema from the left and right schemas let (schema, join_column_indices) = - build_join_schema(&left_schema, &right_schema, &join_type); + build_join_schema(&left_schema, &right_schema, &join_type, None); let join_schema = Arc::new(schema); // Sort information for MemoryExec diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 627bdeebc5..772201315d 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -350,6 +350,7 @@ pub fn build_join_schema( left: &Schema, right: &Schema, join_type: &JoinType, + projection: Option<Vec<Column>>, ) -> (Schema, Vec<ColumnIndex>) { let (fields, column_indices): (SchemaBuilder, Vec<ColumnIndex>) = match join_type { JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { @@ -1197,7 +1198,7 @@ mod tests { ]; for (left_in, right_in, join_type, left_out, right_out) in cases { - let (schema, _) = build_join_schema(left_in, right_in, &join_type); + let (schema, _) = build_join_schema(left_in, right_in, &join_type, None); let expected_fields = left_out .fields() diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 75566208e3..525a6e34fb 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -901,6 +901,7 @@ impl DefaultPhysicalPlanner { join_type, null_equals_null, schema: join_schema, + projection, .. 
}) => { let null_equals_null = *null_equals_null; @@ -990,6 +991,8 @@ impl DefaultPhysicalPlanner { }) .collect::<Result<join_utils::JoinOn>>()?; + let projection: Option<Vec<Column>> = projection.map(|proj|proj.iter().enumerate().map(|col|Column::new(col.name, 0)).collect()); + let join_filter = match filter { Some(expr) => { // Extract columns from filter expression and saved in a HashSet @@ -1095,6 +1098,7 @@ impl DefaultPhysicalPlanner { join_type, partition_mode, null_equals_null, + projection.clone(), )?)) } else { Ok(Arc::new(HashJoinExec::try_new( @@ -1105,6 +1109,7 @@ impl DefaultPhysicalPlanner { join_type, PartitionMode::CollectLeft, null_equals_null, + projection.clone(), )?)) } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 3d34c087ac..34ae0eb61a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -721,7 +721,7 @@ impl LogicalPlanBuilder { .map(|(l, r)| (Expr::Column(l), Expr::Column(r))) .collect(); let join_schema = - build_join_schema(self.plan.schema(), right.schema(), &join_type)?; + build_join_schema(self.plan.schema(), right.schema(), &join_type, None)?; Ok(Self::from(LogicalPlan::Join(Join { left: Arc::new(self.plan), @@ -732,6 +732,7 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, schema: DFSchemaRef::new(join_schema), null_equals_null, + projection: None, }))) } @@ -754,7 +755,7 @@ impl LogicalPlanBuilder { let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect(); let join_schema = - build_join_schema(self.plan.schema(), right.schema(), &join_type)?; + build_join_schema(self.plan.schema(), right.schema(), &join_type, None)?; let mut join_on: Vec<(Expr, Expr)> = vec![]; let mut filters: Option<Expr> = None; for (l, r) in &on { @@ -796,6 +797,7 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::Using, schema: DFSchemaRef::new(join_schema), null_equals_null: false, + 
projection: None, }))) } } @@ -1012,7 +1014,7 @@ impl LogicalPlanBuilder { .collect::<Result<Vec<_>>>()?; let join_schema = - build_join_schema(self.plan.schema(), right.schema(), &join_type)?; + build_join_schema(self.plan.schema(), right.schema(), &join_type, None)?; Ok(Self::from(LogicalPlan::Join(Join { left: Arc::new(self.plan), @@ -1023,6 +1025,7 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, schema: DFSchemaRef::new(join_schema), null_equals_null: false, + projection: None, }))) } @@ -1038,6 +1041,7 @@ pub fn build_join_schema( left: &DFSchema, right: &DFSchema, join_type: &JoinType, + projection: Option<&Vec<Column>>, ) -> Result<DFSchema> { fn nullify_fields(fields: &[DFField]) -> Vec<DFField> { fields @@ -1049,51 +1053,65 @@ pub fn build_join_schema( let right_fields = right.fields(); let left_fields = left.fields(); - let fields: Vec<DFField> = match join_type { - JoinType::Inner => { - // left then right - left_fields + let fields = { + if let Some(projection) = projection { + projection .iter() - .chain(right_fields.iter()) - .cloned() - .collect() - } - JoinType::Left => { - // left then right, right set to nullable in case of not matched scenario - left_fields - .iter() - .chain(&nullify_fields(right_fields)) - .cloned() - .collect() - } - JoinType::Right => { - // left then right, left set to nullable in case of not matched scenario - nullify_fields(left_fields) - .iter() - .chain(right_fields.iter()) - .cloned() - .collect() - } - JoinType::Full => { - // left then right, all set to nullable in case of not matched scenario - nullify_fields(left_fields) - .iter() - .chain(&nullify_fields(right_fields)) - .cloned() - .collect() - } - JoinType::LeftSemi | JoinType::LeftAnti => { - // Only use the left side for the schema - left_fields.clone() - } - JoinType::RightSemi | JoinType::RightAnti => { - // Only use the right side for the schema - right_fields.clone() + .map(|col| { + left.field_from_column(col) + .or_else(|_| 
right.field_from_column(col)) + .cloned() + }) + .collect::<Result<Vec<DFField>>>()? + } else { + match join_type { + JoinType::Inner => { + // left then right + left_fields + .iter() + .chain(right_fields.iter()) + .cloned() + .collect() + } + JoinType::Left => { + // left then right, right set to nullable in case of not matched scenario + left_fields + .iter() + .chain(&nullify_fields(right_fields)) + .cloned() + .collect() + } + JoinType::Right => { + // left then right, left set to nullable in case of not matched scenario + nullify_fields(left_fields) + .iter() + .chain(right_fields.iter()) + .cloned() + .collect() + } + JoinType::Full => { + // left then right, all set to nullable in case of not matched scenario + nullify_fields(left_fields) + .iter() + .chain(&nullify_fields(right_fields)) + .cloned() + .collect() + } + JoinType::LeftSemi | JoinType::LeftAnti => { + // Only use the left side for the schema + left_fields.clone() + } + JoinType::RightSemi | JoinType::RightAnti => { + // Only use the right side for the schema + right_fields.clone() + } + } } }; let mut metadata = left.metadata().clone(); metadata.extend(right.metadata().clone()); + DFSchema::new_with_metadata(fields, metadata) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ab45047acf..d4517917ec 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1660,6 +1660,8 @@ pub struct Join { pub schema: DFSchemaRef, /// If null_equals_null is true, null == null else null != null pub null_equals_null: bool, + /// optional projection + pub projection: Option<Vec<Column>>, } impl Join { @@ -1681,8 +1683,12 @@ impl Join { .zip(column_on.1.into_iter()) .map(|(l, r)| (Expr::Column(l), Expr::Column(r))) .collect(); - let join_schema = - build_join_schema(left.schema(), right.schema(), &original_join.join_type)?; + let join_schema = build_join_schema( + left.schema(), + right.schema(), + 
&original_join.join_type, + original_join.projection.as_ref(), + )?; Ok(Join { left, @@ -1693,6 +1699,7 @@ impl Join { join_constraint: original_join.join_constraint, schema: Arc::new(join_schema), null_equals_null: original_join.null_equals_null, + projection: None, }) } } diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 2b6fc5793a..2ae116e900 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -845,10 +845,15 @@ pub fn from_plan( join_constraint, on, null_equals_null, + projection, .. }) => { - let schema = - build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; + let schema = build_join_schema( + inputs[0].schema(), + inputs[1].schema(), + join_type, + projection.as_ref(), + )?; let equi_expr_count = on.len(); assert!(expr.len() >= equi_expr_count); @@ -881,6 +886,7 @@ pub fn from_plan( filter: filter_expr, schema: DFSchemaRef::new(schema), null_equals_null: *null_equals_null, + projection: projection.clone(), })) } LogicalPlan::CrossJoin(_) => { diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 533566a0bf..e32311d67d 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -214,6 +214,7 @@ fn find_inner_join( left_input.schema(), right_input.schema(), &JoinType::Inner, + None, )?); return Ok(LogicalPlan::Join(Join { @@ -225,6 +226,7 @@ fn find_inner_join( filter: None, schema: join_schema, null_equals_null: false, + projection: None, })); } } @@ -233,6 +235,7 @@ fn find_inner_join( left_input.schema(), right.schema(), &JoinType::Inner, + None, )?); Ok(LogicalPlan::CrossJoin(CrossJoin { diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index e4d57f0209..e2df09da56 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -105,6 +105,7 
@@ impl OptimizerRule for EliminateOuterJoin { filter: join.filter.clone(), schema: join.schema.clone(), null_equals_null: join.null_equals_null, + projection: join.projection.clone(), }); let new_plan = plan.with_new_inputs(&[new_join])?; Ok(Some(new_plan)) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 20b9c62971..41db207d53 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -55,6 +55,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + projection, }) => { let left_schema = left.schema(); let right_schema = right.schema(); @@ -80,6 +81,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint: *join_constraint, schema: schema.clone(), null_equals_null: *null_equals_null, + projection: projection.clone(), }) }); diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 6703a1d787..42915b2e3c 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -266,6 +266,7 @@ fn push_down_join(join: &Join, limit: usize) -> Option<Join> { join_constraint: join.join_constraint, schema: join.schema.clone(), null_equals_null: join.null_equals_null, + projection: join.projection.clone(), }) } } diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 4773a944f4..65409b0bd0 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -101,6 +101,10 @@ impl OptimizerRule for PushDownProjection { for e in projection.expr.iter() { expr_to_columns(e, &mut push_columns)?; } + + // Keep columns to use for join output projection + let output_columns = push_columns.clone(); + for (l, r) in join.on.iter() { expr_to_columns(l, &mut push_columns)?; 
expr_to_columns(r, &mut push_columns)?; @@ -119,9 +123,14 @@ impl OptimizerRule for PushDownProjection { join.right.schema(), join.right.clone(), )?; - let new_join = child_plan.with_new_inputs(&[new_left, new_right])?; - generate_plan!(projection_is_empty, plan, new_join) + let mut join = join.clone(); + + join.left = Arc::new(new_left); + join.right = Arc::new(new_right); + join.projection = Some(output_columns.into_iter().collect()); + + generate_plan!(projection_is_empty, plan, LogicalPlan::Join(join)) } LogicalPlan::CrossJoin(join) => { // collect column in on/filter in join and projection.