jonathanc-n commented on code in PR #15862:
URL: https://github.com/apache/datafusion/pull/15862#discussion_r2061054598


##########
datafusion/sql/src/statement.rs:
##########
@@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
 
+    fn merge_to_plan(
+        &self,
+        source_table: TableFactor,
+        target_table: TableFactor,
+        on: Box<SQLExpr>,
+        clauses: Vec<MergeClause>,
+    ) -> Result<LogicalPlan> {
+        let mut ctx = PlannerContext::new();
+
+        let target_name: ObjectName;
+        let source_name: ObjectName;
+
+        match target_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                target_name = name;
+            }
+            _ => return plan_err!("Target table can only be a table for 
MERGE."),
+        }
+
+        let target_ref = self.object_name_to_table_reference(target_name)?;
+        let target_src = 
self.context_provider.get_table_source(target_ref.clone())?;
+        let target_scan =
+            LogicalPlanBuilder::scan(target_ref.clone(), 
Arc::clone(&target_src), None)?
+                .project(projected_columns, 
lit(true).alias("target_exists")])? // add flag for matching target

Review Comment:
   This is from an earlier commit; I just put a projected column here as a 
placeholder. I will update the PR.



##########
datafusion/sql/src/statement.rs:
##########
@@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
 
+    fn merge_to_plan(
+        &self,
+        source_table: TableFactor,
+        target_table: TableFactor,
+        on: Box<SQLExpr>,
+        clauses: Vec<MergeClause>,
+    ) -> Result<LogicalPlan> {
+        let mut ctx = PlannerContext::new();
+
+        let target_name: ObjectName;
+        let source_name: ObjectName;
+
+        match target_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                target_name = name;
+            }
+            _ => return plan_err!("Target table can only be a table for 
MERGE."),
+        }
+
+        let target_ref = self.object_name_to_table_reference(target_name)?;
+        let target_src = 
self.context_provider.get_table_source(target_ref.clone())?;
+        let target_scan =
+            LogicalPlanBuilder::scan(target_ref.clone(), 
Arc::clone(&target_src), None)?
+                .project(projected_columns, 
lit(true).alias("target_exists")])? // add flag for matching target
+                .build()?;
+
+        match source_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                source_name = name;
+            }
+            _ => {
+                return plan_err!("Source table can currently only be a table 
for MERGE.")
+            }
+        }
+
+        let source_ref = self.object_name_to_table_reference(source_name)?;
+        let source_src = 
self.context_provider.get_table_source(source_ref.clone())?;
+        let source_scan =
+            LogicalPlanBuilder::scan(source_ref.clone(), 
Arc::clone(&source_src), None)?
+                .project(projected_columns, 
lit(true).alias("source_exists")])? // add flag for matching source
+                .build()?;
+
+        let target_schema = target_scan.schema();
+
+        let joined_schema = DFSchema::from(
+            target_scan.schema().join(source_scan.schema())?
+        );
+
+        let on_df_expr = self.sql_to_expr(
+            *on,
+            &joined_schema,
+            &mut ctx,
+        )?;
+
+        let join_plan = LogicalPlan::Join(PlanJoin {
+            left: Arc::new(target_scan.clone()),
+            right: Arc::new(source_scan.clone()),
+            on: vec![],
+            filter: Some(on_df_expr),
+            join_type: JoinType::Full,
+            join_constraint: JoinConstraint::On,
+            schema: Arc::new(target_scan.schema().join(source_scan.schema())?),
+            null_equals_null: false,
+        });
+
+        // Flag checks for both tables
+        let both_not_null = col("target_exists").is_not_null()
+            .and(col("source_exists").is_not_null());
+        let only_source = col("target_exists").is_null()
+            .and(col("source_exists").is_not_null());
+        let only_target = col("target_exists").is_not_null()
+            .and(col("source_exists").is_null());
+
+        let mut when_then: Vec<(Box<Expr>, Box<Expr>)> = Vec::new();
+        let mut delete_condition = Vec::<Expr>::new();
+
+        let mut planner_context = PlannerContext::new();
+        
+        for clause in clauses {
+            let base = match clause.clause_kind {
+                MergeClauseKind::Matched => both_not_null.clone(),
+                MergeClauseKind::NotMatchedByTarget | 
MergeClauseKind::NotMatched => only_source.clone(),
+                MergeClauseKind::NotMatchedBySource => only_target.clone(),
+            };
+
+            // Combine predicate and column check
+            let when_expr = if let Some(pred) = &clause.predicate {
+                let predicate = self.sql_to_expr(*pred, &joined_schema, &mut 
planner_context)?;
+                base.and(predicate)
+            } else {
+                base
+            };
+
+            match &clause.action {
+                MergeAction::Update { assignments } => {
+                    // each assignment (col = expr) becomes its own `when -> 
then`
+                    for assign in assignments {
+                        let value = Box::new(self.sql_to_expr(assign.value, 
&joined_schema, &mut planner_context)?);
+                        when_then.push((
+                            Box::new(when_expr.clone()),
+                            value,
+                        ));
+                    }
+                }
+                MergeAction::Insert(insert_expr) => {
+                    match &insert_expr.kind {
+                        MergeInsertKind::Values(Values{ rows, .. }) => {
+                            let first_row = &rows[0];
+                            for (col_ident, val) in 
insert_expr.columns.iter().zip(first_row) {
+                                let value = 
Box::new(self.sql_to_expr(val.clone(), &joined_schema, &mut planner_context)?);
+                                when_then.push((
+                                    Box::new(when_expr.clone()),
+                                    
Box::new(value.clone().alias(&col_ident.value)),
+                                ));
+                            }
+                        }
+                        MergeInsertKind::Row => {
+                            for col_ident in &insert_expr.columns {
+                                let src_col = 
Expr::Column(col_ident.clone().into());
+                                when_then.push((Box::new(when_expr.clone()), 
Box::new(src_col)));
+                            }
+                        }
+                    }
+                }
+        
+                MergeAction::Delete => {
+                    delete_condition.push(when_expr.clone());
+                }
+            }
+        }
+
+        let delete_pred = delete_condition
+            .into_iter()
+            .reduce(|a, b| a.or(b))
+            .unwrap_or_else(|| lit(false)); 
+
+        let merged = LogicalPlanBuilder::from(join_plan)
+            .filter(delete_pred.not())?
+            .project(vec![
+                Expr::Case(Case {

Review Comment:
   Is there a better way to do this? I have not implemented it yet, but 
using a case expression like this would require a separate case expression for 
every column that is to be updated. I think it would be better to somehow apply 
the full update to the entire row at once. Is there an expression for this?



##########
datafusion/sql/src/statement.rs:
##########
@@ -2074,6 +2073,178 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
         Ok(plan)
     }
 
+    fn merge_to_plan(
+        &self,
+        source_table: TableFactor,
+        target_table: TableFactor,
+        on: Box<SQLExpr>,
+        clauses: Vec<MergeClause>,
+    ) -> Result<LogicalPlan> {
+        let mut ctx = PlannerContext::new();
+
+        let target_name: ObjectName;
+        let source_name: ObjectName;
+
+        match target_table {
+            TableFactor::Table {
+                name,
+                alias,
+                args,
+                with_hints,
+                version,
+                with_ordinality,
+                partitions,
+                json_path,
+                sample,
+                index_hints,
+            } => {
+                target_name = name;
+            }
+            _ => return plan_err!("Target table can only be a table for 
MERGE."),
+        }
+
+        let target_ref = self.object_name_to_table_reference(target_name)?;
+        let target_src = 
self.context_provider.get_table_source(target_ref.clone())?;
+        let target_scan =
+            LogicalPlanBuilder::scan(target_ref.clone(), 
Arc::clone(&target_src), None)?
+                .project(projected_columns, 
lit(true).alias("target_exists")])? // add flag for matching target

Review Comment:
   This is from an earlier commit; I just put a projected column here as a 
placeholder. I will update the PR later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to