This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 56dc7f8c2199 [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2
56dc7f8c2199 is described below

commit 56dc7f8c2199bd3f2822004a1d9853188a9db465
Author: Huaxin Gao <[email protected]>
AuthorDate: Thu Dec 21 10:13:55 2023 +0800

    [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2
    
    ### What changes were proposed in this pull request?
    Add `MergeInto` support in `DataFrameWriterV2`
    
    ### Why are the changes needed?
    Spark currently supports the MERGE INTO SQL statement. We want the DataFrame API to have the same support.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. This PR introduces a new API like the following:
    
    ```
          spark.table("source")
            .mergeInto("target", $"source.id" === $"target.id")
            .whenNotMatched()
            .insertAll()
            .merge()
    ```
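    
    For reference, the example above is roughly equivalent to the following SQL `MERGE INTO` statement (table and column names taken from the example; shown only as an illustrative sketch):
    
    ```
          MERGE INTO target
          USING source
          ON source.id = target.id
          WHEN NOT MATCHED THEN INSERT *
    ```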
    
    ### How was this patch tested?
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44119 from huaxingao/mergeinto.
    
    Authored-by: Huaxin Gao <[email protected]>
    Signed-off-by: Jiaan Geng <[email protected]>
---
 .../src/main/resources/error/error-classes.json    |   6 +
 .../CheckConnectJvmClientCompatibility.scala       |  11 +-
 docs/sql-error-conditions.md                       |   6 +
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  32 +
 .../org/apache/spark/sql/MergeIntoWriter.scala     | 329 +++++++
 .../sql/connector/MergeIntoDataFrameSuite.scala    | 946 +++++++++++++++++++++
 6 files changed, 1329 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 930042505379..df223f3298ef 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2846,6 +2846,12 @@
     ],
     "sqlState" : "42000"
   },
+  "NO_MERGE_ACTION_SPECIFIED" : {
+    "message" : [
+      "df.mergeInto needs to be followed by at least one of 
whenMatched/whenNotMatched/whenNotMatchedBySource."
+    ],
+    "sqlState" : "42K0E"
+  },
   "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : {
     "message" : [
       "Cannot find <catalystFieldPath> in Protobuf schema."
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a9b6f102a512..bd5ff6af7464 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -298,7 +298,16 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.artifact.util.ArtifactUtils"),
       ProblemFilters.exclude[MissingClassProblem](
-        "org.apache.spark.sql.artifact.util.ArtifactUtils$"))
+        "org.apache.spark.sql.artifact.util.ArtifactUtils$"),
+
+      // MergeIntoWriter
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched$"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched$"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource$"))
     checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
   }
 
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 94c7c167e392..a1af6863913e 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1640,6 +1640,12 @@ Can't determine the default value for `<colName>` since it is not nullable and i
 
 No handler for UDAF '`<functionName>`'. Use sparkSession.udf.register(...) instead.
 
+### NO_MERGE_ACTION_SPECIFIED
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+df.mergeInto needs to be followed by at least one of whenMatched/whenNotMatched/whenNotMatchedBySource.
+
 ### NO_SQL_TYPE_IN_PROTOBUF_SCHEMA
 
 [SQLSTATE: 42S22](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2256f9db8c64..a3dc976647be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -4129,6 +4129,38 @@ class Dataset[T] private[sql](
     new DataFrameWriterV2[T](table, this)
   }
 
+  /**
+   * Merges a set of updates, insertions, and deletions based on a source table into
+   * a target table.
+   *
+   * Scala Examples:
+   * {{{
+   *   spark.table("source")
+   *     .mergeInto("target", $"source.id" === $"target.id")
+   *     .whenMatched($"salary" === 100)
+   *     .delete()
+   *     .whenNotMatched()
+   *     .insertAll()
+   *     .whenNotMatchedBySource($"salary" === 100)
+   *     .update(Map(
+   *       "salary" -> lit(200)
+   *     ))
+   *     .merge()
+   * }}}
+   *
+   * @group basic
+   * @since 4.0.0
+   */
+  def mergeInto(table: String, condition: Column): MergeIntoWriter[T] = {
+    if (isStreaming) {
+      logicalPlan.failAnalysis(
+        errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
+        messageParameters = Map("methodName" -> toSQLId("mergeInto")))
+    }
+
+    new MergeIntoWriter[T](table, this, condition)
+  }
+
   /**
   * Interface for saving the content of the streaming Dataset out into external storage.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
new file mode 100644
index 000000000000..ca04b9bfc55f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkRuntimeException
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction}
+import org.apache.spark.sql.functions.expr
+
+/**
+ * `MergeIntoWriter` provides methods to define and execute merge actions based
+ * on specified conditions.
+ *
+ * @tparam T the type of data in the Dataset.
+ * @param table the name of the target table for the merge operation.
+ * @param ds the source Dataset to merge into the target table.
+ * @param on the merge condition.
+ *
+ * @since 4.0.0
+ */
+@Experimental
+class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column) {
+
+  private val df: DataFrame = ds.toDF()
+
+  private val sparkSession = ds.sparkSession
+
+  private val tableName = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table)
+
+  private val logicalPlan = df.queryExecution.logical
+
+  private[sql] var matchedActions: Seq[MergeAction] = Seq.empty[MergeAction]
+  private[sql] var notMatchedActions: Seq[MergeAction] = Seq.empty[MergeAction]
+  private[sql] var notMatchedBySourceActions: Seq[MergeAction] = Seq.empty[MergeAction]
+
+  /**
+   * Initialize a `WhenMatched` action without any condition.
+   *
+   * This `WhenMatched` action will be executed when a source row matches a target table row based
+   * on the merge condition.
+   *
+   * This `WhenMatched` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the matched target table rows with source dataset rows.
+   *   - `update(Map)`: Update all the matched target table rows while changing only
+   *     a subset of columns based on the provided assignment.
+   *   - `delete`: Delete all target rows that have a match in the source table.
+   *
+   * @return a new `WhenMatched` object.
+   */
+  def whenMatched(): WhenMatched[T] = {
+    new WhenMatched[T](this, None)
+  }
+
+  /**
+   * Initialize a `WhenMatched` action with a condition.
+   *
+   * This `WhenMatched` action will be executed when a source row matches a target table row based
+   * on the merge condition and the specified `condition` is satisfied.
+   *
+   * This `WhenMatched` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the matched target table rows with source dataset rows.
+   *   - `update(Map)`: Update all the matched target table rows while changing only
+   *     a subset of columns based on the provided assignment.
+   *   - `delete`: Delete all target rows that have a match in the source table.
+   *
+   * @param condition a `Column` representing the condition to be evaluated for the action.
+   * @return a new `WhenMatched` object configured with the specified condition.
+   */
+  def whenMatched(condition: Column): WhenMatched[T] = {
+    new WhenMatched[T](this, Some(condition.expr))
+  }
+
+  /**
+   * Initialize a `WhenNotMatched` action without any condition.
+   *
+   * This `WhenNotMatched` action will be executed when a source row does not match any target row
+   * based on the merge condition.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge actions:
+   *   - `insertAll`: Insert all rows from the source that are not already in the target table.
+   *   - `insert(Map)`: Insert all rows from the source that are not already in the target table,
+   *      with the specified columns based on the provided assignment.
+   *
+   * @return a new `WhenNotMatched` object.
+   */
+  def whenNotMatched(): WhenNotMatched[T] = {
+    new WhenNotMatched[T](this, None)
+  }
+
+  /**
+   * Initialize a `WhenNotMatched` action with a condition.
+   *
+   * This `WhenNotMatched` action will be executed when a source row does not match any target row
+   * based on the merge condition and the specified `condition` is satisfied.
+   *
+   * This `WhenNotMatched` can be followed by one of the following merge actions:
+   *   - `insertAll`: Insert all rows from the source that are not already in the target table.
+   *   - `insert(Map)`: Insert all rows from the source that are not already in the target table,
+   *     with the specified columns based on the provided assignment.
+   *
+   * @param condition a `Column` representing the condition to be evaluated for the action.
+   * @return a new `WhenNotMatched` object configured with the specified condition.
+   */
+  def whenNotMatched(condition: Column): WhenNotMatched[T] = {
+    new WhenNotMatched[T](this, Some(condition.expr))
+  }
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` action without any condition.
+   *
+   * This `WhenNotMatchedBySource` action will be executed when a target row does not match any
+   * rows in the source table based on the merge condition.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the not matched target table rows with source dataset rows.
+   *   - `update(Map)`: Update all the not matched target table rows while changing only
+   *     the specified columns based on the provided assignment.
+   *   - `delete`: Delete all target rows that have no matches in the source table.
+   *
+   * @return a new `WhenNotMatchedBySource` object.
+   */
+  def whenNotMatchedBySource(): WhenNotMatchedBySource[T] = {
+    new WhenNotMatchedBySource[T](this, None)
+  }
+
+  /**
+   * Initialize a `WhenNotMatchedBySource` action with a condition.
+   *
+   * This `WhenNotMatchedBySource` action will be executed when a target row does not match any
+   * rows in the source table based on the merge condition and the specified `condition`
+   * is satisfied.
+   *
+   * This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
+   *   - `updateAll`: Update all the not matched target table rows with source dataset rows.
+   *   - `update(Map)`: Update all the not matched target table rows while changing only
+   *     the specified columns based on the provided assignment.
+   *   - `delete`: Delete all target rows that have no matches in the source table.
+   *
+   * @param condition a `Column` representing the condition to be evaluated for the action.
+   * @return a new `WhenNotMatchedBySource` object configured with the specified condition.
+   */
+  def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T] = {
+    new WhenNotMatchedBySource[T](this, Some(condition.expr))
+  }
+
+  /**
+   * Executes the merge operation.
+   */
+  def merge(): Unit = {
+    if (matchedActions.isEmpty && notMatchedActions.isEmpty && notMatchedBySourceActions.isEmpty) {
+      throw new SparkRuntimeException(
+        errorClass = "NO_MERGE_ACTION_SPECIFIED",
+        messageParameters = Map.empty)
+    }
+
+    val merge = MergeIntoTable(
+      UnresolvedRelation(tableName),
+      logicalPlan,
+      on.expr,
+      matchedActions,
+      notMatchedActions,
+      notMatchedBySourceActions)
+    val qe = sparkSession.sessionState.executePlan(merge)
+    qe.assertCommandExecuted()
+  }
+
+  private[sql] def withNewMatchedAction(action: MergeAction): MergeIntoWriter[T] = {
+    this.matchedActions = this.matchedActions :+ action
+    this
+  }
+
+  private[sql] def withNewNotMatchedAction(action: MergeAction): MergeIntoWriter[T] = {
+    this.notMatchedActions = this.notMatchedActions :+ action
+    this
+  }
+
+  private[sql] def withNewNotMatchedBySourceAction(action: MergeAction): MergeIntoWriter[T] = {
+    this.notMatchedBySourceActions = this.notMatchedBySourceActions :+ action
+    this
+  }
+}
+
+/**
+ * A class for defining actions to be taken when matching rows in a DataFrame during
+ * a merge operation.
+ *
+ * @param mergeIntoWriter   The MergeIntoWriter instance responsible for writing data to a
+ *                          target DataFrame.
+ * @param condition         An optional condition Expression that specifies when the actions
+ *                          should be applied.
+ *                          If the condition is None, the actions will be applied to all matched
+ *                          rows.
+ *
+ * @tparam T                The type of data in the MergeIntoWriter.
+ */
+case class WhenMatched[T] private[sql](
+    mergeIntoWriter: MergeIntoWriter[T],
+    condition: Option[Expression]) {
+  /**
+   * Specifies an action to update all matched rows in the DataFrame.
+   *
+   * @return The MergeIntoWriter instance with the update all action configured.
+   */
+  def updateAll(): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewMatchedAction(UpdateStarAction(condition))
+  }
+
+  /**
+   * Specifies an action to update matched rows in the DataFrame with the provided column
+   * assignments.
+   *
+   * @param map A Map of column names to Column expressions representing the updates to be applied.
+   * @return The MergeIntoWriter instance with the update action configured.
+   */
+  def update(map: Map[String, Column]): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewMatchedAction(
+      UpdateAction(condition, map.map(x => Assignment(expr(x._1).expr, x._2.expr)).toSeq))
+  }
+
+  /**
+   * Specifies an action to delete matched rows from the DataFrame.
+   *
+   * @return The MergeIntoWriter instance with the delete action configured.
+   */
+  def delete(): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewMatchedAction(DeleteAction(condition))
+  }
+}
+
+/**
+ * A class for defining actions to be taken when no matching rows are found in a DataFrame
+ * during a merge operation.
+ *
+ * @param mergeIntoWriter   The MergeIntoWriter instance responsible for writing data to a
+ *                          target DataFrame.
+ * @param condition         An optional condition Expression that specifies when the actions
+ *                          defined in this configuration should be applied.
+ *                          If the condition is None, the actions will be applied when there
+ *                          are no matching rows.
+ * @tparam T                The type of data in the MergeIntoWriter.
+ */
+case class WhenNotMatched[T] private[sql](
+    mergeIntoWriter: MergeIntoWriter[T],
+    condition: Option[Expression]) {
+
+  /**
+   * Specifies an action to insert all non-matched rows into the DataFrame.
+   *
+   * @return The MergeIntoWriter instance with the insert all action configured.
+   */
+  def insertAll(): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewNotMatchedAction(InsertStarAction(condition))
+  }
+
+  /**
+   * Specifies an action to insert non-matched rows into the DataFrame with the provided
+   * column assignments.
+   *
+   * @param map A Map of column names to Column expressions representing the values to be inserted.
+   * @return The MergeIntoWriter instance with the insert action configured.
+   */
+  def insert(map: Map[String, Column]): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewNotMatchedAction(
+      InsertAction(condition, map.map(x => Assignment(expr(x._1).expr, x._2.expr)).toSeq))
+  }
+}
+
+
+/**
+ * A class for defining actions to be performed when there is no match by source
+ * during a merge operation in a MergeIntoWriter.
+ *
+ * @param mergeIntoWriter the MergeIntoWriter instance to which the merge actions will be applied.
+ * @param condition       an optional condition to be used with the merge actions.
+ * @tparam T the type parameter for the MergeIntoWriter.
+ */
+case class WhenNotMatchedBySource[T] private[sql](
+    mergeIntoWriter: MergeIntoWriter[T],
+    condition: Option[Expression]) {
+
+  /**
+   * Specifies an action to update all non-matched rows in the target DataFrame when
+   * not matched by the source.
+   *
+   * @return The MergeIntoWriter instance with the update all action configured.
+   */
+  def updateAll(): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewNotMatchedBySourceAction(UpdateStarAction(condition))
+  }
+
+  /**
+   * Specifies an action to update non-matched rows in the target DataFrame with the provided
+   * column assignments when not matched by the source.
+   *
+   * @param map A Map of column names to Column expressions representing the updates to be applied.
+   * @return The MergeIntoWriter instance with the update action configured.
+   */
+  def update(map: Map[String, Column]): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewNotMatchedBySourceAction(
+      UpdateAction(condition, map.map(x => Assignment(expr(x._1).expr, x._2.expr)).toSeq))
+  }
+
+  /**
+   * Specifies an action to delete non-matched rows from the target DataFrame when not matched by
+   * the source.
+   *
+   * @return The MergeIntoWriter instance with the delete action configured.
+   */
+  def delete(): MergeIntoWriter[T] = {
+    mergeIntoWriter.withNewNotMatchedBySourceAction(DeleteAction(condition))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala
new file mode 100644
index 000000000000..ed44111c81d2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala
@@ -0,0 +1,946 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+
+class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase {
+
+  import testImplicits._
+
+  test("merge into empty table with NOT MATCHED clause") {
+    withTempView("source") {
+      createTable("pk INT NOT NULL, salary INT, dep STRING")
+
+      val sourceRows = Seq(
+        (1, 100, "hr"),
+        (2, 200, "finance"),
+        (3, 300, "hr"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatched()
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr"), // insert
+          Row(2, 200, "finance"), // insert
+          Row(3, 300, "hr"))) // insert
+    }
+  }
+
+  test("merge into empty table with conditional NOT MATCHED clause") {
+    withTempView("source") {
+      createTable("pk INT NOT NULL, salary INT, dep STRING")
+
+      val sourceRows = Seq(
+        (1, 100, "hr"),
+        (2, 200, "finance"),
+        (3, 300, "hr"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatched($"source.pk" >= 2)
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(2, 200, "finance"), // insert
+          Row(3, 300, "hr"))) // insert
+    }
+  }
+
+  test("merge into with conditional WHEN MATCHED clause (update)") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "corrupted" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 100, "software"),
+        (2, 200, "finance"),
+        (3, 300, "software"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched($"source.pk" === 2)
+        .updateAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr"), // unchanged
+          Row(2, 200, "finance"))) // update
+    }
+  }
+
+  test("merge into with conditional WHEN MATCHED clause (delete)") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "corrupted" }
+          |""".stripMargin)
+
+      Seq(1, 2, 3).toDF("pk").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched(col(tableNameAsString + ".salary") === 200)
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(Row(1, 100, "hr"))) // unchanged
+    }
+  }
+
+  test("merge into with assignments to primary key in NOT MATCHED BY SOURCE") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "finance" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 100, "software"),
+        (5, 500, "finance"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+           tableNameAsString + ".salary" -> lit(-1)
+        ))
+        .whenNotMatchedBySource()
+        .update(Map(
+          tableNameAsString + ".pk" -> lit(-1)
+         ))
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, -1, "hr"), // update (matched)
+          Row(-1, 200, "finance"))) // update (not matched by source)
+    }
+  }
+
+  test("merge into with assignments to primary key in MATCHED") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "finance" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 100, "software"),
+        (5, 500, "finance"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          tableNameAsString + ".pk" -> lit(-1)
+        ))
+        .whenNotMatchedBySource()
+        .update(Map(
+          tableNameAsString + ".salary" -> lit(-1)
+        ))
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(-1, 100, "hr"), // update (matched)
+          Row(2, -1, "finance"))) // update (not matched by source)
+    }
+  }
+
+  test("merge with all types of clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |{ "pk": 4, "salary": 400, "dep": "hr" }
+          |{ "pk": 5, "salary": 500, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(3, 4, 5, 6).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          tableNameAsString + ".salary" -> 
col("cat.ns1.test_table.salary").plus(lit(1))
+        ))
+        .whenNotMatched()
+        .insert(Map(
+          "pk" -> col("source.pk"),
+          "salary" -> lit(0),
+          "dep" -> lit("new")
+        ))
+        .whenNotMatchedBySource()
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(3, 301, "hr"), // update
+          Row(4, 401, "hr"), // update
+          Row(5, 501, "hr"), // update
+          Row(6, 0, "new"))) // insert
+    }
+  }
+
+  test("merge with all types of clauses (update and insert star)") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 101, "support"),
+        (2, 201, "support"),
+        (4, 401, "support"),
+        (5, 501, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched(col(tableNameAsString + ".pk") === 1)
+        .updateAll()
+        .whenNotMatched($"source.pk" === 4)
+        .insertAll()
+        .whenNotMatchedBySource(
+          col(tableNameAsString + ".pk") === col(tableNameAsString + 
".salary") / 100)
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 101, "support"), // update
+          Row(2, 200, "software"), // unchanged
+          Row(4, 401, "support"))) // insert
+    }
+  }
+
+  test("merge with all types of conditional clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |{ "pk": 4, "salary": 400, "dep": "hr" }
+          |{ "pk": 5, "salary": 500, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(3, 4, 5, 6, 7).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched(col(tableNameAsString + ".pk") === 4)
+        .update(Map(
+          tableNameAsString + ".salary" -> col(tableNameAsString + 
".salary").plus(lit(1))
+        ))
+        .whenNotMatched($"pk" === 6)
+        .insert(Map(
+          "pk" -> col("source.pk"),
+          "salary" -> lit(0),
+          "dep" -> lit("new")
+        ))
+        .whenNotMatchedBySource($"salary" === 100)
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(2, 200, "software"), // unchanged
+          Row(3, 300, "hr"), // unchanged
+          Row(4, 401, "hr"), // update
+          Row(5, 500, "hr"), // unchanged
+          Row(6, 0, "new"))) // insert
+    }
+  }
+
+  test("merge with one NOT MATCHED BY SOURCE clause") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(1, 2).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatchedBySource()
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr"), // unchanged
+          Row(2, 200, "software"))) // unchanged
+    }
+  }
+
+  test("merge with one conditional NOT MATCHED BY SOURCE clause") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(2).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatchedBySource($"salary" === 100)
+        .update(Map(
+          "salary" -> lit(-1)
+        ))
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, -1, "hr"), // updated
+          Row(2, 200, "software"), // unchanged
+          Row(3, 300, "hr"))) // unchanged
+    }
+  }
+
+  test("merge with MATCHED and NOT MATCHED BY SOURCE clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(2).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .delete()
+        .whenNotMatchedBySource($"salary" === 100)
+        .update(Map(
+          "salary" -> lit(-1)
+        ))
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, -1, "hr"), // updated
+          Row(3, 300, "hr"))) // unchanged
+    }
+  }
+
+  test("merge with NOT MATCHED and NOT MATCHED BY SOURCE clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(2, 3, 4).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatched()
+        .insert(Map(
+          "pk" -> col("pk"),
+          "salary" -> lit(-1),
+          "dep" -> lit("new")
+        ))
+        .whenNotMatchedBySource()
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(2, 200, "software"), // unchanged
+          Row(3, 300, "hr"), // unchanged
+          Row(4, -1, "new"))) // insert
+    }
+  }
+
+  test("merge with multiple NOT MATCHED BY SOURCE clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceDF = Seq(5, 6, 7).toDF("pk")
+      sourceDF.createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatchedBySource($"salary" === 100)
+        .update(Map(
+          "salary" -> col("salary").plus(lit(1))
+        ))
+        .whenNotMatchedBySource($"salary" === 300)
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 101, "hr"), // update
+          Row(2, 200, "software"))) // unchanged
+    }
+  }
+
+  test("merge with MATCHED BY SOURCE clause and NULL values") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+        """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "id": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (2, 2, 201, "support"),
+        (1, 1, 101, "support"),
+        (3, 3, 301, "support"))
+      sourceRows.toDF("pk", "id", "salary", 
"dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString,
+          $"source.id" === col(tableNameAsString + ".id") && 
(col(tableNameAsString + ".id") < 3))
+        .whenMatched()
+        .updateAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, null, 100, "hr"), // unchanged
+          Row(2, 2, 201, "support"), // update
+          Row(3, 3, 300, "hr"))) // unchanged
+    }
+  }
+
+  test("merge cardinality check with unconditional MATCHED clause (delete)") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 6, "salary": 600, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 101, "support"),
+        (1, 102, "support"),
+        (2, 201, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(Row(6, 600, "software"))) // unchanged
+    }
+  }
+
+  test("merge cardinality check with only NOT MATCHED clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 6, "salary": 600, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 101, "support"),
+        (1, 102, "support"),
+        (2, 201, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatched()
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr"), // unchanged
+          Row(2, 201, "support"), // insert
+          Row(6, 600, "software"))) // unchanged
+    }
+  }
+
+  test("merge with extra columns in source") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |{ "pk": 3, "salary": 300, "dep": "hr" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, "smth", 101, "support"),
+        (2, "smth", 201, "support"),
+        (4, "smth", 401, "support"))
+      sourceRows.toDF("pk", "extra", "salary", 
"dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          "salary" -> col("source.salary").plus(lit(1))
+        ))
+        .whenNotMatched()
+        .insert(Map(
+          "pk" -> col("source.pk"),
+          "salary" -> col("source.salary"),
+          "dep" -> col("source.dep")
+        ))
+        .whenNotMatchedBySource()
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 102, "hr"), // update
+          Row(2, 202, "software"), // update
+          Row(4, 401, "support"))) // insert
+    }
+  }
+
+  test("merge with NULL values in target and source") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+        """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (5, None, 501, "support"),
+        (6, Some(6), 601, "support"))
+      sourceRows.toDF("pk", "id", "salary", 
"dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .updateAll()
+        .whenNotMatched()
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, null, 100, "hr"), // unchanged
+          Row(2, 2, 200, "software"), // unchanged
+          Row(5, null, 501, "support"), // insert
+          Row(6, 6, 601, "support"))) // insert
+    }
+  }
+
+  test("merge with <=>") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+        """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (5, None, 501, "support"),
+        (6, Some(6), 601, "support"))
+      sourceRows.toDF("pk", "id", "salary", 
"dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.id" <=> col(tableNameAsString + ".id"))
+        .whenMatched()
+        .updateAll()
+        .whenNotMatched()
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(2, 2, 200, "software"), // unchanged
+          Row(5, null, 501, "support"), // updated
+          Row(6, 6, 601, "support"))) // insert
+    }
+  }
+
+  test("merge with NULL ON condition") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+        """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (5, None, 501, "support"),
+        (6, Some(2), 201, "support"))
+      sourceRows.toDF("pk", "id", "salary", 
"dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk") && lit(null))
+        .whenMatched()
+        .update(Map(
+          "salary" -> col("source.salary")
+        ))
+        .whenNotMatched()
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, null, 100, "hr"), // unchanged
+          Row(2, 2, 200, "software"), // unchanged
+          Row(5, null, 501, "support"), // new
+          Row(6, 2, 201, "support"))) // new
+    }
+  }
+
+  test("merge with NULL clause conditions") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 101, "support"),
+        (3, 301, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched(lit(null))
+        .update(Map(
+          "salary" -> col("source.salary")
+        ))
+        .whenNotMatched(lit(null))
+        .insertAll()
+        .whenNotMatchedBySource(lit(null))
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 100, "hr"), // unchanged
+          Row(2, 200, "software"))) // unchanged
+    }
+  }
+
+  test("merge with multiple matching clauses") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        (1, 101, "support"),
+        (3, 301, "support"))
+      sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched(col(tableNameAsString + ".pk") === 1)
+        .update(Map(
+          "salary" -> col(tableNameAsString + ".salary").plus(lit(5))
+        ))
+        .whenMatched(col(tableNameAsString + ".salary") === 100)
+        .update(Map(
+          "salary" -> col(tableNameAsString + ".salary").plus(lit(2))
+        ))
+        .whenNotMatchedBySource(col(tableNameAsString + ".pk") === 2)
+        .update(Map(
+          "salary" -> col("salary").minus(lit(1))
+        ))
+        .whenNotMatchedBySource(col(tableNameAsString + ".salary") === 200)
+        .delete()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 105, "hr"), // updated (matched)
+          Row(2, 199, "software"))) // updated (not matched by source)
+    }
+  }
+
+  test("merge resolves and aligns columns by name") {
+    withTempView("source") {
+      createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+        """{ "pk": 1, "salary": 100, "dep": "hr" }
+          |{ "pk": 2, "salary": 200, "dep": "software" }
+          |""".stripMargin)
+
+      val sourceRows = Seq(
+        ("support", 1, 101),
+        ("support", 3, 301))
+      sourceRows.toDF("dep", "pk", "salary").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .updateAll()
+        .whenNotMatched()
+        .insertAll()
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, 101, "support"), // update
+          Row(2, 200, "software"), // unchanged
+          Row(3, 301, "support"))) // insert
+    }
+  }
+
+  test("merge refreshed relation cache") {
+    withTempView("temp", "source") {
+      withCache("temp") {
+        createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+          """{ "pk": 1, "salary": 100, "dep": "hr" }
+            |{ "pk": 2, "salary": 100, "dep": "software" }
+            |{ "pk": 3, "salary": 300, "dep": "hr" }
+            |""".stripMargin)
+
+        // define a view on top of the table
+        val query = sql(s"SELECT * FROM $tableNameAsString WHERE salary = 100")
+        query.createOrReplaceTempView("temp")
+
+        // cache the view
+        sql("CACHE TABLE temp")
+
+        // verify the view returns expected results
+        checkAnswer(
+          sql("SELECT * FROM temp"),
+          Row(1, 100, "hr") :: Row(2, 100, "software") :: Nil)
+
+        val sourceRows = Seq(
+          ("support", 1, 101),
+          ("support", 3, 301))
+        sourceRows.toDF("dep", "pk", 
"salary").createOrReplaceTempView("source")
+
+        // merge changes into the table
+        spark.table("source")
+          .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+          .whenMatched()
+          .updateAll()
+          .whenNotMatched()
+          .insertAll()
+          .merge()
+
+        // verify the merge was successful
+        checkAnswer(
+          sql(s"SELECT * FROM $tableNameAsString"),
+          Seq(
+            Row(1, 101, "support"), // update
+            Row(2, 100, "software"), // unchanged
+            Row(3, 301, "support"))) // insert
+
+        // verify the view reflects the changes in the table
+        checkAnswer(sql("SELECT * FROM temp"), Row(2, 100, "software") :: Nil)
+      }
+    }
+  }
+
+  test("merge with updates to nested struct fields in MATCHED clauses") {
+    withTempView("source") {
+      createAndInitTable(
+        s"""pk INT NOT NULL,
+           |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+           |dep STRING""".stripMargin,
+        """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""")
+
+      Seq(1, 3).toDF("pk").createOrReplaceTempView("source")
+
+      // update primitive, array, map columns inside a struct
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          "s.c1" -> lit(-1),
+          "s.c2.m" -> map(lit('k'), lit('v')),
+          "s.c2.a" -> array(lit(-1))
+        ))
+        .merge()
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"))), "hr")))
+
+      // set primitive, array, map columns to NULL (proper casts should be inserted)
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          "s.c1" -> lit(null),
+          "s.c2" -> lit(null)
+        ))
+        .merge()
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Row(1, Row(null, null), "hr") :: Nil)
+
+      // assign an entire struct
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          "s" -> struct(
+            lit(1).as("c1"),
+            struct(array(lit(1)).as("a"), lit(null).as("m")).as("c2"))
+        ))
+        .merge()
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Row(1, Row(1, Row(Seq(1), null)), "hr") :: Nil)
+    }
+  }
+
+  test("merge with updates to nested struct fields in NOT MATCHED BY SOURCE 
clauses") {
+    withTempView("source") {
+      createAndInitTable(
+        s"""pk INT NOT NULL,
+           |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+           |dep STRING""".stripMargin,
+        """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""")
+
+      Seq(2, 4).toDF("pk").createOrReplaceTempView("source")
+
+      // update primitive, array, map columns inside a struct
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatchedBySource()
+        .update(Map(
+          "s.c1" -> lit(-1),
+          "s.c2.m" -> map(lit('k'), lit('v')),
+          "s.c2.a" -> array(lit(-1))
+        ))
+        .merge()
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"))), "hr")))
+
+      // set primitive, array, map columns to NULL (proper casts should be inserted)
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatchedBySource()
+        .update(Map(
+          "s.c1" -> lit(null),
+          "s.c2" -> lit(null)
+        ))
+        .merge()
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Row(1, Row(null, null), "hr") :: Nil)
+
+      // assign an entire struct
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenNotMatchedBySource()
+        .update(Map(
+          "s" -> struct(
+            lit(1).as("c1"),
+            struct(array(lit(1)).as("a"), lit(null).as("m")).as("c2"))
+        ))
+        .merge()
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Row(1, Row(1, Row(Seq(1), null)), "hr") :: Nil)
+    }
+  }
+
+  test("merge with char/varchar columns") {
+    withTempView("source") {
+      createTable("pk INT NOT NULL, s STRUCT<n_c: CHAR(3), n_vc: VARCHAR(5)>, 
dep STRING")
+
+      append("pk INT NOT NULL, s STRUCT<n_c: STRING, n_vc: STRING>, dep 
STRING",
+        """{ "pk": 1, "s": { "n_c": "aaa", "n_vc": "aaa" }, "dep": "hr" }
+          |{ "pk": 2, "s": { "n_c": "bbb", "n_vc": "bbb" }, "dep": "software" }
+          |{ "pk": 3, "s": { "n_c": "ccc", "n_vc": "ccc" }, "dep": "hr" }
+          |""".stripMargin)
+
+      Seq(1, 2, 4).toDF("pk").createOrReplaceTempView("source")
+
+      spark.table("source")
+        .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+        .whenMatched()
+        .update(Map(
+          "s.n_c" -> lit("x1"),
+          "s.n_vc" -> lit("x2")
+        ))
+        .whenNotMatchedBySource()
+        .update(Map(
+          "s.n_c" -> lit("y1"),
+          "s.n_vc" -> lit("y2")
+        ))
+        .merge()
+
+      checkAnswer(
+        sql(s"SELECT * FROM $tableNameAsString"),
+        Seq(
+          Row(1, Row("x1 ", "x2"), "hr"), // update (matched)
+          Row(2, Row("x1 ", "x2"), "software"), // update (matched)
+          Row(3, Row("y1 ", "y2"), "hr"))) // update (not matched by source)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
