kosiew commented on code in PR #22988:
URL: https://github.com/apache/datafusion/pull/22988#discussion_r3466496112
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_table_ref.clone(),
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
Review Comment:
Small cleanup suggestion: this could return
`Ok(LogicalPlan::Dml(DmlStatement::new(...)))` directly instead of binding `let
plan` and then returning `Ok(plan)`.
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_table_ref.clone(),
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
+ let plan = LogicalPlan::Dml(DmlStatement::new(
+ target_table_ref,
+ target_table_source,
+ WriteOp::MergeInto(Box::new(MergeIntoOp {
+ on: on_expr,
+ clauses: df_clauses,
+ })),
+ Arc::new(source_plan),
+ ));
+
+ Ok(plan)
+ }
+
+ fn merge_clause_to_plan(
+ &self,
+ clause: ast::MergeClause,
+ combined_schema: &DFSchema,
+ target_schema: &DFSchema,
+ _target_alias: &Option<ast::TableAlias>,
+ planner_context: &mut PlannerContext,
+ ) -> Result<MergeIntoClause> {
+ let kind = match clause.clause_kind {
+ ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched,
+ ast::MergeClauseKind::NotMatched =>
MergeIntoClauseKind::NotMatched,
+ ast::MergeClauseKind::NotMatchedByTarget => {
+ MergeIntoClauseKind::NotMatchedByTarget
+ }
+ ast::MergeClauseKind::NotMatchedBySource => {
+ MergeIntoClauseKind::NotMatchedBySource
+ }
+ };
+
+ let predicate = clause
+ .predicate
+ .map(|p| self.sql_to_expr(p, combined_schema, planner_context))
+ .transpose()?;
+
+ let action = match clause.action {
+ ast::MergeAction::Update(update_expr) => {
+ let assignments = update_expr
+ .assignments
+ .into_iter()
+ .map(|assign| {
+ let col_name = match &assign.target {
+ AssignmentTarget::ColumnName(cols) => cols
+ .0
+ .iter()
+ .last()
+ .ok_or_else(|| plan_datafusion_err!("Empty
column id"))?
+ .as_ident()
+ .unwrap()
+ .value
+ .clone(),
+ _ => plan_err!("Tuples are not supported")?,
+ };
+ // Validate column exists in target
+ target_schema.field_with_unqualified_name(&col_name)?;
+ let value = self.sql_to_expr(
+ assign.value,
+ combined_schema,
+ planner_context,
+ )?;
+ Ok((col_name, value))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ MergeIntoAction::Update(assignments)
+ }
+ ast::MergeAction::Insert(insert_expr) => {
+ let columns: Vec<String> = insert_expr
Review Comment:
I think the `MERGE INSERT` actions need the same kind of validation that
regular INSERT planning does. At the moment, unknown columns, duplicate
columns, and mismatched `columns` / `values` lengths can all plan successfully.
When `columns` is empty, planning also does not appear to require a value for every target column.
That can leave TableProviders receiving malformed actions instead of a
validated logical operation. Could we normalize and validate target column
names, reject duplicates, and verify that the value count matches either the
explicit columns or the full target schema?
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
+ target_table_ref.clone(),
+ &target_table_source.schema(),
+ )?);
+
+ // 2. Plan the source (USING clause) as a LogicalPlan
+ let mut planner_context = PlannerContext::new();
+ let source_table_with_joins = TableWithJoins {
+ relation: source,
+ joins: vec![],
+ };
+ let source_plan =
+ self.plan_from_tables(vec![source_table_with_joins], &mut
planner_context)?;
+
+ // 3. Build a combined schema for resolving expressions in ON and WHEN
clauses
+ let combined_schema =
+ Arc::new(target_schema.as_ref().join(source_plan.schema())?);
+
+ // 4. Convert the ON condition from sqlparser Expr to datafusion Expr
+ let on_expr = self.sql_to_expr(*on, &combined_schema, &mut
planner_context)?;
+
+ // 5. Convert each WHEN clause
+ let df_clauses = clauses
+ .into_iter()
+ .map(|clause| {
+ self.merge_clause_to_plan(
+ clause,
+ &combined_schema,
+ &target_schema,
+ &target_alias,
+ &mut planner_context,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // 6. Build the DmlStatement
+ let plan = LogicalPlan::Dml(DmlStatement::new(
+ target_table_ref,
+ target_table_source,
+ WriteOp::MergeInto(Box::new(MergeIntoOp {
+ on: on_expr,
+ clauses: df_clauses,
+ })),
+ Arc::new(source_plan),
+ ));
+
+ Ok(plan)
+ }
+
+ fn merge_clause_to_plan(
+ &self,
+ clause: ast::MergeClause,
+ combined_schema: &DFSchema,
+ target_schema: &DFSchema,
+ _target_alias: &Option<ast::TableAlias>,
+ planner_context: &mut PlannerContext,
+ ) -> Result<MergeIntoClause> {
+ let kind = match clause.clause_kind {
+ ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched,
+ ast::MergeClauseKind::NotMatched =>
MergeIntoClauseKind::NotMatched,
+ ast::MergeClauseKind::NotMatchedByTarget => {
+ MergeIntoClauseKind::NotMatchedByTarget
+ }
+ ast::MergeClauseKind::NotMatchedBySource => {
+ MergeIntoClauseKind::NotMatchedBySource
+ }
+ };
+
+ let predicate = clause
+ .predicate
+ .map(|p| self.sql_to_expr(p, combined_schema, planner_context))
+ .transpose()?;
+
+ let action = match clause.action {
+ ast::MergeAction::Update(update_expr) => {
+ let assignments = update_expr
+ .assignments
+ .into_iter()
+ .map(|assign| {
+ let col_name = match &assign.target {
Review Comment:
Small follow-up suggestion: the update-column extraction and insert-column
extraction both walk an `ObjectName` and take the last identifier. It might be
worth adding a small private helper that returns `Result<String>` and produces
a planner error for non-identifier parts, rather than calling `unwrap()` at each
call site, where such a part would panic instead of returning an error.
##########
datafusion/sql/src/statement.rs:
##########
@@ -2407,6 +2411,189 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
Ok(plan)
}
+ fn merge_to_plan(&self, merge: ast::Merge) -> Result<LogicalPlan> {
+ let ast::Merge {
+ table,
+ source,
+ on,
+ clauses,
+ into: _,
+ merge_token: _,
+ optimizer_hints,
+ output,
+ } = merge;
+
+ if !optimizer_hints.is_empty() {
+ plan_err!("Optimizer hints not supported")?;
+ }
+
+ if output.is_some() {
+ return not_impl_err!("MERGE OUTPUT clause is not supported");
+ }
+
+ // 1. Resolve target table
+ let (target_table_name, target_alias) = match &table {
+ TableFactor::Table { name, alias, .. } => (name.clone(),
alias.clone()),
+ _ => plan_err!("Cannot MERGE INTO non-table relation!")?,
+ };
+ let target_table_ref =
self.object_name_to_table_reference(target_table_name)?;
+ let target_table_source = self
+ .context_provider
+ .get_table_source(target_table_ref.clone())?;
+ let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
Review Comment:
I think target aliases need a bit more handling here. Right now `MERGE INTO
target AS t USING source AS s ON t.id = s.id ...` accepts the alias, but
`target_schema` is still qualified with the table name rather than `t`. That
means normal alias-qualified references such as `t.id` in the `ON` clause or in
`WHEN ... AND <predicate>` expressions will not resolve.
Could we either reject target aliases explicitly for now, or qualify the
target side with the alias so this matches the rest of SQL planning?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]