jonathanc-n commented on code in PR #15862: URL: https://github.com/apache/datafusion/pull/15862#discussion_r2061054598
########## datafusion/sql/src/statement.rs: ########## @@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> { Ok(plan) } + fn merge_to_plan( + &self, + source_table: TableFactor, + target_table: TableFactor, + on: Box<SQLExpr>, + clauses: Vec<MergeClause>, + ) -> Result<LogicalPlan> { + let mut ctx = PlannerContext::new(); + + let target_name: ObjectName; + let source_name: ObjectName; + + match target_table { + TableFactor::Table { + name, + alias, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + sample, + index_hints, + } => { + target_name = name; + } + _ => return plan_err!("Target table can only be a table for MERGE."), + } + + let target_ref = self.object_name_to_table_reference(target_name)?; + let target_src = self.context_provider.get_table_source(target_ref.clone())?; + let target_scan = + LogicalPlanBuilder::scan(target_ref.clone(), Arc::clone(&target_src), None)? + .project(projected_columns, lit(true).alias("target_exists")])? // add flag for matching target Review Comment: This is from an earlier commit; I just put a projected column here as a placeholder. I will update the PR. 
########## datafusion/sql/src/statement.rs: ########## @@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> { Ok(plan) } + fn merge_to_plan( + &self, + source_table: TableFactor, + target_table: TableFactor, + on: Box<SQLExpr>, + clauses: Vec<MergeClause>, + ) -> Result<LogicalPlan> { + let mut ctx = PlannerContext::new(); + + let target_name: ObjectName; + let source_name: ObjectName; + + match target_table { + TableFactor::Table { + name, + alias, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + sample, + index_hints, + } => { + target_name = name; + } + _ => return plan_err!("Target table can only be a table for MERGE."), + } + + let target_ref = self.object_name_to_table_reference(target_name)?; + let target_src = self.context_provider.get_table_source(target_ref.clone())?; + let target_scan = + LogicalPlanBuilder::scan(target_ref.clone(), Arc::clone(&target_src), None)? + .project(projected_columns, lit(true).alias("target_exists")])? // add flag for matching target + .build()?; + + match source_table { + TableFactor::Table { + name, + alias, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + sample, + index_hints, + } => { + source_name = name; + } + _ => { + return plan_err!("Source table can currently only be a table for MERGE.") + } + } + + let source_ref = self.object_name_to_table_reference(source_name)?; + let source_src = self.context_provider.get_table_source(source_ref.clone())?; + let source_scan = + LogicalPlanBuilder::scan(source_ref.clone(), Arc::clone(&source_src), None)? + .project(projected_columns, lit(true).alias("source_exists")])? // add flag for matching source + .build()?; + + let target_schema = target_scan.schema(); + + let joined_schema = DFSchema::from( + target_scan.schema().join(source_scan.schema())? 
+ ); + + let on_df_expr = self.sql_to_expr( + *on, + &joined_schema, + &mut ctx, + )?; + + let join_plan = LogicalPlan::Join(PlanJoin { + left: Arc::new(target_scan.clone()), + right: Arc::new(source_scan.clone()), + on: vec![], + filter: Some(on_df_expr), + join_type: JoinType::Full, + join_constraint: JoinConstraint::On, + schema: Arc::new(target_scan.schema().join(source_scan.schema())?), + null_equals_null: false, + }); + + // Flag checks for both tables + let both_not_null = col("target_exists").is_not_null() + .and(col("source_exists").is_not_null()); + let only_source = col("target_exists").is_null() + .and(col("source_exists").is_not_null()); + let only_target = col("target_exists").is_not_null() + .and(col("source_exists").is_null()); + + let mut when_then: Vec<(Box<Expr>, Box<Expr>)> = Vec::new(); + let mut delete_condition = Vec::<Expr>::new(); + + let mut planner_context = PlannerContext::new(); + + for clause in clauses { + let base = match clause.clause_kind { + MergeClauseKind::Matched => both_not_null.clone(), + MergeClauseKind::NotMatchedByTarget | MergeClauseKind::NotMatched => only_source.clone(), + MergeClauseKind::NotMatchedBySource => only_target.clone(), + }; + + // Combine predicate and column check + let when_expr = if let Some(pred) = &clause.predicate { + let predicate = self.sql_to_expr(*pred, &joined_schema, &mut planner_context)?; + base.and(predicate) + } else { + base + }; + + match &clause.action { + MergeAction::Update { assignments } => { + // each assignment (col = expr) becomes its own `when -> then` + for assign in assignments { + let value = Box::new(self.sql_to_expr(assign.value, &joined_schema, &mut planner_context)?); + when_then.push(( + Box::new(when_expr.clone()), + value, + )); + } + } + MergeAction::Insert(insert_expr) => { + match &insert_expr.kind { + MergeInsertKind::Values(Values{ rows, .. 
}) => { + let first_row = &rows[0]; + for (col_ident, val) in insert_expr.columns.iter().zip(first_row) { + let value = Box::new(self.sql_to_expr(val.clone(), &joined_schema, &mut planner_context)?); + when_then.push(( + Box::new(when_expr.clone()), + Box::new(value.clone().alias(&col_ident.value)), + )); + } + } + MergeInsertKind::Row => { + for col_ident in &insert_expr.columns { + let src_col = Expr::Column(col_ident.clone().into()); + when_then.push((Box::new(when_expr.clone()), Box::new(src_col))); + } + } + } + } + + MergeAction::Delete => { + delete_condition.push(when_expr.clone()); + } + } + } + + let delete_pred = delete_condition + .into_iter() + .reduce(|a, b| a.or(b)) + .unwrap_or_else(|| lit(false)); + + let merged = LogicalPlanBuilder::from(join_plan) + .filter(delete_pred.not())? + .project(vec![ + Expr::Case(Case { Review Comment: Is there a better way to do this? I have not implemented it yet; however, using a case expression like this would require a separate case expression for every column that is to be updated. I think it would be better to somehow apply the full update to the entire row at once. Is there an expression for this? 
########## datafusion/sql/src/statement.rs: ########## @@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> { Ok(plan) } + fn merge_to_plan( + &self, + source_table: TableFactor, + target_table: TableFactor, + on: Box<SQLExpr>, + clauses: Vec<MergeClause>, + ) -> Result<LogicalPlan> { + let mut ctx = PlannerContext::new(); + + let target_name: ObjectName; + let source_name: ObjectName; + + match target_table { + TableFactor::Table { + name, + alias, + args, + with_hints, + version, + with_ordinality, + partitions, + json_path, + sample, + index_hints, + } => { + target_name = name; + } + _ => return plan_err!("Target table can only be a table for MERGE."), + } + + let target_ref = self.object_name_to_table_reference(target_name)?; + let target_src = self.context_provider.get_table_source(target_ref.clone())?; + let target_scan = + LogicalPlanBuilder::scan(target_ref.clone(), Arc::clone(&target_src), None)? + .project(projected_columns, lit(true).alias("target_exists")])? // add flag for matching target Review Comment: This is from an earlier commit; I just put a projected column here as a placeholder. I will update the PR later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org