ethan-tyler commented on code in PR #20763:
URL: https://github.com/apache/datafusion/pull/20763#discussion_r3203047161


##########
datafusion/proto/proto/datafusion.proto:
##########
@@ -299,13 +299,63 @@ message DmlNode{
     INSERT_OVERWRITE = 4;
     INSERT_REPLACE = 5;
     TRUNCATE = 6;
+    MERGE_INTO = 7;
   }
   Type dml_type = 1;
   LogicalPlanNode input = 2;
   TableReference table_name = 3;
   LogicalPlanNode target = 5;
+  // Populated only when dml_type == MERGE_INTO.
+  MergeIntoOpNode merge_into = 6;

Review Comment:
   Looks like you need to regenerate the protobuf code; you can do this via
`datafusion/proto/regen.sh`. When I ran it locally, it boxed some nested fields in
`prost.rs`, which means `to_proto` / `from_proto` will need updates.
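For illustration, here is a minimal sketch of the kind of change boxing forces on the conversion code. It assumes a nested message field such as `MergeIntoOpNode.on` becomes `Option<Box<T>>` after regeneration. Which fields actually get boxed is not shown here, and the structs and functions below are hypothetical stand-ins, not the real `prost.rs` output.

```rust
// Hypothetical stand-ins for prost-generated types. After regen, a nested
// message field that used to be `Option<T>` may become `Option<Box<T>>`.
#[derive(Debug, Clone, PartialEq)]
pub struct LogicalExprNode {
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct MergeIntoOpNode {
    // Assumed to be boxed by the generator, for illustration only.
    pub on: Option<Box<LogicalExprNode>>,
}

// to_proto side: the serialized expression now has to be wrapped in a Box.
pub fn to_proto(on: LogicalExprNode) -> MergeIntoOpNode {
    MergeIntoOpNode {
        on: Some(Box::new(on)),
    }
}

// from_proto side: `as_deref` yields `Option<&LogicalExprNode>` directly
// from `Option<Box<LogicalExprNode>>`.
pub fn from_proto(node: &MergeIntoOpNode) -> Result<&LogicalExprNode, String> {
    node.on
        .as_deref()
        .ok_or_else(|| "MergeIntoOpNode is missing required `on` expression".to_string())
}
```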
   



##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -243,10 +245,137 @@ impl From<protobuf::dml_node::Type> for WriteOp {
             protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
             protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
             protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
+            // MERGE_INTO carries a payload (`MergeIntoOpNode`) that this
+            // tag-only conversion cannot read; callers must use
+            // [`parse_write_op`] for `DmlNode`s with a MergeInto payload.
+            protobuf::dml_node::Type::MergeInto => unreachable!(
+                "WriteOp::MergeInto requires the MergeIntoOpNode payload; use parse_write_op",

Review Comment:
   The doc comment is helpful, but `unreachable!` in a public `From` impl is risky.
Callers generally don't expect an infallible `From` to panic on a valid
variant. If this can't handle `MERGE_INTO` without the payload, I'd rather see a
`TryFrom` impl, or drop the `From` impl and route everything through `parse_write_op`.
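A minimal sketch of the `TryFrom` route, using hypothetical, trimmed-down `DmlType`, `WriteOp`, and `Error` types rather than the real datafusion-proto ones. The tag-only conversion returns an error for `MERGE_INTO` instead of panicking.

```rust
use std::convert::TryFrom;

// Hypothetical, simplified stand-ins for the protobuf tag and the logical
// `WriteOp`; the real types live in datafusion-proto and datafusion-expr.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DmlType {
    Truncate,
    MergeInto,
}

#[derive(Debug, PartialEq)]
pub enum WriteOp {
    Truncate,
    // Payload simplified to a String in this sketch.
    MergeInto(String),
}

#[derive(Debug, PartialEq)]
pub struct Error(pub String);

// Fallible tag-only conversion: MERGE_INTO becomes an error, not a panic.
impl TryFrom<DmlType> for WriteOp {
    type Error = Error;

    fn try_from(t: DmlType) -> Result<Self, Self::Error> {
        match t {
            DmlType::Truncate => Ok(WriteOp::Truncate),
            DmlType::MergeInto => Err(Error(
                "MERGE_INTO requires the MergeIntoOpNode payload; use parse_write_op"
                    .to_string(),
            )),
        }
    }
}
```

Callers holding the full `DmlNode` would still go through `parse_write_op`; the `TryFrom` impl only turns misuse into a recoverable error.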



##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -243,10 +245,137 @@ impl From<protobuf::dml_node::Type> for WriteOp {
             protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
             protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
             protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
+            // MERGE_INTO carries a payload (`MergeIntoOpNode`) that this
+            // tag-only conversion cannot read; callers must use
+            // [`parse_write_op`] for `DmlNode`s with a MergeInto payload.
+            protobuf::dml_node::Type::MergeInto => unreachable!(
+                "WriteOp::MergeInto requires the MergeIntoOpNode payload; use parse_write_op",
+            ),
+        }
+    }
+}
+
+impl From<protobuf::merge_into_clause_node::Kind> for MergeIntoClauseKind {
+    fn from(k: protobuf::merge_into_clause_node::Kind) -> Self {
+        match k {
+            protobuf::merge_into_clause_node::Kind::Matched => {
+                MergeIntoClauseKind::Matched
+            }
+            protobuf::merge_into_clause_node::Kind::NotMatched => {
+                MergeIntoClauseKind::NotMatched
+            }
+            protobuf::merge_into_clause_node::Kind::NotMatchedByTarget => {
+                MergeIntoClauseKind::NotMatchedByTarget
+            }
+            protobuf::merge_into_clause_node::Kind::NotMatchedBySource => {
+                MergeIntoClauseKind::NotMatchedBySource
+            }
         }
     }
 }
 
+/// Reconstruct a [`WriteOp`] from a [`protobuf::DmlNode`], reading the
+/// `merge_into` payload when the type tag is `MergeInto`.
+pub fn parse_write_op(
+    node: &protobuf::DmlNode,
+    ctx: &TaskContext,
+    codec: &dyn LogicalExtensionCodec,
+) -> Result<WriteOp, Error> {
+    let typ = node.dml_type();
+    if matches!(typ, protobuf::dml_node::Type::MergeInto) {
+        let merge_into = node.merge_into.as_ref().ok_or_else(|| {
+            Error::General(
+                "DmlNode with MERGE_INTO type is missing the merge_into payload"
+                    .to_string(),
+            )
+        })?;
+        return Ok(WriteOp::MergeInto(parse_merge_into_op(
+            merge_into, ctx, codec,
+        )?));
+    }
+    Ok(typ.into())
+}
+
+fn parse_merge_into_op(
+    op: &protobuf::MergeIntoOpNode,
+    ctx: &TaskContext,
+    codec: &dyn LogicalExtensionCodec,
+) -> Result<MergeIntoOp, Error> {
+    let on = op.on.as_ref().ok_or_else(|| {
+        Error::General("MergeIntoOpNode is missing required `on` expression".to_string())
+    })?;
+    let on = parse_expr(on, ctx, codec)?;
+    let clauses = op
+        .clauses
+        .iter()
+        .map(|c| parse_merge_into_clause(c, ctx, codec))
+        .collect::<Result<Vec<_>, Error>>()?;
+    Ok(MergeIntoOp { on, clauses })
+}
+
+fn parse_merge_into_clause(
+    clause: &protobuf::MergeIntoClauseNode,
+    ctx: &TaskContext,
+    codec: &dyn LogicalExtensionCodec,
+) -> Result<MergeIntoClause, Error> {
+    let kind = protobuf::merge_into_clause_node::Kind::try_from(clause.kind)

Review Comment:
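   Your defensive error paths are good, but I would add some tests that exercise them (e.g. a `MERGE_INTO` node with no `merge_into` payload, or a `MergeIntoOpNode` without `on`).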
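A sketch of what such tests could look like, written against hypothetical mirrors of `DmlNode` and `MergeIntoOpNode` in which plain `String`s stand in for expressions and errors. Real tests would build `protobuf::DmlNode` values and call the PR's `parse_write_op` with a `TaskContext` and codec.

```rust
// Hypothetical, trimmed-down mirrors of the proto types: just enough to
// exercise the "missing payload" and "missing `on`" error branches.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DmlType {
    Truncate,
    MergeInto,
}

#[derive(Debug, Default)]
pub struct MergeIntoOpNode {
    pub on: Option<String>,
}

#[derive(Debug)]
pub struct DmlNode {
    pub dml_type: DmlType,
    pub merge_into: Option<MergeIntoOpNode>,
}

#[derive(Debug, PartialEq)]
pub enum WriteOp {
    Truncate,
    MergeInto { on: String },
}

// Simplified analogue of `parse_write_op`: reads the payload only for MERGE_INTO.
pub fn parse_write_op(node: &DmlNode) -> Result<WriteOp, String> {
    match node.dml_type {
        DmlType::Truncate => Ok(WriteOp::Truncate),
        DmlType::MergeInto => {
            let m = node.merge_into.as_ref().ok_or_else(|| {
                "DmlNode with MERGE_INTO type is missing the merge_into payload".to_string()
            })?;
            let on = m.on.clone().ok_or_else(|| {
                "MergeIntoOpNode is missing required `on` expression".to_string()
            })?;
            Ok(WriteOp::MergeInto { on })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn merge_into_without_payload_is_an_error() {
        let node = DmlNode { dml_type: DmlType::MergeInto, merge_into: None };
        let err = parse_write_op(&node).unwrap_err();
        assert!(err.contains("missing the merge_into payload"));
    }

    #[test]
    fn merge_into_without_on_is_an_error() {
        let node = DmlNode {
            dml_type: DmlType::MergeInto,
            merge_into: Some(MergeIntoOpNode::default()),
        };
        let err = parse_write_op(&node).unwrap_err();
        assert!(err.contains("missing required `on`"));
    }
}
```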



##########
datafusion/expr/src/logical_plan/dml.rs:
##########
@@ -239,6 +239,8 @@ pub enum WriteOp {
     Ctas,
     /// `TRUNCATE` operation
     Truncate,
+    /// `MERGE INTO` operation
+    MergeInto(MergeIntoOp),

Review Comment:
   This adds a variant to a public exhaustive enum, and CI flagged it. I would
either document it as an intentional, accepted breaking change or mark the enum
`#[non_exhaustive]`.
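A minimal sketch of the `#[non_exhaustive]` option on a simplified `WriteOp` (the `MergeIntoOp` unit struct is a placeholder). Per the Cargo SemVer compatibility guidelines, adding `#[non_exhaustive]` to an existing public enum is itself a major change; its benefit is that later variant additions no longer are.

```rust
// Placeholder payload type for this sketch.
#[derive(Debug)]
pub struct MergeIntoOp;

// Simplified stand-in for `WriteOp`.
#[derive(Debug)]
#[non_exhaustive]
pub enum WriteOp {
    Ctas,
    Truncate,
    MergeInto(MergeIntoOp),
}

// Downstream crates must include a wildcard arm when matching a
// `#[non_exhaustive]` enum, so new variants do not break them.
// Inside the defining crate the attribute does not affect exhaustiveness,
// hence the `allow` on the otherwise-unreachable arm.
pub fn describe(op: &WriteOp) -> &'static str {
    match op {
        WriteOp::Ctas => "CTAS",
        WriteOp::Truncate => "TRUNCATE",
        WriteOp::MergeInto(_) => "MERGE INTO",
        #[allow(unreachable_patterns)]
        _ => "other",
    }
}
```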
   



##########
datafusion/expr/src/logical_plan/dml.rs:
##########
@@ -291,10 +294,94 @@ impl Display for InsertOp {
     }
 }
 
+/// Describes a MERGE INTO operation's parameters.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub struct MergeIntoOp {
+    /// The join condition from `ON <expr>`.
+    pub on: Expr,
+    /// The WHEN clauses, in the order they appeared in the SQL.
+    pub clauses: Vec<MergeIntoClause>,
+}
+
+/// A single WHEN clause within a MERGE INTO statement.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub struct MergeIntoClause {
+    /// Whether this fires on matched or unmatched rows.
+    pub kind: MergeIntoClauseKind,
+    /// Optional additional predicate (`AND <expr>`).
+    pub predicate: Option<Expr>,
+    /// The action to take.
+    pub action: MergeIntoAction,
+}
+
+/// Which rows a MERGE WHEN clause applies to.
+///
+/// Mirrors `sqlparser::ast::MergeClauseKind` so that the SQL spelling is
+/// preserved through the logical plan.
+///
+/// **Note on `NotMatched` vs `NotMatchedByTarget`:** these two variants are

Review Comment:
   nit: The docs say `NotMatched` and `NotMatchedByTarget` must be treated
identically. A helper on `MergeIntoClauseKind` would save consumers from
remembering that in every match.
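One possible shape for that helper, sketched on a simplified copy of `MergeIntoClauseKind`; the method name `is_not_matched_by_target` is hypothetical.

```rust
/// Simplified mirror of `MergeIntoClauseKind` from the PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeIntoClauseKind {
    Matched,
    NotMatched,
    NotMatchedByTarget,
    NotMatchedBySource,
}

impl MergeIntoClauseKind {
    /// Hypothetical helper: `true` for clauses that fire on source rows with
    /// no matching target row. `NotMatched` and `NotMatchedByTarget` differ
    /// only in SQL spelling, so both return `true`.
    pub fn is_not_matched_by_target(self) -> bool {
        matches!(
            self,
            MergeIntoClauseKind::NotMatched | MergeIntoClauseKind::NotMatchedByTarget
        )
    }
}
```

Consumers can then branch on `kind.is_not_matched_by_target()` instead of matching on both variants everywhere.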



-- 
This is an automated message from the Apache Git Service.